You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/03/10 17:53:38 UTC

svn commit: r1665614 - in /qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java: bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/ broker-core/src/main/java/...

Author: rgodfrey
Date: Tue Mar 10 16:53:37 2015
New Revision: 1665614

URL: http://svn.apache.org/r1665614
Log:
Refactor use of futures

Modified:
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java?rev=1665614&r1=1665613&r2=1665614&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java Tue Mar 10 16:53:37 2015
@@ -163,6 +163,8 @@ public class ReplicatedEnvironmentFacade
          * with NO_SYNC durability in case such a Node crashes.
          */
         put(ReplicationConfig.LOG_FLUSH_TASK_INTERVAL, "1 min");
+
+        put(ReplicationConfig.CONSISTENCY_POLICY, "TimeConsistencyPolicy(1 s,30 s)");
     }});
 
     public static final String PERMITTED_NODE_LIST = "permittedNodes";

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java?rev=1665614&r1=1665613&r2=1665614&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java Tue Mar 10 16:53:37 2015
@@ -429,18 +429,14 @@ public class BDBHAVirtualHostNodeImpl ex
     @StateTransition( currentState = { State.ACTIVE, State.STOPPED, State.ERRORED}, desiredState = State.DELETED )
     protected ListenableFuture<Void> doDelete()
     {
-        final SettableFuture<Void> returnVal = SettableFuture.create();
 
         // get helpers before close. on close all children are closed and not available anymore
         final Set<InetSocketAddress> helpers = getRemoteNodeAddresses();
-        final ListenableFuture<Void> superFuture = super.doDelete();
-        superFuture.addListener(new Runnable()
+        return doAfter(super.doDelete(),new Runnable()
         {
             @Override
             public void run()
             {
-                try
-                {
                     if (getConfigurationStore() != null)
                     {
                         getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.DELETED());
@@ -458,15 +454,11 @@ public class BDBHAVirtualHostNodeImpl ex
                                         + ". To finish deletion a removal of the node from any of remote nodes (" + helpers + ") is required.");
                         }
                     }
-                }
-                finally
-                {
-                    returnVal.set(null);
-                }
+
             }
-        }, getTaskExecutor().getExecutor());
+        });
+
 
-        return returnVal;
     }
 
     @Override
@@ -706,23 +698,15 @@ public class BDBHAVirtualHostNodeImpl ex
         final VirtualHost<?,?,?> virtualHost = getVirtualHost();
         if (virtualHost!= null)
         {
-            final SettableFuture<Void> returnVal = SettableFuture.create();
-            virtualHost.closeAsync().addListener(new Runnable()
+            return doAfter(virtualHost.closeAsync(), new Runnable()
             {
                 @Override
                 public void run()
                 {
-                    try
-                    {
                         childRemoved(virtualHost);
-                    }
-                    finally
-                    {
-                        returnVal.set(null);
-                    }
+
                 }
-            }, getTaskExecutor().getExecutor());
-            return returnVal;
+            });
         }
         else
         {

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1665614&r1=1665613&r2=1665614&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Tue Mar 10 16:53:37 2015
@@ -30,6 +30,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -37,13 +38,14 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.server.binding.BindingImpl;
-import org.apache.qpid.server.configuration.updater.Task;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.messages.ExchangeMessages;
@@ -607,15 +609,15 @@ public abstract class AbstractExchange<T
     @Override
     public boolean addBinding(final String bindingKey, final AMQQueue queue, final Map<String, Object> arguments)
     {
-        return getTaskExecutor().run(new Task<Boolean>()
-                                    {
+        return doSync(doOnConfigThread(new Callable<ListenableFuture<Boolean>>()
+        {
+            @Override
+            public ListenableFuture<Boolean> call() throws Exception
+            {
+                return makeBindingAsync(null, bindingKey, queue, arguments, false);
+            }
+        }));
 
-                                        @Override
-                                        public Boolean execute()
-                                        {
-                                            return makeBinding(null, bindingKey, queue, arguments, false);
-                                        }
-                                    });
 
     }
 
@@ -624,12 +626,20 @@ public abstract class AbstractExchange<T
                                   final AMQQueue queue,
                                   final Map<String, Object> arguments)
     {
-        final BindingImpl existingBinding = getBinding(bindingKey, queue);
-        return makeBinding(existingBinding == null ? null : existingBinding.getId(),
-                           bindingKey,
-                           queue,
-                           arguments,
-                           true);
+        return doSync(doOnConfigThread(new Callable<ListenableFuture<Boolean>>()
+        {
+            @Override
+            public ListenableFuture<Boolean> call() throws Exception
+            {
+
+                final BindingImpl existingBinding = getBinding(bindingKey, queue);
+                return makeBindingAsync(existingBinding == null ? null : existingBinding.getId(),
+                                   bindingKey,
+                                   queue,
+                                   arguments,
+                                   true);
+            }
+        }));
     }
 
 
@@ -680,7 +690,7 @@ public abstract class AbstractExchange<T
         return _bindingsMap.get(new BindingIdentifier(bindingKey,queue));
     }
 
-    private boolean makeBinding(UUID id,
+    private ListenableFuture<Boolean> makeBindingAsync(UUID id,
                                 String bindingKey,
                                 AMQQueue queue,
                                 Map<String, Object> arguments,
@@ -714,23 +724,45 @@ public abstract class AbstractExchange<T
                 attributes.put(Binding.ID, id);
                 attributes.put(Binding.ARGUMENTS, arguments);
 
-                BindingImpl b = new BindingImpl(attributes, queue, this);
-                // TODO - RG - Fix Bindings
-                b.createAsync(); // Must be called before addBinding as it resolves automated attributes.
+                final BindingImpl b = new BindingImpl(attributes, queue, this);
 
-                addBinding(b);
-                return true;
+                final SettableFuture<Boolean> returnVal = SettableFuture.create();
+
+                Futures.addCallback(b.createAsync(), new FutureCallback<Void>()
+                {
+                    @Override
+                    public void onSuccess(final Void result)
+                    {
+                        try
+                        {
+                            addBinding(b);
+                            returnVal.set(true);
+                        }
+                        catch(Throwable t)
+                        {
+                            returnVal.setException(t);
+                        }
+                    }
+
+                    @Override
+                    public void onFailure(final Throwable t)
+                    {
+                        returnVal.setException(t);
+                    }
+                }, getTaskExecutor().getExecutor()); // Must be called before addBinding as it resolves automated attributes.
+
+                return returnVal;
             }
             else if(force)
             {
                 Map<String,Object> oldArguments = existingMapping.getArguments();
                 existingMapping.setArguments(arguments);
                 onBindingUpdated(existingMapping, oldArguments);
-                return true;
+                return Futures.immediateFuture(true);
             }
             else
             {
-                return false;
+                return Futures.immediateFuture(false);
             }
         }
     }

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java?rev=1665614&r1=1665613&r2=1665614&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java Tue Mar 10 16:53:37 2015
@@ -41,15 +41,18 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.security.auth.Subject;
 
+import com.google.common.util.concurrent.AbstractFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -454,48 +457,74 @@ public abstract class AbstractConfigured
 
     public final ListenableFuture<Void> openAsync()
     {
-        final SettableFuture<Void> returnVal = SettableFuture.create();
+        return doOnConfigThread(new Callable<ListenableFuture<Void>>()
+                                {
+                                    @Override
+                                    public ListenableFuture<Void> call() throws Exception
+                                    {
+                                        if (_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED))
+                                        {
+                                            _openFailed = false;
+                                            OpenExceptionHandler exceptionHandler = new OpenExceptionHandler();
+                                            try
+                                            {
+                                                doResolution(true, exceptionHandler);
+                                                doValidation(true, exceptionHandler);
+                                                doOpening(true, exceptionHandler);
+                                                return doAttainState(exceptionHandler);
+                                            }
+                                            catch (RuntimeException e)
+                                            {
+                                                exceptionHandler.handleException(e, AbstractConfiguredObject.this);
+                                                return Futures.immediateFuture(null);
+                                            }
+                                        }
+                                        else
+                                        {
+                                            return Futures.immediateFuture(null);
+                                        }
+
+                                    }
+                                });
 
-        _taskExecutor.run(new VoidTask()
+    }
+
+    protected final <T> ListenableFuture<T> doOnConfigThread(final Callable<ListenableFuture<T>> action)
+    {
+        final SettableFuture<T> returnVal = SettableFuture.create();
+
+        _taskExecutor.submit(new Task<Void>()
         {
 
             @Override
-            public void execute()
+            public Void execute()
             {
-                if (_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED))
+                try
                 {
-                    _openFailed = false;
-                    OpenExceptionHandler exceptionHandler = new OpenExceptionHandler();
-                    try
-                    {
-                        doResolution(true, exceptionHandler);
-                        doValidation(true, exceptionHandler);
-                        doOpening(true, exceptionHandler);
-                        doAttainState(exceptionHandler).addListener(
-                                new Runnable()
-                                {
-                                    @Override
-                                    public void run()
-                                    {
-                                        returnVal.set(null);
-                                    }
-                                }, MoreExecutors.sameThreadExecutor()
-                                                                   );
-                    }
-                    catch (RuntimeException e)
+                    Futures.addCallback(action.call(), new FutureCallback<T>()
                     {
-                        exceptionHandler.handleException(e, AbstractConfiguredObject.this);
-                        returnVal.set(null);
-                    }
+                        @Override
+                        public void onSuccess(final T result)
+                        {
+                            returnVal.set(result);
+                        }
+
+                        @Override
+                        public void onFailure(final Throwable t)
+                        {
+                            returnVal.setException(t);
+                        }
+                    });
                 }
-                else
+                catch (Exception e)
                 {
-                    returnVal.set(null);
+                    returnVal.setException(e);
                 }
+                return null;
             }
         });
-        return returnVal;
 
+        return returnVal;
     }
 
 
@@ -558,13 +587,25 @@ public abstract class AbstractConfigured
             {
                 counter.incrementCount();
                 ListenableFuture<Void> close = child.closeAsync();
-                close.addListener(new Runnable()
+                Futures.addCallback(close, new FutureCallback<Void>()
                 {
                     @Override
-                    public void run()
+                    public void onSuccess(final Void result)
                     {
                         counter.decrementCount();
                     }
+
+                    @Override
+                    public void onFailure(final Throwable t)
+                    {
+                        LOGGER.error("Exception occurred while closing "
+                                     + child.getClass().getSimpleName()
+                                     + " : '"
+                                     + child.getName()
+                                     + "'", t);
+                        // No need to decrement counter as setting the exception will complete the future
+                        returnVal.setException(t);
+                    }
                 }, MoreExecutors.sameThreadExecutor());
             }
         });
@@ -598,70 +639,48 @@ public abstract class AbstractConfigured
     @Override
     public final ListenableFuture<Void> closeAsync()
     {
-        LOGGER.debug("Closing " + getClass().getSimpleName() + " : " + getName());
-        if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED))
+        return doOnConfigThread(new Callable<ListenableFuture<Void>>()
         {
-            final SettableFuture<Void> returnVal = SettableFuture.create();
-
-            final ListenableFuture<Void> beforeClose = beforeClose();
-
-            if(beforeClose != null)
+            @Override
+            public ListenableFuture<Void> call() throws Exception
             {
-                beforeClose.addListener(new Runnable()
+                LOGGER.debug("Closing " + getClass().getSimpleName() + " : " + getName());
+
+                if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED))
                 {
-                    @Override
-                    public void run()
+
+                    return doAfter(beforeClose(), new Callable<ListenableFuture<Void>>()
                     {
-                        final ListenableFuture<Void> childCloseFuture = closeChildren();
-                        childCloseFuture.addListener(new Runnable()
+                        @Override
+                        public ListenableFuture<Void> call() throws Exception
                         {
-                            @Override
-                            public void run()
+                            return closeChildren();
+                        }
+                    }).then(new Runnable()
                             {
-                                onClose();
-                                unregister(false);
-                                LOGGER.debug("Closed " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName());
-                                returnVal.set(null);
-                            }
-                        }, getTaskExecutor().getExecutor());
-                    }
-                }, getTaskExecutor().getExecutor());
-            }
-            else
-            {
-                final ListenableFuture<Void> childCloseFuture = closeChildren();
-                childCloseFuture.addListener(new Runnable()
+                                @Override
+                                public void run()
+                                {
+                                    onClose();
+                                    unregister(false);
+                                    LOGGER.debug("Closed " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName());
+                                }
+                            });
+                }
+                else
                 {
-                    @Override
-                    public void run()
-                    {
+                    LOGGER.debug("Closed " + getClass().getSimpleName() + " : " + getName());
 
-                        onClose();
-                        unregister(false);
-                        LOGGER.debug("Closed "
-                                     + AbstractConfiguredObject.this.getClass().getSimpleName()
-                                     + " : "
-                                     + getName());
-                        returnVal.set(null);
-                    }
-                }, getTaskExecutor().getExecutor());
+                    return Futures.immediateFuture(null);
+                }
             }
+        });
 
-            return returnVal;
-
-
-        }
-        else
-        {
-            LOGGER.debug("Closed " + getClass().getSimpleName() + " : " + getName());
-
-            return Futures.immediateFuture(null);
-        }
     }
 
     protected ListenableFuture<Void> beforeClose()
     {
-        return null;
+        return Futures.immediateFuture(null);
     }
 
     protected void onClose()
@@ -675,15 +694,11 @@ public abstract class AbstractConfigured
 
     public final ListenableFuture<Void> createAsync()
     {
-        final SettableFuture<Void> returnVal = SettableFuture.create();
-
-        _taskExecutor.run(new VoidTask()
+        return doOnConfigThread(new Callable<ListenableFuture<Void>>()
         {
-
             @Override
-            public void execute()
+            public ListenableFuture<Void> call() throws Exception
             {
-
                 if (_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED))
                 {
                     final AuthenticatedPrincipal currentUser = SecurityManager.getCurrentUser();
@@ -716,42 +731,23 @@ public abstract class AbstractConfigured
 
                     final AbstractConfiguredObjectExceptionHandler unregisteringExceptionHandler =
                             new CreateExceptionHandler(true);
+
                     try
                     {
                         doCreation(true, unregisteringExceptionHandler);
                         doOpening(true, unregisteringExceptionHandler);
-                        Futures.addCallback(doAttainState(unregisteringExceptionHandler),
-                                            new FutureCallback<Void>()
-                                            {
-                                                @Override
-                                                public void onSuccess(final Void result)
-                                                {
-                                                    returnVal.set(null);
-                                                }
-
-                                                @Override
-                                                public void onFailure(final Throwable t)
-                                                {
-                                                    if (t instanceof RuntimeException)
-                                                    {
-                                                        unregisteringExceptionHandler.handleException((RuntimeException) t,
-                                                                                                      AbstractConfiguredObject.this);
-                                                    }
-                                                    returnVal.set(null);
-                                                }
-                                            },
-                                            getTaskExecutor().getExecutor());
+                        return doAttainState(unregisteringExceptionHandler);
                     }
                     catch (RuntimeException e)
                     {
                         unregisteringExceptionHandler.handleException(e, AbstractConfiguredObject.this);
-                        returnVal.set(null);
                     }
                 }
+                return Futures.immediateFuture(null);
+
             }
         });
 
-        return returnVal;
     }
 
     protected void validateOnCreate()
@@ -822,8 +818,15 @@ public abstract class AbstractConfigured
                 }
                 catch(RuntimeException e)
                 {
-                    exceptionHandler.handleException(e, AbstractConfiguredObject.this);
-                    returnVal.set(null);
+                    try
+                    {
+                        exceptionHandler.handleException(e, AbstractConfiguredObject.this);
+                        returnVal.set(null);
+                    }
+                    catch(Throwable t)
+                    {
+                        returnVal.setException(t);
+                    }
                 }
             }
         });
@@ -851,11 +854,18 @@ public abstract class AbstractConfigured
                                                 @Override
                                                 public void onFailure(final Throwable t)
                                                 {
-                                                    if(t instanceof RuntimeException)
+                                                    try
                                                     {
-                                                        exceptionHandler.handleException((RuntimeException) t, configuredObject);
+                                                        if (t instanceof RuntimeException)
+                                                        {
+                                                            exceptionHandler.handleException((RuntimeException) t,
+                                                                                             configuredObject);
+                                                        }
+                                                    }
+                                                    finally
+                                                    {
+                                                        counter.decrementCount();
                                                     }
-                                                    counter.decrementCount();
                                                 }
                                             },getTaskExecutor().getExecutor());
 
@@ -1257,90 +1267,68 @@ public abstract class AbstractConfigured
     private ListenableFuture<Void> setDesiredState(final State desiredState)
             throws IllegalStateTransitionException, AccessControlException
     {
-
-        final SettableFuture<Void> returnVal = SettableFuture.create();
-        runTask(new Task<Void>()
+        return doOnConfigThread(new Callable<ListenableFuture<Void>>()
+        {
+            @Override
+            public ListenableFuture<Void> call() throws Exception
+            {
+                final State state = getState();
+                final State currentDesiredState = getDesiredState();
+                if(desiredState == currentDesiredState && desiredState != state)
+                {
+                    return doAfter(attainStateIfOpenedOrReopenFailed(), new Runnable()
+                    {
+                        @Override
+                        public void run()
                         {
-                            @Override
-                            public Void execute()
+                            final State currentState = getState();
+                            if (currentState != state)
                             {
+                                notifyStateChanged(state, currentState);
+                            }
 
-                                final State state = getState();
-                                final State currentDesiredState = getDesiredState();
-                                if(desiredState == currentDesiredState && desiredState != state)
-                                {
-                                    attainStateIfOpenedOrReopenFailed().addListener(new Runnable()
-                                    {
-                                        @Override
-                                        public void run()
-                                        {
-                                            try
-                                            {
-                                                final State currentState = getState();
-                                                if (currentState != state)
-                                                {
-                                                    notifyStateChanged(state, currentState);
-                                                }
-                                            }
-                                            finally
-                                            {
-                                                returnVal.set(null);
-                                            }
-                                        }
-                                    }
-                                    ,_taskExecutor.getExecutor());
-                                }
-                                else
-                                {
-                                    try
-                                    {
-                                        authoriseSetDesiredState(desiredState);
-                                        validateChange(createProxyForValidation(Collections.<String, Object>singletonMap(
-                                                ConfiguredObject.DESIRED_STATE,
-                                                desiredState)), Collections.singleton(ConfiguredObject.DESIRED_STATE));
+                        }
 
-                                        if (changeAttribute(ConfiguredObject.DESIRED_STATE, currentDesiredState, desiredState))
-                                        {
-                                            attributeSet(ConfiguredObject.DESIRED_STATE,
-                                                         currentDesiredState,
-                                                         desiredState);
+                    });
 
-                                            attainStateIfOpenedOrReopenFailed().addListener(new Runnable()
-                                            {
-                                                @Override
-                                                public void run()
-                                                {
-                                                    try
-                                                    {
-                                                        if (getState() == desiredState)
-                                                        {
-                                                            notifyStateChanged(state, desiredState);
-                                                        }
-                                                    }
-                                                    finally
-                                                    {
-                                                        returnVal.set(null);
-                                                    }
 
-                                                }
-                                            }, _taskExecutor.getExecutor());
-                                        }
-                                        else
-                                        {
-                                            returnVal.set(null);
-                                        }
-                                    }
-                                    catch (RuntimeException | Error e)
-                                    {
-                                        returnVal.set(null);
-                                        throw e;
-                                    }
+                }
+                else
+                {
+                    authoriseSetDesiredState(desiredState);
+                    validateChange(createProxyForValidation(Collections.<String, Object>singletonMap(
+                            ConfiguredObject.DESIRED_STATE,
+                            desiredState)), Collections.singleton(ConfiguredObject.DESIRED_STATE));
+
+                    if (changeAttribute(ConfiguredObject.DESIRED_STATE, currentDesiredState, desiredState))
+                    {
+                        attributeSet(ConfiguredObject.DESIRED_STATE,
+                                     currentDesiredState,
+                                     desiredState);
+
+                        return doAfter(attainStateIfOpenedOrReopenFailed(),new Runnable()
+                                       {
+                                           @Override
+                                           public void run()
+                                           {
+                                               if (getState() == desiredState)
+                                               {
+                                                   notifyStateChanged(state, desiredState);
+                                               }
+
+                                           }
+                                       }
+                            );
+                    }
+                    else
+                    {
+                        return Futures.immediateFuture(null);
+                    }
+                }
+
+            }
+        });
 
-                                }
-                                return null;
-                            }
-                        });
-        return returnVal;
     }
 
     @Override
@@ -1727,11 +1715,11 @@ public abstract class AbstractConfigured
         doSync(deleteAsync());
     }
 
-    private void doSync(ListenableFuture<Void> async)
+    protected final <R>  R doSync(ListenableFuture<R> async)
     {
         try
         {
-            async.get();
+            return async.get();
         }
         catch (InterruptedException e)
         {
@@ -1762,12 +1750,7 @@ public abstract class AbstractConfigured
 
     public final ListenableFuture<Void> deleteAsync()
     {
-       /* if(getState() == State.UNINITIALIZED)
-        {
-            _desiredState = State.DELETED;
-        }
-       */ return setDesiredState(State.DELETED);
-
+        return setDesiredState(State.DELETED);
     }
 
     public final void start()
@@ -1870,6 +1853,128 @@ public abstract class AbstractConfigured
         doSync(setAttributesAsync(attributes));
     }
 
+    protected final ChainedListenableFuture doAfter(ListenableFuture<Void> first, final Runnable second)
+    {
+        return doAfter(getTaskExecutor().getExecutor(), first, second);
+    }
+
+    protected static final ChainedListenableFuture doAfter(Executor executor, ListenableFuture<Void> first, final Runnable second)
+    {
+        final ChainedSettableFuture returnVal = new ChainedSettableFuture(executor);
+        Futures.addCallback(first, new FutureCallback<Void>()
+        {
+            @Override
+            public void onSuccess(final Void result)
+            {
+                try
+                {
+                    second.run();
+                    returnVal.set(null);
+                }
+                catch(Throwable e)
+                {
+                    returnVal.setException(e);
+                }
+            }
+
+            @Override
+            public void onFailure(final Throwable t)
+            {
+                returnVal.setException(t);
+            }
+        }, executor);
+
+        return returnVal;
+    }
+
+    public static interface ChainedListenableFuture extends ListenableFuture<Void>
+    {
+        ChainedListenableFuture then(Runnable r);
+        ChainedListenableFuture then(Callable<ListenableFuture<Void>> r);
+    }
+
+    public static class ChainedSettableFuture extends AbstractFuture<Void> implements ChainedListenableFuture
+    {
+        private final Executor _exector;
+
+        public ChainedSettableFuture(final Executor executor)
+        {
+            _exector = executor;
+        }
+
+        @Override
+        public boolean set(Void value)
+        {
+            return super.set(value);
+        }
+
+        @Override
+        public boolean setException(Throwable throwable)
+        {
+            return super.setException(throwable);
+        }
+
+        @Override
+        public ChainedListenableFuture then(final Runnable r)
+        {
+            return doAfter(_exector, this, r);
+        }
+
+        @Override
+        public ChainedListenableFuture then(final Callable<ListenableFuture<Void>> r)
+        {
+            return doAfter(_exector, this,r);
+        }
+    }
+
+    protected final ChainedListenableFuture doAfter(ListenableFuture<Void> first, final Callable<ListenableFuture<Void>> second)
+    {
+        return doAfter(getTaskExecutor().getExecutor(), first, second);
+    }
+
+    protected static final ChainedListenableFuture doAfter(final Executor executor, ListenableFuture<Void> first, final Callable<ListenableFuture<Void>> second)
+    {
+        final ChainedSettableFuture returnVal = new ChainedSettableFuture(executor);
+        Futures.addCallback(first, new FutureCallback<Void>()
+        {
+            @Override
+            public void onSuccess(final Void result)
+            {
+                try
+                {
+                    final ListenableFuture<Void> future = second.call();
+                    Futures.addCallback(future, new FutureCallback<Void>()
+                    {
+                        @Override
+                        public void onSuccess(final Void result)
+                        {
+                            returnVal.set(null);
+                        }
+
+                        @Override
+                        public void onFailure(final Throwable t)
+                        {
+                            returnVal.setException(t);
+                        }
+                    }, executor);
+
+                }
+                catch(Throwable e)
+                {
+                    returnVal.setException(e);
+                }
+            }
+
+            @Override
+            public void onFailure(final Throwable t)
+            {
+                returnVal.setException(t);
+            }
+        }, executor);
+
+        return returnVal;
+    }
+
     @Override
     public ListenableFuture<Void> setAttributesAsync(final Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException
     {

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java?rev=1665614&r1=1665613&r2=1665614&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java Tue Mar 10 16:53:37 2015
@@ -21,16 +21,16 @@
 package org.apache.qpid.test.utils;
 
 import java.security.PrivilegedAction;
-import java.util.Map;
 import java.util.Set;
 
+import javax.security.auth.Subject;
+
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.server.Broker;
 import org.apache.qpid.server.BrokerOptions;
 import org.apache.qpid.server.security.SecurityManager;
-
-import javax.security.auth.Subject;
+import org.apache.qpid.server.util.Action;
 
 public class InternalBrokerHolder implements BrokerHolder
 {
@@ -50,7 +50,14 @@ public class InternalBrokerHolder implem
     {
         LOGGER.info("Starting internal broker (same JVM)");
 
-        _broker = new Broker();
+        _broker = new Broker(new Action<Integer>()
+        {
+            @Override
+            public void performAction(final Integer object)
+            {
+                _broker = null;
+            }
+        });
         _broker.startup(options);
     }
 
@@ -63,7 +70,10 @@ public class InternalBrokerHolder implem
             @Override
             public Object run()
             {
-                _broker.shutdown();
+                if(_broker != null)
+                {
+                    _broker.shutdown();
+                }
                 return null;
             }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org