You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/15 20:02:48 UTC
svn commit: r1769874 - in
/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model:
AbstractConfiguredObject.java port/AmqpPortImpl.java
Author: rgodfrey
Date: Tue Nov 15 20:02:48 2016
New Revision: 1769874
URL: http://svn.apache.org/viewvc?rev=1769874&view=rev
Log:
QPID-7511 : Calling closeAsync on an object for which closeAsync has already been called should return the future that the first call generated, rather than immediately returning true
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java?rev=1769874&r1=1769873&r2=1769874&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java Tue Nov 15 20:02:48 2016
@@ -67,6 +67,7 @@ import com.google.common.util.concurrent
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -116,7 +117,38 @@ public abstract class AbstractConfigured
private UserPreferences _userPreferences;
private enum DynamicState { UNINIT, OPENED, CLOSED };
- private final AtomicReference<DynamicState> _dynamicState = new AtomicReference<>(DynamicState.UNINIT);
+
+ private static class DynamicStateWithFuture
+ {
+ private final DynamicState _dynamicState;
+ private final ListenableFuture<Void> _future;
+
+ private DynamicStateWithFuture(final DynamicState dynamicState, final ListenableFuture<Void> future)
+ {
+ _dynamicState = dynamicState;
+ _future = future;
+ }
+
+ public DynamicState getDynamicState()
+ {
+ return _dynamicState;
+ }
+
+ public ListenableFuture<Void> getFuture()
+ {
+ return _future;
+ }
+ }
+
+ private static final DynamicStateWithFuture UNINIT = new DynamicStateWithFuture(
+ DynamicState.UNINIT,
+ Futures.<Void>immediateFuture(null));
+ private static final DynamicStateWithFuture OPENED = new DynamicStateWithFuture(
+ DynamicState.OPENED,
+ Futures.<Void>immediateFuture(null));
+
+
+ private final AtomicReference<DynamicStateWithFuture> _dynamicState = new AtomicReference<>(UNINIT);
@@ -572,7 +604,7 @@ public abstract class AbstractConfigured
@Override
public ListenableFuture<Void> execute()
{
- if (_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED))
+ if (_dynamicState.compareAndSet(UNINIT, OPENED))
{
_openFailed = false;
OpenExceptionHandler exceptionHandler = new OpenExceptionHandler();
@@ -763,43 +795,62 @@ public abstract class AbstractConfigured
public ListenableFuture<Void> execute()
{
LOGGER.debug("Closing " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName());
-
- if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED))
+ final SettableFuture<Void> returnFuture = SettableFuture.create();
+ DynamicStateWithFuture desiredStateWithFuture = new DynamicStateWithFuture(DynamicState.CLOSED, returnFuture);
+ DynamicStateWithFuture currentStateWithFuture;
+ while((currentStateWithFuture = _dynamicState.get()) == OPENED)
{
-
- return doAfter(beforeClose(), new Callable<ListenableFuture<Void>>()
- {
- @Override
- public ListenableFuture<Void> call() throws Exception
- {
- return closeChildren();
- }
- }).then(new Callable<ListenableFuture<Void>>()
+ if(_dynamicState.compareAndSet(OPENED, desiredStateWithFuture))
{
- @Override
- public ListenableFuture<Void> call() throws Exception
- {
- return onClose();
- }
- }).then(new Runnable()
- {
- @Override
- public void run()
+ final ChainedListenableFuture<Void> future =
+ doAfter(beforeClose(), new Callable<ListenableFuture<Void>>()
+ {
+ @Override
+ public ListenableFuture<Void> call() throws Exception
+ {
+ return closeChildren();
+ }
+ }).then(new Callable<ListenableFuture<Void>>()
+ {
+ @Override
+ public ListenableFuture<Void> call() throws Exception
+ {
+ return onClose();
+ }
+ }).then(new Callable<ListenableFuture<Void>>()
+ {
+ @Override
+ public ListenableFuture<Void> call() throws Exception
+ {
+ unregister(false);
+ LOGGER.debug("Closed "
+ + AbstractConfiguredObject.this.getClass().getSimpleName()
+ + " : "
+ + getName());
+ return Futures.immediateFuture(null);
+ }
+ });
+ addFutureCallback(future, new FutureCallback<Void>()
{
- unregister(false);
- LOGGER.debug("Closed "
- + AbstractConfiguredObject.this.getClass().getSimpleName()
- + " : "
- + getName());
- }
- });
- }
- else
- {
- LOGGER.debug("Closed " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName());
+ @Override
+ public void onSuccess(final Void result)
+ {
+ returnFuture.set(null);
+ }
+
+ @Override
+ public void onFailure(final Throwable t)
+ {
+ returnFuture.setException(t);
+ }
+ }, MoreExecutors.directExecutor());
- return Futures.immediateFuture(null);
+ return returnFuture;
+ }
}
+
+ return currentStateWithFuture.getFuture();
+
}
@Override
@@ -845,7 +896,7 @@ public abstract class AbstractConfigured
@Override
public ListenableFuture<Void> execute()
{
- if (_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED))
+ if (_dynamicState.compareAndSet(UNINIT, OPENED))
{
initializeAttributes();
@@ -991,7 +1042,7 @@ public abstract class AbstractConfigured
if (!_openComplete)
{
_openFailed = true;
- _dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.UNINIT);
+ _dynamicState.compareAndSet(OPENED, UNINIT);
}
//TODO: children of ERRORED CO will continue to remain in ACTIVE state
@@ -1017,11 +1068,14 @@ public abstract class AbstractConfigured
@Override
public void performAction(final ConfiguredObject<?> child)
{
- if (child instanceof AbstractConfiguredObject
- && ((AbstractConfiguredObject)child)._dynamicState.get() == DynamicState.OPENED)
+ if (child instanceof AbstractConfiguredObject)
{
- final AbstractConfiguredObject configuredObject = (AbstractConfiguredObject) child;
- childStateFutures.add(configuredObject.doAttainState(exceptionHandler));
+ AbstractConfiguredObject<?> abstractConfiguredChild = (AbstractConfiguredObject<?>) child;
+ if(abstractConfiguredChild._dynamicState.get().getDynamicState() == DynamicState.OPENED)
+ {
+ final AbstractConfiguredObject configuredObject = abstractConfiguredChild;
+ childStateFutures.add(configuredObject.doAttainState(exceptionHandler));
+ }
}
else if(child instanceof AbstractConfiguredObjectProxy
&& ((AbstractConfiguredObjectProxy)child).getDynamicState() == DynamicState.OPENED)
@@ -1098,9 +1152,9 @@ public abstract class AbstractConfigured
return returnVal;
}
- protected void doOpening(boolean skipCheck, final AbstractConfiguredObjectExceptionHandler exceptionHandler)
+ protected final void doOpening(boolean skipCheck, final AbstractConfiguredObjectExceptionHandler exceptionHandler)
{
- if(skipCheck || _dynamicState.compareAndSet(DynamicState.UNINIT,DynamicState.OPENED))
+ if(skipCheck || _dynamicState.compareAndSet(UNINIT, OPENED))
{
onOpen();
notifyStateChanged(State.UNINITIALIZED, getState());
@@ -1139,7 +1193,7 @@ public abstract class AbstractConfigured
protected final void doValidation(final boolean skipCheck, final AbstractConfiguredObjectExceptionHandler exceptionHandler)
{
- if(skipCheck || _dynamicState.get() != DynamicState.OPENED)
+ if(skipCheck || _dynamicState.get().getDynamicState() != DynamicState.OPENED)
{
applyToChildren(new Action<ConfiguredObject<?>>()
{
@@ -1174,7 +1228,7 @@ public abstract class AbstractConfigured
protected final void doResolution(boolean skipCheck, final AbstractConfiguredObjectExceptionHandler exceptionHandler)
{
- if(skipCheck || _dynamicState.get() != DynamicState.OPENED)
+ if(skipCheck || _dynamicState.get().getDynamicState() != DynamicState.OPENED)
{
onResolve();
postResolve();
@@ -1236,7 +1290,7 @@ public abstract class AbstractConfigured
protected final void doCreation(final boolean skipCheck, final AbstractConfiguredObjectExceptionHandler exceptionHandler)
{
- if(skipCheck || _dynamicState.get() != DynamicState.OPENED)
+ if(skipCheck || _dynamicState.get().getDynamicState() != DynamicState.OPENED)
{
onCreate();
applyToChildren(new Action<ConfiguredObject<?>>()
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java?rev=1769874&r1=1769873&r2=1769874&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java Tue Nov 15 20:02:48 2016
@@ -301,13 +301,7 @@ public class AmqpPortImpl extends Abstra
protected ListenableFuture<Void> beforeClose()
{
_closing.set(true);
-
- if (_connectionCount.get() == 0)
- {
- _noConnectionsRemain.set(null);
- }
-
- return _noConnectionsRemain;
+ return Futures.immediateFuture(null);
}
@Override
@@ -320,7 +314,6 @@ public class AmqpPortImpl extends Abstra
_container.getEventLogger().message(BrokerMessages.SHUTTING_DOWN(String.valueOf(transport), _transport.getAcceptingPort()));
}
-
_transport.close();
}
return Futures.immediateFuture(null);
@@ -610,10 +603,6 @@ public class AmqpPortImpl extends Abstra
_connectionCountWarningGiven.compareAndSet(true,false);
}
- if (_closing.get() && _connectionCount.get() == 0)
- {
- _noConnectionsRemain.set(null);
- }
return openConnections;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org