You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2015/08/05 17:54:30 UTC
svn commit: r1694253 - in /qpid/java/trunk:
bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/
broker-core/src/main/java/org/apache/qpid/server/binding/
broker-core/src/main/java/org/apache/qpid/server/configuration/updater/
broker-c...
Author: kwall
Date: Wed Aug 5 15:54:30 2015
New Revision: 1694253
URL: http://svn.apache.org/r1694253
Log:
QPID-6670: [Java Broker] Delete future for queue/exchange must chain binding deletion too
* Changed Queue/Exchange doDelete so that the future returned completes only when the binding
children have also been deleted. Uses the established doAfter/Async patterns already adopted elsewhere
* On the 0-8..0-91 paths, close the session model object explicitly (like the 0-10/1.0 paths) so that
the IO thread blocks until the session is closed (rather than the close running asynchronously).
Modified:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java?rev=1694253&r1=1694252&r2=1694253&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java Wed Aug 5 15:54:30 2015
@@ -164,6 +164,14 @@ public class BDBHAReplicaVirtualHostImpl
}
@Override
+ public ListenableFuture<Void> removeExchangeAsync(final ExchangeImpl<?> exchange, final boolean force)
+ throws ExchangeIsAlternateException, RequiredExchangeException
+ {
+ throwUnsupportedForReplica();
+ return null;
+ }
+
+ @Override
public MessageDestination getAttainedMessageDestination(final String name)
{
return null;
@@ -333,6 +341,13 @@ public class BDBHAReplicaVirtualHostImpl
}
@Override
+ public ListenableFuture<Integer> removeQueueAsync(final AMQQueue<?> queue)
+ {
+ throwUnsupportedForReplica();
+ return null;
+ }
+
+ @Override
public int removeQueue(final AMQQueue<?> queue)
{
throwUnsupportedForReplica();
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java?rev=1694253&r1=1694252&r2=1694253&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java Wed Aug 5 15:54:30 2015
@@ -24,7 +24,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -49,13 +48,12 @@ import org.apache.qpid.server.model.Stat
import org.apache.qpid.server.model.StateTransition;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.util.StateChangeListener;
public class BindingImpl
extends AbstractConfiguredObject<BindingImpl>
implements org.apache.qpid.server.model.Binding<BindingImpl>
{
- private String _bindingKey;
+ private final String _bindingKey;
private final AMQQueue _queue;
private final ExchangeImpl _exchange;
@ManagedAttributeField
@@ -64,8 +62,6 @@ public class BindingImpl
private BindingLogSubject _logSubject;
final AtomicBoolean _deleted = new AtomicBoolean();
- final CopyOnWriteArrayList<StateChangeListener<BindingImpl,State>> _stateChangeListeners =
- new CopyOnWriteArrayList<StateChangeListener<BindingImpl, State>>();
public BindingImpl(Map<String, Object> attributes, AMQQueue queue, ExchangeImpl exchange)
{
@@ -115,7 +111,7 @@ public class BindingImpl
{
if(!attributes.containsKey(DURABLE))
{
- attributes = new HashMap(attributes);
+ attributes = new HashMap<>(attributes);
attributes.put(DURABLE, queue.isDurable() && exchange.isDurable());
}
return attributes;
@@ -193,6 +189,7 @@ public class BindingImpl
return result;
}
+ @Override
public String toString()
{
return "Binding{bindingKey="+_bindingKey+", exchange="+_exchange+", queue="+_queue+", id= " + getId() + " }";
@@ -201,18 +198,27 @@ public class BindingImpl
@StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
private ListenableFuture<Void> doDelete()
{
- if(_deleted.compareAndSet(false,true))
+ if (_deleted.compareAndSet(false, true))
{
- for(StateChangeListener<BindingImpl,State> listener : _stateChangeListeners)
+ ListenableFuture<Void> removeBinding = _exchange.removeBindingAsync(this);
+ return doAfter(removeBinding, new Runnable()
{
- listener.stateChanged(this, State.ACTIVE, State.DELETED);
- }
+ @Override
+ public void run()
+ {
+ getEventLogger().message(_logSubject, BindingMessages.DELETED());
+ deleted();
+ setState(State.DELETED);
+ }
+ });
+ }
+ else
+ {
getEventLogger().message(_logSubject, BindingMessages.DELETED());
+ deleted();
+ setState(State.DELETED);
+ return Futures.immediateFuture(null);
}
-
- deleted();
- setState(State.DELETED);
- return Futures.immediateFuture(null);
}
@StateTransition(currentState = State.UNINITIALIZED, desiredState = State.ACTIVE)
@@ -222,16 +228,6 @@ public class BindingImpl
return Futures.immediateFuture(null);
}
- public void addStateChangeListener(StateChangeListener<BindingImpl,State> listener)
- {
- _stateChangeListeners.add(listener);
- }
-
- public void removeStateChangeListener(StateChangeListener<BindingImpl,State> listener)
- {
- _stateChangeListeners.remove(listener);
- }
-
private EventLogger getEventLogger()
{
return _exchange.getEventLogger();
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java?rev=1694253&r1=1694252&r2=1694253&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java Wed Aug 5 15:54:30 2015
@@ -79,7 +79,7 @@ public class TaskExecutorImpl implements
{
if (_running.compareAndSet(false, true))
{
- LOGGER.debug("Starting task executor");
+ LOGGER.debug("Starting task executor {}", _name);
_executor = Executors.newFixedThreadPool(1, new ThreadFactory()
{
@Override
@@ -101,7 +101,7 @@ public class TaskExecutorImpl implements
ExecutorService executor = _executor;
if (executor != null)
{
- LOGGER.debug("Stopping task executor immediately");
+ LOGGER.debug("Stopping task executor {} immediately", _name);
List<Runnable> cancelledTasks = executor.shutdownNow();
for (Runnable runnable : cancelledTasks)
{
@@ -126,7 +126,7 @@ public class TaskExecutorImpl implements
ExecutorService executor = _executor;
if (executor != null)
{
- LOGGER.debug("Stopping task executor");
+ LOGGER.debug("Stopping task executor {}", _name);
executor.shutdown();
_executor = null;
_taskThread = null;
@@ -138,19 +138,18 @@ public class TaskExecutorImpl implements
@Override
public <T> Future<T> submit(Task<T> task)
{
- checkState();
- LOGGER.debug("Submitting task: {}", task);
- Future<T> future = null;
+ checkState(task);
if (isTaskExecutorThread())
{
+ LOGGER.debug("Running task {} immediately", task);
T result = executeTask(task);
return new ImmediateFuture(result);
}
else
{
- future = _executor.submit(new CallableWrapper(task));
+ LOGGER.debug("Submitting task {} to executor {}", task, _name);
+ return _executor.submit(new CallableWrapper(task));
}
- return future;
}
@Override
@@ -303,11 +302,12 @@ public class TaskExecutorImpl implements
return Thread.currentThread() == _taskThread;
}
- private void checkState()
+ private <T> void checkState(Task<T> task)
{
if (!_running.get())
{
- throw new IllegalStateException("Task executor is not in ACTIVE state");
+ LOGGER.error("Task executor {} is not in ACTIVE state, unable to execute : {} ", _name, task);
+ throw new IllegalStateException("Task executor " + _name + " is not in ACTIVE state");
}
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1694253&r1=1694252&r2=1694253&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Wed Aug 5 15:54:30 2015
@@ -70,7 +70,6 @@ import org.apache.qpid.server.store.Mess
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
import org.apache.qpid.server.virtualhost.RequiredExchangeException;
import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
@@ -89,7 +88,7 @@ public abstract class AbstractExchange<T
private VirtualHostImpl _virtualHost;
- private final List<Action<ExchangeImpl>> _closeTaskList = new CopyOnWriteArrayList<Action<ExchangeImpl>>();
+ private final List<Action<ExchangeImpl>> _closeTaskList = new CopyOnWriteArrayList<>();
/**
* Whether the exchange is automatically deleted once all queues have detached from it
@@ -113,8 +112,6 @@ public abstract class AbstractExchange<T
private final ConcurrentMap<BindingIdentifier, BindingImpl> _bindingsMap = new ConcurrentHashMap<BindingIdentifier, BindingImpl>();
- private StateChangeListener<BindingImpl, State> _bindingListener;
-
public AbstractExchange(Map<String, Object> attributes, VirtualHostImpl vhost)
{
super(parentsMap(vhost), attributes);
@@ -127,18 +124,6 @@ public abstract class AbstractExchange<T
_virtualHost = vhost;
_logSubject = new ExchangeLogSubject(this, this.getVirtualHost());
-
- _bindingListener = new StateChangeListener<BindingImpl, State>()
- {
- @Override
- public void stateChanged(final BindingImpl binding, final State oldState, final State newState)
- {
- if(newState == State.DELETED)
- {
- removeBinding(binding);
- }
- }
- };
}
@Override
@@ -187,7 +172,7 @@ public abstract class AbstractExchange<T
}
@Override
- public void deleteWithChecks()
+ public ListenableFuture<Void> deleteWithChecks()
{
if(hasReferrers())
{
@@ -201,28 +186,44 @@ public abstract class AbstractExchange<T
if(_closed.compareAndSet(false,true))
{
- List<BindingImpl> bindings = new ArrayList<BindingImpl>(_bindings);
+ List<ListenableFuture<Void>> removeBindingFutures = new ArrayList<>();
+ ListenableFuture<Void> atLeastOne = Futures.immediateFuture(null);
+ removeBindingFutures.add(atLeastOne);
+
+ List<BindingImpl> bindings = new ArrayList<>(_bindings);
for(BindingImpl binding : bindings)
{
- binding.removeStateChangeListener(_bindingListener);
- removeBinding(binding);
+ ListenableFuture<Void> deleteFuture = binding.deleteAsync();
+ removeBindingFutures.add(deleteFuture);
}
- if(_alternateExchange != null)
+ ListenableFuture<List<Void>> combinedFuture = Futures.allAsList(removeBindingFutures);
+ return doAfter(combinedFuture, new Runnable()
{
- ((ExchangeImpl)_alternateExchange).removeReference(this);
- }
+ @Override
+ public void run()
+ {
+ if (_alternateExchange != null)
+ {
+ ((ExchangeImpl) _alternateExchange).removeReference(AbstractExchange.this);
+ }
- getEventLogger().message(_logSubject, ExchangeMessages.DELETED());
-
- for(Action<ExchangeImpl> task : _closeTaskList)
- {
- task.performAction(this);
- }
- _closeTaskList.clear();
+ getEventLogger().message(_logSubject, ExchangeMessages.DELETED());
+ for (Action<ExchangeImpl> task : _closeTaskList)
+ {
+ task.performAction(AbstractExchange.this);
+ }
+ _closeTaskList.clear();
+ deleted();
+ }
+ });
+ }
+ else
+ {
+ deleted();
+ return Futures.immediateFuture(null);
}
- deleted();
}
@Override
@@ -633,8 +634,8 @@ public abstract class AbstractExchange<T
}));
}
-
- private void removeBinding(final BindingImpl binding)
+ @Override
+ public ListenableFuture<Void> removeBindingAsync(final BindingImpl binding)
{
String bindingKey = binding.getBindingKey();
AMQQueue queue = binding.getAMQQueue();
@@ -657,39 +658,43 @@ public abstract class AbstractExchange<T
queue.removeBinding(b);
// TODO - RG - Fix bindings!
- if(getTaskExecutor().isTaskExecutorThread())
- {
- b.deleteAsync();
- }
- else
- {
- b.delete();
- }
+ return autoDeleteIfNecessaryAsync();
+ }
+ else
+ {
+ return Futures.immediateFuture(null);
+ }
- autoDeleteIfNeccessary();
+ }
+
+ private ListenableFuture<Void> autoDeleteIfNecessaryAsync()
+ {
+ if (isAutoDeletePending())
+ {
+ _logger.debug("Auto-deleting exchange: {}", this);
+ return deleteAsync();
}
+ return Futures.immediateFuture(null);
}
- private void autoDeleteIfNeccessary()
+ private void autoDeleteIfNecessary()
{
- if ((getLifetimePolicy() == LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS || getLifetimePolicy() == LifetimePolicy.DELETE_ON_NO_LINKS )
- && getBindingCount() == 0)
+ if (isAutoDeletePending())
{
_logger.debug("Auto-deleting exchange: {}", this);
- if(getTaskExecutor().isTaskExecutorThread())
- {
- deleteAsync();
- }
- else
- {
- delete();
- }
+ delete();
}
}
+ private boolean isAutoDeletePending()
+ {
+ return (getLifetimePolicy() == LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS || getLifetimePolicy() == LifetimePolicy.DELETE_ON_NO_LINKS )
+ && getBindingCount() == 0;
+ }
+
public BindingImpl getBinding(String bindingKey, AMQQueue queue)
{
assert queue != null;
@@ -782,8 +787,6 @@ public abstract class AbstractExchange<T
@Override
public void addBinding(final BindingImpl b)
{
- b.addStateChangeListener(_bindingListener);
-
BindingIdentifier identifier = new BindingIdentifier(b.getName(), b.getAMQQueue());
_bindingsMap.put(identifier, b);
@@ -818,9 +821,16 @@ public abstract class AbstractExchange<T
{
try
{
- _virtualHost.removeExchange(this,true);
- preSetAlternateExchange();
- setState(State.DELETED);
+ ListenableFuture<Void> removeExchangeFuture = _virtualHost.removeExchangeAsync(this, true);
+ return doAfter(removeExchangeFuture, new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ preSetAlternateExchange();
+ setState(State.DELETED);
+ }
+ });
}
catch (ExchangeIsAlternateException e)
{
@@ -917,7 +927,7 @@ public abstract class AbstractExchange<T
else
{
binding.delete();
- autoDeleteIfNeccessary();
+ autoDeleteIfNecessary();
return true;
}
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java?rev=1694253&r1=1694252&r2=1694253&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java Wed Aug 5 15:54:30 2015
@@ -22,6 +22,8 @@ package org.apache.qpid.server.exchange;
import java.util.Map;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.message.MessageDestination;
@@ -46,7 +48,7 @@ public interface ExchangeImpl<T extends
AMQQueue queue,
Map<String, Object> arguments);
- void deleteWithChecks();
+ ListenableFuture<Void> deleteWithChecks();
/**
* Determines whether a message would be isBound to a particular queue using a specific routing key and arguments
@@ -94,6 +96,8 @@ public interface ExchangeImpl<T extends
boolean hasReferrers();
+ ListenableFuture<Void> removeBindingAsync(BindingImpl binding);
+
BindingImpl getBinding(String bindingName, AMQQueue queue);
EventLogger getEventLogger();
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java?rev=1694253&r1=1694252&r2=1694253&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java Wed Aug 5 15:54:30 2015
@@ -255,7 +255,7 @@ public class TopicExchange extends Abstr
{
Map<String,Object> bindingArgs = _bindings.remove(binding);
- _logger.debug("deregisterQueue {}", bindingArgs);
+ _logger.debug("deregisterQueue args: {}", bindingArgs);
String bindingKey = TopicNormalizer.normalize(binding.getBindingKey());
TopicExchangeResult result = _topicExchangeResults.get(bindingKey);
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java?rev=1694253&r1=1694252&r2=1694253&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java Wed Aug 5 15:54:30 2015
@@ -23,6 +23,8 @@ package org.apache.qpid.server.model;
import java.util.Collection;
import java.util.Map;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.message.MessageDestination;
@ManagedObject( description = Exchange.CLASS_DESCRIPTION )
@@ -69,5 +71,5 @@ public interface Exchange<X extends Exch
- void deleteWithChecks();
+ ListenableFuture<Void> deleteWithChecks();
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1694253&r1=1694252&r2=1694253&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java Wed Aug 5 15:54:30 2015
@@ -24,6 +24,8 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.message.MessageInfo;
import org.apache.qpid.server.queue.QueueEntryVisitor;
import org.apache.qpid.server.store.MessageDurability;
@@ -181,7 +183,7 @@ public interface Queue<X extends Queue<X
void visit(QueueEntryVisitor visitor);
- int deleteAndReturnCount();
+ ListenableFuture<Integer> deleteAndReturnCount();
void setNotificationListener(QueueNotificationListener listener);
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java?rev=1694253&r1=1694252&r2=1694253&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java Wed Aug 5 15:54:30 2015
@@ -46,6 +46,7 @@ public final class SessionAdapter extend
{
// Attributes
private final AMQSessionModel _session;
+ private final Action _deleteModelTask;
public SessionAdapter(final AbstractAMQPConnection<?> connectionAdapter,
final AMQSessionModel session)
@@ -68,7 +69,7 @@ public final class SessionAdapter extend
}
});
session.setModelObject(this);
- session.addDeleteTask(new Action()
+ _deleteModelTask = new Action()
{
@Override
public void performAction(final Object object)
@@ -76,7 +77,8 @@ public final class SessionAdapter extend
session.removeDeleteTask(this);
deleteAsync();
}
- });
+ };
+ session.addDeleteTask(_deleteModelTask);
setState(State.ACTIVE);
}
@@ -177,6 +179,7 @@ public final class SessionAdapter extend
{
deleted();
setState(State.DELETED);
+ _session.removeDeleteTask(_deleteModelTask);
return Futures.immediateFuture(null);
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1694253&r1=1694252&r2=1694253&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Wed Aug 5 15:54:30 2015
@@ -38,7 +38,11 @@ import java.util.concurrent.ConcurrentLi
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -52,6 +56,7 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.server.message.MessageInfoFacade;
import org.apache.qpid.server.model.CustomRestHeaders;
import org.apache.qpid.server.model.RestContentHeader;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1866,88 +1871,131 @@ public abstract class AbstractQueue<X ex
_deleteTaskList.remove(task);
}
- // TODO list all thrown exceptions
- public int deleteAndReturnCount()
+ public ListenableFuture<Integer> deleteAndReturnCount()
{
// Check access
_virtualHost.getSecurityManager().authoriseDelete(this);
+ final ListenableFuture<Integer> returnCountFuture = Futures.immediateFuture(getQueueDepthMessages());
if (!_deleted.getAndSet(true))
{
+ final List<ListenableFuture<Void>> removeBindingFutures = new ArrayList<>();
+ final ListenableFuture<Void> atLeastOne = Futures.immediateFuture(null);
+ removeBindingFutures.add(atLeastOne);
+ final ArrayList<BindingImpl> bindingCopy = new ArrayList<>(_bindings);
- final ArrayList<BindingImpl> bindingCopy = new ArrayList<BindingImpl>(_bindings);
-
+ // TODO - RG - Need to sort out bindings!
for (BindingImpl b : bindingCopy)
{
- // TODO - RG - Need to sort out bindings!
- if(getTaskExecutor().isTaskExecutorThread())
- {
- b.deleteAsync();
- }
- else
- {
- b.delete();
- }
+ final ListenableFuture<Void> removeFuture = b.deleteAsync();
+ removeBindingFutures.add(removeFuture);
}
- QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
-
- while (consumerNodeIterator.advance())
+ ListenableFuture<List<Void>> combinedFuture = Futures.allAsList(removeBindingFutures);
+ final ListenableFuture<Void> result = doAfter(combinedFuture, new Runnable()
{
- QueueConsumer s = consumerNodeIterator.getNode().getConsumer();
- if (s != null)
+ @Override
+ public void run()
{
- s.queueDeleted();
- }
- }
+ QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
+ while (consumerNodeIterator.advance())
+ {
+ QueueConsumer s = consumerNodeIterator.getNode().getConsumer();
+ if (s != null)
+ {
+ s.queueDeleted();
+ }
+ }
- List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
- {
+ List<QueueEntry> entries = getMessagesOnTheQueue(new AcquireAllQueueEntryFilter());
- public boolean accept(QueueEntry entry)
+ routeToAlternate(entries);
+
+ preSetAlternateExchange();
+
+ performQueueDeleteTasks();
+ deleted();
+
+ //Log Queue Deletion
+ getEventLogger().message(_logSubject, QueueMessages.DELETED());
+ }
+ });
+
+ return new ListenableFuture<Integer>()
+ {
+ @Override
+ public void addListener(final Runnable listener, final Executor executor)
{
- return entry.acquire();
+ result.addListener(listener, executor);
}
- public boolean filterComplete()
+ @Override
+ public boolean cancel(final boolean mayInterruptIfRunning)
{
- return false;
+ return result.cancel(mayInterruptIfRunning);
}
- });
- ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore());
+ @Override
+ public boolean isCancelled()
+ {
+ return result.isCancelled();
+ }
+ @Override
+ public boolean isDone()
+ {
+ return result.isDone();
+ }
- for(final QueueEntry entry : entries)
- {
- // TODO log requeues with a post enqueue action
- int requeues = entry.routeToAlternate(null, txn);
+ @Override
+ public Integer get() throws InterruptedException, ExecutionException
+ {
+ result.get();
+ return returnCountFuture.get();
+ }
- if(requeues == 0)
+ @Override
+ public Integer get(final long timeout, final TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException
{
- // TODO log discard
+ result.get(timeout, unit);
+ return returnCountFuture.get();
}
- }
+ };
+ }
+ else
+ {
+ return returnCountFuture;
+ }
+ }
- txn.commit();
+ protected void routeToAlternate(List<QueueEntry> entries)
+ {
+ ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore());
- preSetAlternateExchange();
+ for(final QueueEntry entry : entries)
+ {
+ // TODO log requeues with a post enqueue action
+ int requeues = entry.routeToAlternate(null, txn);
- for (Action<? super AMQQueue> task : _deleteTaskList)
+ if(requeues == 0)
{
- task.performAction(this);
+ // TODO log discard
}
+ }
- _deleteTaskList.clear();
- closeAsync();
- deleted();
- //Log Queue Deletion
- getEventLogger().message(_logSubject, QueueMessages.DELETED());
+ txn.commit();
+ }
+ protected void performQueueDeleteTasks()
+ {
+ for (Action<? super AMQQueue> task : _deleteTaskList)
+ {
+ task.performAction(this);
}
- return getQueueDepthMessages();
+ _deleteTaskList.clear();
}
@Override
@@ -2551,6 +2599,19 @@ public abstract class AbstractQueue<X ex
}
+ private static class AcquireAllQueueEntryFilter implements QueueEntryFilter
+ {
+ public boolean accept(QueueEntry entry)
+ {
+ return entry.acquire();
+ }
+
+ public boolean filterComplete()
+ {
+ return false;
+ }
+ }
+
private final class QueueEntryListener implements StateChangeListener<MessageInstance, QueueEntry.State>
{
@@ -2994,10 +3055,17 @@ public abstract class AbstractQueue<X ex
@StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
private ListenableFuture<Void> doDelete()
{
- _virtualHost.removeQueue(this);
- preSetAlternateExchange();
- setState(State.DELETED);
- return Futures.immediateFuture(null);
+ ListenableFuture<Integer> removeFuture = _virtualHost.removeQueueAsync(this);
+ return doAfter(removeFuture, new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ preSetAlternateExchange();
+ setState(State.DELETED);
+ }
+ });
+
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1694253&r1=1694252&r2=1694253&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Wed Aug 5 15:54:30 2015
@@ -706,10 +706,14 @@ public abstract class AbstractVirtualHos
@Override
public int removeQueue(AMQQueue<?> queue)
{
- int purged = queue.deleteAndReturnCount();
+ return doSync(removeQueueAsync(queue));
+ }
- return purged;
-}
+ @Override
+ public ListenableFuture<Integer> removeQueueAsync(final AMQQueue<?> queue)
+ {
+ return queue.deleteAndReturnCount();
+ }
public AMQQueue<?> createQueue(Map<String, Object> attributes) throws QueueExistsException
{
@@ -856,12 +860,18 @@ public abstract class AbstractVirtualHos
}
+ @Override
+ public void removeExchange(final ExchangeImpl<?> exchange, final boolean force)
+ throws ExchangeIsAlternateException, RequiredExchangeException
+ {
+ doSync(removeExchangeAsync(exchange, force));
+ }
@Override
- public void removeExchange(ExchangeImpl exchange, boolean force)
+ public ListenableFuture<Void> removeExchangeAsync(ExchangeImpl exchange, boolean force)
throws ExchangeIsAlternateException, RequiredExchangeException
{
- exchange.deleteWithChecks();
+ return exchange.deleteWithChecks();
}
@Override
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java?rev=1694253&r1=1694252&r2=1694253&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java Wed Aug 5 15:54:30 2015
@@ -25,6 +25,8 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ScheduledFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.EventLoggerProvider;
@@ -60,6 +62,8 @@ public interface VirtualHostImpl< X exte
int removeQueue(Q queue);
+ ListenableFuture<Integer> removeQueueAsync(Q queue);
+
Q createQueue(Map<String, Object> arguments) throws QueueExistsException;
E createExchange(Map<String,Object> attributes)
@@ -67,6 +71,9 @@ public interface VirtualHostImpl< X exte
NoFactoryForTypeException;
void removeExchange(E exchange, boolean force) throws ExchangeIsAlternateException,
+ RequiredExchangeException;
+
+ ListenableFuture<Void> removeExchangeAsync(E exchange, boolean force) throws ExchangeIsAlternateException,
RequiredExchangeException;
E getAttainedExchange(String name);
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java?rev=1694253&r1=1694252&r2=1694253&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java Wed Aug 5 15:54:30 2015
@@ -165,6 +165,14 @@ class RedirectingVirtualHostImpl
}
@Override
+ public ListenableFuture<Void> removeExchangeAsync(final ExchangeImpl<?> exchange, final boolean force)
+ throws ExchangeIsAlternateException, RequiredExchangeException
+ {
+ throwUnsupportedForRedirector();
+ return null;
+ }
+
+ @Override
public MessageDestination getAttainedMessageDestination(final String name)
{
return null;
@@ -335,6 +343,13 @@ class RedirectingVirtualHostImpl
}
@Override
+ public ListenableFuture<Integer> removeQueueAsync(final AMQQueue<?> queue)
+ {
+ throwUnsupportedForRedirector();
+ return null;
+ }
+
+ @Override
public int removeQueue(final AMQQueue<?> queue)
{
throwUnsupportedForRedirector();
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1694253&r1=1694252&r2=1694253&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Wed Aug 5 15:54:30 2015
@@ -889,6 +889,10 @@ public class AMQChannel
{
unsubscribeAllConsumers();
setDefaultQueue(null);
+ if(_modelObject != null)
+ {
+ _modelObject.delete();
+ }
for (Action<? super AMQChannel> task : _taskList)
{
task.performAction(this);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org