You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/15 20:02:48 UTC

svn commit: r1769874 - in /qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model: AbstractConfiguredObject.java port/AmqpPortImpl.java

Author: rgodfrey
Date: Tue Nov 15 20:02:48 2016
New Revision: 1769874

URL: http://svn.apache.org/viewvc?rev=1769874&view=rev
Log:
QPID-7511 : Calling closeAsync on an object for which closeAsync has already been called should return the future that the first call generated, rather than immediately returning true

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java?rev=1769874&r1=1769873&r2=1769874&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java Tue Nov 15 20:02:48 2016
@@ -67,6 +67,7 @@ import com.google.common.util.concurrent
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -116,7 +117,38 @@ public abstract class AbstractConfigured
     private UserPreferences _userPreferences;
 
     private enum DynamicState { UNINIT, OPENED, CLOSED };
-    private final AtomicReference<DynamicState> _dynamicState = new AtomicReference<>(DynamicState.UNINIT);
+
+    private static class DynamicStateWithFuture
+    {
+        private final DynamicState _dynamicState;
+        private final ListenableFuture<Void> _future;
+
+        private DynamicStateWithFuture(final DynamicState dynamicState, final ListenableFuture<Void> future)
+        {
+            _dynamicState = dynamicState;
+            _future = future;
+        }
+
+        public DynamicState getDynamicState()
+        {
+            return _dynamicState;
+        }
+
+        public ListenableFuture<Void> getFuture()
+        {
+            return _future;
+        }
+    }
+
+    private static final DynamicStateWithFuture UNINIT = new DynamicStateWithFuture(
+            DynamicState.UNINIT,
+            Futures.<Void>immediateFuture(null));
+    private static final DynamicStateWithFuture OPENED = new DynamicStateWithFuture(
+            DynamicState.OPENED,
+            Futures.<Void>immediateFuture(null));
+
+
+    private final AtomicReference<DynamicStateWithFuture> _dynamicState = new AtomicReference<>(UNINIT);
 
 
 
@@ -572,7 +604,7 @@ public abstract class AbstractConfigured
                                     @Override
                                     public ListenableFuture<Void> execute()
                                     {
-                                        if (_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED))
+                                        if (_dynamicState.compareAndSet(UNINIT, OPENED))
                                         {
                                             _openFailed = false;
                                             OpenExceptionHandler exceptionHandler = new OpenExceptionHandler();
@@ -763,43 +795,62 @@ public abstract class AbstractConfigured
             public ListenableFuture<Void> execute()
             {
                 LOGGER.debug("Closing " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName());
-
-                if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED))
+                final SettableFuture<Void> returnFuture = SettableFuture.create();
+                DynamicStateWithFuture desiredStateWithFuture = new DynamicStateWithFuture(DynamicState.CLOSED, returnFuture);
+                DynamicStateWithFuture currentStateWithFuture;
+                while((currentStateWithFuture = _dynamicState.get()) == OPENED)
                 {
-
-                    return doAfter(beforeClose(), new Callable<ListenableFuture<Void>>()
-                    {
-                        @Override
-                        public ListenableFuture<Void> call() throws Exception
-                        {
-                            return closeChildren();
-                        }
-                    }).then(new Callable<ListenableFuture<Void>>()
+                    if(_dynamicState.compareAndSet(OPENED, desiredStateWithFuture))
                     {
-                        @Override
-                        public ListenableFuture<Void> call() throws Exception
-                        {
-                            return onClose();
-                        }
-                    }).then(new Runnable()
-                    {
-                        @Override
-                        public void run()
+                        final ChainedListenableFuture<Void> future =
+                                doAfter(beforeClose(), new Callable<ListenableFuture<Void>>()
+                                {
+                                    @Override
+                                    public ListenableFuture<Void> call() throws Exception
+                                    {
+                                        return closeChildren();
+                                    }
+                                }).then(new Callable<ListenableFuture<Void>>()
+                                {
+                                    @Override
+                                    public ListenableFuture<Void> call() throws Exception
+                                    {
+                                        return onClose();
+                                    }
+                                }).then(new Callable<ListenableFuture<Void>>()
+                                {
+                                    @Override
+                                    public ListenableFuture<Void> call() throws Exception
+                                    {
+                                        unregister(false);
+                                        LOGGER.debug("Closed "
+                                                     + AbstractConfiguredObject.this.getClass().getSimpleName()
+                                                     + " : "
+                                                     + getName());
+                                        return Futures.immediateFuture(null);
+                                    }
+                                });
+                        addFutureCallback(future, new FutureCallback<Void>()
                         {
-                            unregister(false);
-                            LOGGER.debug("Closed "
-                                         + AbstractConfiguredObject.this.getClass().getSimpleName()
-                                         + " : "
-                                         + getName());
-                        }
-                    });
-                }
-                else
-                {
-                    LOGGER.debug("Closed " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName());
+                            @Override
+                            public void onSuccess(final Void result)
+                            {
+                                returnFuture.set(null);
+                            }
+
+                            @Override
+                            public void onFailure(final Throwable t)
+                            {
+                                returnFuture.setException(t);
+                            }
+                        }, MoreExecutors.directExecutor());
 
-                    return Futures.immediateFuture(null);
+                        return returnFuture;
+                    }
                 }
+
+                return currentStateWithFuture.getFuture();
+
             }
 
             @Override
@@ -845,7 +896,7 @@ public abstract class AbstractConfigured
             @Override
             public ListenableFuture<Void> execute()
             {
-                if (_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED))
+                if (_dynamicState.compareAndSet(UNINIT, OPENED))
                 {
                     initializeAttributes();
 
@@ -991,7 +1042,7 @@ public abstract class AbstractConfigured
         if (!_openComplete)
         {
             _openFailed = true;
-            _dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.UNINIT);
+            _dynamicState.compareAndSet(OPENED, UNINIT);
         }
 
         //TODO: children of ERRORED CO will continue to remain in ACTIVE state
@@ -1017,11 +1068,14 @@ public abstract class AbstractConfigured
             @Override
             public void performAction(final ConfiguredObject<?> child)
             {
-                if (child instanceof AbstractConfiguredObject
-                    && ((AbstractConfiguredObject)child)._dynamicState.get() == DynamicState.OPENED)
+                if (child instanceof AbstractConfiguredObject)
                 {
-                    final AbstractConfiguredObject configuredObject = (AbstractConfiguredObject) child;
-                    childStateFutures.add(configuredObject.doAttainState(exceptionHandler));
+                    AbstractConfiguredObject<?> abstractConfiguredChild = (AbstractConfiguredObject<?>) child;
+                    if(abstractConfiguredChild._dynamicState.get().getDynamicState() == DynamicState.OPENED)
+                    {
+                        final AbstractConfiguredObject configuredObject = abstractConfiguredChild;
+                        childStateFutures.add(configuredObject.doAttainState(exceptionHandler));
+                    }
                 }
                 else if(child instanceof AbstractConfiguredObjectProxy
                     && ((AbstractConfiguredObjectProxy)child).getDynamicState() == DynamicState.OPENED)
@@ -1098,9 +1152,9 @@ public abstract class AbstractConfigured
         return returnVal;
     }
 
-    protected void doOpening(boolean skipCheck, final AbstractConfiguredObjectExceptionHandler exceptionHandler)
+    protected final void doOpening(boolean skipCheck, final AbstractConfiguredObjectExceptionHandler exceptionHandler)
     {
-        if(skipCheck || _dynamicState.compareAndSet(DynamicState.UNINIT,DynamicState.OPENED))
+        if(skipCheck || _dynamicState.compareAndSet(UNINIT, OPENED))
         {
             onOpen();
             notifyStateChanged(State.UNINITIALIZED, getState());
@@ -1139,7 +1193,7 @@ public abstract class AbstractConfigured
 
     protected final void doValidation(final boolean skipCheck, final AbstractConfiguredObjectExceptionHandler exceptionHandler)
     {
-        if(skipCheck || _dynamicState.get() != DynamicState.OPENED)
+        if(skipCheck || _dynamicState.get().getDynamicState() != DynamicState.OPENED)
         {
             applyToChildren(new Action<ConfiguredObject<?>>()
             {
@@ -1174,7 +1228,7 @@ public abstract class AbstractConfigured
 
     protected final void doResolution(boolean skipCheck, final AbstractConfiguredObjectExceptionHandler exceptionHandler)
     {
-        if(skipCheck || _dynamicState.get() != DynamicState.OPENED)
+        if(skipCheck || _dynamicState.get().getDynamicState() != DynamicState.OPENED)
         {
             onResolve();
             postResolve();
@@ -1236,7 +1290,7 @@ public abstract class AbstractConfigured
 
     protected final void doCreation(final boolean skipCheck, final AbstractConfiguredObjectExceptionHandler exceptionHandler)
     {
-        if(skipCheck || _dynamicState.get() != DynamicState.OPENED)
+        if(skipCheck || _dynamicState.get().getDynamicState() != DynamicState.OPENED)
         {
             onCreate();
             applyToChildren(new Action<ConfiguredObject<?>>()

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java?rev=1769874&r1=1769873&r2=1769874&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java Tue Nov 15 20:02:48 2016
@@ -301,13 +301,7 @@ public class AmqpPortImpl extends Abstra
     protected ListenableFuture<Void> beforeClose()
     {
         _closing.set(true);
-
-        if (_connectionCount.get() == 0)
-        {
-            _noConnectionsRemain.set(null);
-        }
-
-        return _noConnectionsRemain;
+        return Futures.immediateFuture(null);
     }
 
     @Override
@@ -320,7 +314,6 @@ public class AmqpPortImpl extends Abstra
                 _container.getEventLogger().message(BrokerMessages.SHUTTING_DOWN(String.valueOf(transport), _transport.getAcceptingPort()));
             }
 
-
             _transport.close();
         }
         return Futures.immediateFuture(null);
@@ -610,10 +603,6 @@ public class AmqpPortImpl extends Abstra
            _connectionCountWarningGiven.compareAndSet(true,false);
         }
 
-        if (_closing.get() && _connectionCount.get() == 0)
-        {
-            _noConnectionsRemain.set(null);
-        }
 
         return openConnections;
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org