You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/15 14:16:10 UTC
svn commit: r1769837 [1/4] - in /qpid/java/trunk: ./
bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/
broker-core/src/main/java/org/apache/qpid/server/consumer/
broker-core/src/main/java/org/apache/qpid/server/flow/ broker-core...
Author: rgodfrey
Date: Tue Nov 15 14:16:10 2016
New Revision: 1769837
URL: http://svn.apache.org/viewvc?rev=1769837&view=rev
Log:
QPID-7514 : remove QueueRunner and move processing into the IO thread
Added:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java
- copied unchanged from r1769836, qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
- copied unchanged from r1769836, qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java
- copied unchanged from r1769836, qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeIterator.java
- copied unchanged from r1769836, qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeIterator.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeList.java
- copied unchanged from r1769836, qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeList.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeListEntry.java
- copied unchanged from r1769836, qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeListEntry.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
- copied unchanged from r1769836, qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/InfiniteCreditCreditManager.java
- copied unchanged from r1769836, qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/InfiniteCreditCreditManager.java
Removed:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSourceConsumer.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ConsumerNode.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ConsumerNodeIterator.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageOnlyCreditManager.java
Modified:
qpid/java/trunk/ (props changed)
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/AbstractKeyStore.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/FlowCreditManager_0_10.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManagerTest.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
qpid/java/trunk/broker-plugins/logging-logback/src/main/java/org/apache/qpid/server/logging/logback/BrokerFileLoggerImpl.java
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
qpid/java/trunk/test-profiles/apache-ci.test.overridden.properties
Propchange: qpid/java/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Nov 15 14:16:10 2016
@@ -9,4 +9,5 @@
/qpid/branches/java-broker-vhost-refactor/java:1493674-1494547
/qpid/branches/java-network-refactor/qpid/java:805429-821809
/qpid/branches/qpid-2935/qpid/java:1061302-1072333
+/qpid/java/branches/remove-queue-runner:1767741-1769836
/qpid/trunk/qpid:796646-796653
Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java Tue Nov 15 14:16:10 2016
@@ -510,16 +510,18 @@ public class BDBHAVirtualHostNodeImpl ex
}
@Override
- protected void onClose()
+ protected ListenableFuture<Void> onClose()
{
- try
- {
- super.onClose();
- }
- finally
- {
- closeEnvironment();
- }
+ return doAfterAlways(super.onClose(),
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ closeEnvironment();
+ }
+ });
+
}
@Override
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Tue Nov 15 14:16:10 2016
@@ -23,15 +23,11 @@ package org.apache.qpid.server.consumer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,301 +35,224 @@ import org.apache.qpid.server.logging.Lo
import org.apache.qpid.server.logging.messages.SubscriptionMessages;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.model.Consumer;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.queue.AbstractQueue;
import org.apache.qpid.server.queue.SuspendedConsumerLoggingTicker;
import org.apache.qpid.server.transport.AMQPConnection;
-import org.apache.qpid.server.util.StateChangeListener;
-public abstract class AbstractConsumerTarget implements ConsumerTarget, LogSubject
+public abstract class AbstractConsumerTarget implements ConsumerTarget
{
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractConsumerTarget.class);
- protected static final String PULL_ONLY_CONSUMER = "x-pull-only";
- private final AtomicReference<State> _state;
-
- private final Set<StateChangeListener<ConsumerTarget, State>> _stateChangeListeners = new
- CopyOnWriteArraySet<>();
+ private static final LogSubject MULTI_QUEUE_LOG_SUBJECT = new LogSubject()
+ {
+ @Override
+ public String toLogString()
+ {
+ return "[(** Multi-Queue **)] ";
+ }
+ };
+ private final AtomicReference<State> _state = new AtomicReference<>(State.OPEN);
- private final Lock _stateChangeLock = new ReentrantLock();
- private final AtomicInteger _stateActivates = new AtomicInteger();
private final boolean _isMultiQueue;
private final SuspendedConsumerLoggingTicker _suspendedConsumerLoggingTicker;
- private ConcurrentLinkedQueue<ConsumerMessageInstancePair> _queue = new ConcurrentLinkedQueue();
private final List<ConsumerImpl> _consumers = new CopyOnWriteArrayList<>();
- private final boolean _isPullOnly;
private Iterator<ConsumerImpl> _pullIterator;
+ private boolean _notifyWorkDesired;
-
- protected AbstractConsumerTarget(final State initialState,
- final boolean isPullOnly,
- final boolean isMultiQueue,
+ protected AbstractConsumerTarget(final boolean isMultiQueue,
final AMQPConnection<?> amqpConnection)
{
- _state = new AtomicReference<State>(initialState);
- _isPullOnly = isPullOnly;
_isMultiQueue = isMultiQueue;
- _suspendedConsumerLoggingTicker = isMultiQueue
- ? new SuspendedConsumerLoggingTicker(amqpConnection.getContextValue(Long.class, Consumer.SUSPEND_NOTIFICATION_PERIOD))
- {
- @Override
- protected void log(final long period)
- {
- amqpConnection.getEventLogger().message(AbstractConsumerTarget.this, SubscriptionMessages.STATE(period));
- }
- }
- : null;
+ _suspendedConsumerLoggingTicker = new SuspendedConsumerLoggingTicker(amqpConnection.getContextValue(Long.class, Consumer.SUSPEND_NOTIFICATION_PERIOD))
+ {
+ @Override
+ protected void log(final long period)
+ {
+ amqpConnection.getEventLogger().message(AbstractConsumerTarget.this.getLogSubject(), SubscriptionMessages.STATE(period));
+ }
+ };
}
- public boolean isMultiQueue()
- {
- return _isMultiQueue;
- }
-
- @Override
- public boolean processPending()
+ private LogSubject getLogSubject()
{
- if (!getSessionModel().getAMQPConnection().isIOThread())
- {
- return false;
- }
- if(sendNextMessage())
+ if (_consumers.size() == 1 && _consumers.get(0) instanceof LogSubject)
{
- return true;
+ return (LogSubject) _consumers.get(0);
}
else
{
- processStateChanged();
- processClosed();
- return false;
+ return MULTI_QUEUE_LOG_SUBJECT;
}
}
- @Override
- public boolean hasPendingWork()
+ public boolean isMultiQueue()
{
- return hasMessagesToSend() || hasStateChanged() || hasClosed();
+ return _isMultiQueue;
}
- protected abstract boolean hasStateChanged();
-
- protected abstract boolean hasClosed();
-
- protected abstract void processStateChanged();
-
- protected abstract void processClosed();
-
@Override
- public void consumerAdded(final ConsumerImpl sub)
+ public void notifyWork()
{
- _consumers.add(sub);
+ getSessionModel().notifyWork(this);
}
- @Override
- public void consumerRemoved(final ConsumerImpl sub)
+ protected final void setNotifyWorkDesired(final boolean desired)
{
- _consumers.remove(sub);
- if(_consumers.isEmpty())
+ if (desired != _notifyWorkDesired)
{
- close();
+ if(_suspendedConsumerLoggingTicker != null)
+ {
+ if (desired)
+ {
+ getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
+ }
+ else
+ {
+ _suspendedConsumerLoggingTicker.setStartTime(System.currentTimeMillis());
+ getSessionModel().addTicker(_suspendedConsumerLoggingTicker);
+ }
+ }
+
+ for (ConsumerImpl consumer : _consumers)
+ {
+ consumer.setNotifyWorkDesired(desired);
+ }
+
+ _notifyWorkDesired = desired;
}
}
- public List<ConsumerImpl> getConsumers()
+ public final boolean isNotifyWorkDesired()
{
- return _consumers;
+ return _notifyWorkDesired;
}
-
@Override
- public final boolean isSuspended()
+ public boolean processPending()
{
- return getSessionModel().getAMQPConnection().isMessageAssignmentSuspended() || isFlowSuspended();
- }
+ if (!getSessionModel().getAMQPConnection().isIOThread())
+ {
+ return false;
+ }
- @Override
- public boolean hasCredit()
- {
- return !isFlowSuspended();
+ // TODO - if not closed
+ return sendNextMessage();
}
- protected abstract boolean isFlowSuspended();
-
- public final State getState()
+ @Override
+ public void consumerAdded(final ConsumerImpl sub)
{
- return _state.get();
+ _consumers.add(sub);
}
- protected final boolean updateState(State from, State to)
+ @Override
+ public ListenableFuture<Void> consumerRemoved(final ConsumerImpl sub)
{
- if(_state.compareAndSet(from, to))
+ if(_consumers.contains(sub))
{
- if (to == State.ACTIVE && _stateChangeListeners.size() > 1)
- {
- int offset = _stateActivates.incrementAndGet();
- if (offset >= _stateChangeListeners.size())
- {
- _stateActivates.set(0);
- offset = 0;
- }
-
- List<StateChangeListener<ConsumerTarget, State>> holdovers = new ArrayList<>();
- int pos = 0;
- for (StateChangeListener<ConsumerTarget, State> listener : _stateChangeListeners)
- {
- if (pos++ < offset)
+ return doOnIoThreadAsync(
+ new Runnable()
{
- holdovers.add(listener);
- }
- else
- {
- listener.stateChanged(this, from, to);
- }
- }
- for (StateChangeListener<ConsumerTarget, State> listener : holdovers)
- {
- listener.stateChanged(this, from, to);
- }
-
- }
- else
- {
- if(!_stateChangeListeners.isEmpty())
- {
- for (StateChangeListener<ConsumerTarget, State> listener : _stateChangeListeners)
- {
- listener.stateChanged(this, from, to);
- }
- }
- }
- if(_suspendedConsumerLoggingTicker != null)
- {
- if (to == State.SUSPENDED)
- {
- _suspendedConsumerLoggingTicker.setStartTime(System.currentTimeMillis());
- getSessionModel().addTicker(_suspendedConsumerLoggingTicker);
- }
- else
- {
- getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
- }
- }
- return true;
+ @Override
+ public void run()
+ {
+ consumerRemovedInternal(sub);
+ }
+ });
}
else
{
- return false;
+ return Futures.immediateFuture(null);
}
}
- @Override
- public final void notifyCurrentState()
- {
-
- for (StateChangeListener<ConsumerTarget, State> listener : _stateChangeListeners)
- {
- State state = getState();
- listener.stateChanged(this, state, state);
- }
- }
- public final void addStateListener(StateChangeListener<ConsumerTarget, State> listener)
+ private ListenableFuture<Void> doOnIoThreadAsync(final Runnable task)
{
- _stateChangeListeners.add(listener);
+ AMQSessionModel<?> sessionModel = getSessionModel();
+ return sessionModel.getAMQPConnection().doOnIOThreadAsync(task);
}
- @Override
- public void removeStateChangeListener(final StateChangeListener<ConsumerTarget, State> listener)
+ private void consumerRemovedInternal(final ConsumerImpl sub)
{
- _stateChangeListeners.remove(listener);
+ _consumers.remove(sub);
+ if(_consumers.isEmpty())
+ {
+ close();
+ }
}
- public final boolean trySendLock()
+ public List<ConsumerImpl> getConsumers()
{
- return _stateChangeLock.tryLock();
+ return _consumers;
}
- public final void getSendLock()
- {
- _stateChangeLock.lock();
- }
- public final void releaseSendLock()
+ @Override
+ public final boolean isSuspended()
{
- _stateChangeLock.unlock();
+ return !isNotifyWorkDesired();
}
- @Override
- public boolean isPullOnly()
+ public final State getState()
{
- return _isPullOnly;
+ return _state.get();
}
@Override
public final long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
{
- AMQPConnection<?> amqpConnection = getSessionModel().getAMQPConnection();
- amqpConnection.reserveOutboundMessageSpace(entry.getMessage().getSize());
- _queue.add(new ConsumerMessageInstancePair(consumer, entry, batch));
- amqpConnection.notifyWork();
+ doSend(consumer, entry, batch);
+
+ if (consumer.acquires())
+ {
+ entry.makeAcquisitionStealable();
+ }
return entry.getMessage().getSize();
}
protected abstract void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch);
- @Override
- public boolean hasMessagesToSend()
- {
- return !_queue.isEmpty() || (isPullOnly() && messagesAvailable());
- }
- private boolean messagesAvailable()
+ @Override
+ public boolean sendNextMessage()
{
- if(hasCredit())
+ AbstractQueue.MessageContainer messageContainer = null;
+ ConsumerImpl consumer = null;
+ boolean iteratedCompleteList = false;
+ while (messageContainer == null)
{
- for (ConsumerImpl consumer : _consumers)
+ if (_pullIterator == null || !_pullIterator.hasNext())
{
- if (consumer.hasAvailableMessages())
+ if (iteratedCompleteList)
{
- return true;
+ break;
}
- }
- }
- return false;
- }
+ iteratedCompleteList = true;
- @Override
- public boolean sendNextMessage()
- {
- if(isPullOnly())
- {
- if(_pullIterator == null || !_pullIterator.hasNext())
- {
_pullIterator = getConsumers().iterator();
}
- if(_pullIterator.hasNext())
+ if (_pullIterator.hasNext())
{
- ConsumerImpl consumer = _pullIterator.next();
- consumer.pullMessage();
+ consumer = _pullIterator.next();
+ messageContainer = consumer.pullMessage();
}
}
- ConsumerMessageInstancePair consumerMessage = _queue.poll();
- if (consumerMessage != null)
+
+ if (messageContainer != null)
{
+ MessageInstance entry = messageContainer._messageInstance;
try
{
-
- ConsumerImpl consumer = consumerMessage.getConsumer();
- MessageInstance entry = consumerMessage.getEntry();
- boolean batch = consumerMessage.isBatch();
- doSend(consumer, entry, batch);
-
- if (consumer.acquires())
- {
- entry.makeAcquisitionStealable();
- }
+ send(consumer, entry, false);
}
finally
{
- consumerMessage.release();
+ if (messageContainer._messageReference != null)
+ {
+ messageContainer._messageReference.release();
+ }
}
return true;
}
@@ -347,54 +266,27 @@ public abstract class AbstractConsumerTa
final public boolean close()
{
- boolean closed = false;
- State state = getState();
-
- getSendLock();
- try
+ if (_state.compareAndSet(State.OPEN, State.CLOSED))
{
- while(!closed && state != State.CLOSED)
+ List<ConsumerImpl> consumers = new ArrayList<>(_consumers);
+ _consumers.clear();
+
+ setNotifyWorkDesired(false);
+
+ for (ConsumerImpl consumer : consumers)
{
- closed = updateState(state, State.CLOSED);
- if(!closed)
- {
- state = getState();
- }
+ consumer.close();
}
- ConsumerMessageInstancePair instance;
- while((instance = _queue.poll()) != null)
+ if (_suspendedConsumerLoggingTicker != null)
{
- MessageInstance entry = instance.getEntry();
- entry.release(instance.getConsumer());
- instance.release();
+ getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
}
- doCloseInternal();
- }
- finally
- {
- releaseSendLock();
- }
- for (ConsumerImpl consumer : _consumers)
- {
- consumer.close();
+ return true;
}
- if(_suspendedConsumerLoggingTicker != null)
+ else
{
- getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
+ return false;
}
-
- return closed;
-
- }
-
- @Override
- public String toLogString()
- {
-
- return "[(** Multi-Queue **)] ";
}
-
-
- protected abstract void doCloseInternal();
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java Tue Nov 15 14:16:10 2016
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.Atomi
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.queue.AbstractQueue;
public interface ConsumerImpl
{
@@ -33,9 +34,9 @@ public interface ConsumerImpl
ConsumerTarget getTarget();
- boolean hasAvailableMessages();
+ AbstractQueue.MessageContainer pullMessage();
- void pullMessage();
+ void setNotifyWorkDesired(boolean desired);
enum Option
{
@@ -71,16 +72,7 @@ public interface ConsumerImpl
void close();
- boolean trySendLock();
-
-
- void getSendLock();
-
- void releaseSendLock();
-
boolean isActive();
String getName();
-
- void flush();
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java Tue Nov 15 14:16:10 2016
@@ -20,43 +20,38 @@
*/
package org.apache.qpid.server.consumer;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.util.StateChangeListener;
public interface ConsumerTarget
{
-
-
void acquisitionRemoved(MessageInstance node);
- void removeStateChangeListener(StateChangeListener<ConsumerTarget, State> listener);
-
boolean processPending();
- boolean hasPendingWork();
-
String getTargetAddress();
- boolean hasCredit();
-
boolean isMultiQueue();
+ void notifyWork();
+
+ void updateNotifyWorkDesired();
+
+ boolean isNotifyWorkDesired();
+
enum State
{
- ACTIVE, SUSPENDED, CLOSED
+ OPEN, CLOSED
}
State getState();
void consumerAdded(ConsumerImpl sub);
- void consumerRemoved(ConsumerImpl sub);
-
- void notifyCurrentState();
-
- void addStateListener(StateChangeListener<ConsumerTarget, State> listener);
+ ListenableFuture<Void> consumerRemoved(ConsumerImpl sub);
long getUnacknowledgedBytes();
@@ -66,14 +61,10 @@ public interface ConsumerTarget
long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch);
- boolean hasMessagesToSend();
-
boolean sendNextMessage();
void flushBatched();
- void queueDeleted();
-
void queueEmpty();
boolean allocateCredit(ServerMessage msg);
@@ -83,13 +74,4 @@ public interface ConsumerTarget
boolean isSuspended();
boolean close();
-
- boolean trySendLock();
-
- void getSendLock();
-
- void releaseSendLock();
-
- boolean isPullOnly();
-
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java Tue Nov 15 14:16:10 2016
@@ -24,19 +24,10 @@ package org.apache.qpid.server.flow;
public interface FlowCreditManager
{
- public static interface FlowCreditManagerListener
- {
- void creditStateChanged(boolean hasCredit);
- }
+ void restoreCredit(long messageCredit, long bytesCredit);
- void addStateListener(FlowCreditManagerListener listener);
+ boolean hasCredit();
- boolean removeListener(FlowCreditManagerListener listener);
-
- public void restoreCredit(long messageCredit, long bytesCredit);
-
- public boolean hasCredit();
-
- public boolean useCreditForMessage(long msgSize);
+ boolean useCreditForMessage(long msgSize);
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java Tue Nov 15 14:16:10 2016
@@ -248,8 +248,6 @@ public interface MessageInstance
void release(ConsumerImpl release);
- boolean resend();
-
void delete();
boolean isDeleted();
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java Tue Nov 15 14:16:10 2016
@@ -37,7 +37,7 @@ public interface MessageSource extends T
EnumSet<ConsumerImpl.Option> options,
Integer priority)
throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive,
- ConsumerAccessRefused;
+ ConsumerAccessRefused, QueueDeleted;
Collection<? extends ConsumerImpl> getConsumers();
@@ -56,6 +56,19 @@ public interface MessageSource extends T
{
}
}
+
+ /**
+ * QueueDeleted signals a failure to create a consumer, because the queue has been deleted.
+ * <p>
+ * TODO Move to top level, used outside this class.
+ */
+ final class QueueDeleted extends Exception
+ {
+
+ public QueueDeleted()
+ {
+ }
+ }
/**
* ExistingConsumerPreventsExclusive signals a failure to create an exclusive consumer, as a consumer
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java Tue Nov 15 14:16:10 2016
@@ -774,16 +774,25 @@ public abstract class AbstractConfigured
{
return closeChildren();
}
+ }).then(new Callable<ListenableFuture<Void>>()
+ {
+ @Override
+ public ListenableFuture<Void> call() throws Exception
+ {
+ return onClose();
+ }
}).then(new Runnable()
- {
- @Override
- public void run()
- {
- onClose();
- unregister(false);
- LOGGER.debug("Closed " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName());
- }
- });
+ {
+ @Override
+ public void run()
+ {
+ unregister(false);
+ LOGGER.debug("Closed "
+ + AbstractConfiguredObject.this.getClass().getSimpleName()
+ + " : "
+ + getName());
+ }
+ });
}
else
{
@@ -819,8 +828,9 @@ public abstract class AbstractConfigured
return Futures.immediateFuture(null);
}
- protected void onClose()
+ protected ListenableFuture<Void> onClose()
{
+ return Futures.immediateFuture(null);
}
public final void create()
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java Tue Nov 15 14:16:10 2016
@@ -168,7 +168,7 @@ public abstract class AbstractSystemConf
}
@Override
- protected void onClose()
+ protected ListenableFuture<Void> onClose()
{
final TaskExecutor taskExecutor = getTaskExecutor();
try
@@ -192,7 +192,7 @@ public abstract class AbstractSystemConf
taskExecutor.stopImmediately();
}
}
-
+ return Futures.immediateFuture(null);
}
@Override
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java Tue Nov 15 14:16:10 2016
@@ -671,7 +671,7 @@ public class BrokerImpl extends Abstract
}
@Override
- protected void onClose()
+ protected ListenableFuture<Void> onClose()
{
if (_reportingTimer != null)
{
@@ -700,6 +700,8 @@ public class BrokerImpl extends Abstract
task.run();
}
}
+ return Futures.immediateFuture(null);
+
}
@Override
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java Tue Nov 15 14:16:10 2016
@@ -90,9 +90,10 @@ public interface Queue<X extends Queue<X
@ManagedContextDefault( name = QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD)
long DEFAULT_ESTIMATED_MESSAGE_MEMORY_OVERHEAD = 1024l;
- String MAX_ASYNCHRONOUS_DELIVERIES = "queue.maxAsynchronousDeliveries";
- @ManagedContextDefault(name = MAX_ASYNCHRONOUS_DELIVERIES )
- int DEFAULT_MAX_ASYNCHRONOUS_DELIVERIES = 80;
+ String QUEUE_SCAVANGE_COUNT = "qpid.queue.scavenge_count";
+ @ManagedContextDefault( name = QUEUE_SCAVANGE_COUNT)
+ int DEFAULT_QUEUE_SCAVANGE_COUNT = 50;
+
String MIME_TYPE_TO_FILE_EXTENSION = "qpid.mimeTypeToFileExtension";
@SuppressWarnings("unused")
@@ -366,8 +367,6 @@ public interface Queue<X extends Queue<X
void requeue(QueueEntry entry);
- boolean resend(QueueEntry entry, QueueConsumer<?> consumer);
-
List<? extends QueueEntry> getMessagesOnTheQueue();
List<Long> getMessagesOnTheQueue(int num);
@@ -394,8 +393,6 @@ public interface Queue<X extends Queue<X
Set<NotificationCheck> getNotificationChecks();
- void deliverAsync();
-
Collection<String> getAvailableAttributes();
void completeRecovery();
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java Tue Nov 15 14:16:10 2016
@@ -60,9 +60,9 @@ public final class SessionAdapter extend
// Attributes
private final AMQSessionModel _session;
private final Action _deleteModelTask;
- private final AbstractAMQPConnection<?> _amqpConnection;
+ private final AbstractAMQPConnection<?,?> _amqpConnection;
- public SessionAdapter(final AbstractAMQPConnection<?> amqpConnection,
+ public SessionAdapter(final AbstractAMQPConnection<?,?> amqpConnection,
final AMQSessionModel session)
{
super(parentsMap(amqpConnection), createAttributes(session));
@@ -206,7 +206,7 @@ public final class SessionAdapter extend
return Futures.immediateFuture(null);
}
- private void registerTransactionTimeoutTickers(AbstractAMQPConnection<?> amqpConnection,
+ private void registerTransactionTimeoutTickers(AbstractAMQPConnection<?,?> amqpConnection,
final AMQSessionModel session)
{
NamedAddressSpace addressSpace = amqpConnection.getAddressSpace();
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java Tue Nov 15 14:16:10 2016
@@ -57,7 +57,6 @@ public interface AmqpPort<X extends Amqp
String PORT_AMQP_NUMBER_OF_SELECTORS = "qpid.port.amqp.threadPool.numberOfSelectors";
String PORT_AMQP_ACCEPT_BACKLOG = "qpid.port.amqp.acceptBacklog";
- String PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE = "qpid.port.amqp.outboundMessageBufferSize";
@ManagedContextDefault(name = DEFAULT_AMQP_PROTOCOLS)
String INSTALLED_PROTOCOLS = AmqpPortImpl.getInstalledProtocolsAsString();
@@ -88,11 +87,6 @@ public interface AmqpPort<X extends Amqp
@ManagedContextDefault(name = OPEN_CONNECTIONS_WARN_PERCENT)
int DEFAULT_OPEN_CONNECTIONS_WARN_PERCENT = 80;
- @SuppressWarnings("unused")
- @ManagedContextDefault(name = PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE)
- long DEFAULT_PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE = 1024 * 1024;
-
-
String PROTOCOL_HANDSHAKE_TIMEOUT = "qpid.port.protocol_handshake_timeout";
@SuppressWarnings("unused")
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java Tue Nov 15 14:16:10 2016
@@ -43,6 +43,7 @@ import javax.net.ssl.X509TrustManager;
import javax.security.auth.Subject;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.slf4j.Logger;
@@ -310,7 +311,7 @@ public class AmqpPortImpl extends Abstra
}
@Override
- protected void onClose()
+ protected ListenableFuture<Void> onClose()
{
if (_transport != null)
{
@@ -322,6 +323,7 @@ public class AmqpPortImpl extends Abstra
_transport.close();
}
+ return Futures.immediateFuture(null);
}
@Override
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Tue Nov 15 14:16:10 2016
@@ -25,6 +25,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListSet;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.Queue;
@@ -106,6 +107,5 @@ public interface AMQSessionModel<T exten
void addTicker(Ticker ticker);
void removeTicker(Ticker ticker);
- void notifyConsumerTargetCurrentStates();
- void ensureConsumersNoticedStateChange();
+ void notifyWork(ConsumerTarget target);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org