You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/15 14:16:10 UTC
svn commit: r1769837 [2/4] - in /qpid/java/trunk: ./
bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/
broker-core/src/main/java/org/apache/qpid/server/consumer/
broker-core/src/main/java/org/apache/qpid/server/flow/ broker-core...
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Tue Nov 15 14:16:10 2016
@@ -37,6 +37,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -57,6 +58,7 @@ import java.util.zip.GZIPOutputStream;
import javax.security.auth.Subject;
+import com.google.common.collect.Lists;
import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
@@ -113,15 +115,13 @@ import org.apache.qpid.server.util.Conne
import org.apache.qpid.server.util.Deletable;
import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
-import org.apache.qpid.server.util.StateChangeListener;
+import org.apache.qpid.server.virtualhost.HouseKeepingTask;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
-import org.apache.qpid.transport.TransportException;
public abstract class AbstractQueue<X extends AbstractQueue<X>>
extends AbstractConfiguredObject<X>
implements Queue<X>,
- StateChangeListener<QueueConsumer<?>, State>,
MessageGroupManager.ConsumerResetHelper
{
@@ -147,18 +147,13 @@ public abstract class AbstractQueue<X ex
private final QueueManagingVirtualHost<?> _virtualHost;
private final DeletedChildListener _deletedChildListener = new DeletedChildListener();
- private final AccessControlContext _immediateDeliveryContext;
+ private QueueConsumerManagerImpl _queueConsumerManager;
@ManagedAttributeField( beforeSet = "preSetAlternateExchange", afterSet = "postSetAlternateExchange")
private Exchange _alternateExchange;
-
- private final QueueConsumerList _consumerList = new QueueConsumerList();
-
private volatile QueueConsumer<?> _exclusiveSubscriber;
-
-
private final AtomicLong _targetQueueSize = new AtomicLong(INITIAL_TARGET_QUEUE_SIZE);
private final AtomicInteger _activeSubscriberCount = new AtomicInteger();
@@ -206,11 +201,8 @@ public abstract class AbstractQueue<X ex
Collections.synchronizedSet(EnumSet.noneOf(NotificationCheck.class));
- private volatile int _maxAsyncDeliveries;
private volatile long _estimatedAverageMessageHeaderSize;
- private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE);
-
private AtomicBoolean _stopped = new AtomicBoolean(false);
private final Set<AMQSessionModel> _blockedChannels = new ConcurrentSkipListSet<AMQSessionModel>();
@@ -268,12 +260,52 @@ public abstract class AbstractQueue<X ex
private final ConcurrentLinkedQueue<EnqueueRequest> _postRecoveryQueue = new ConcurrentLinkedQueue<>();
- private final QueueRunner _queueRunner;
private boolean _closing;
private final ConcurrentMap<String, Callable<MessageFilter>> _defaultFiltersMap = new ConcurrentHashMap<>();
private final List<HoldMethod> _holdMethods = new CopyOnWriteArrayList<>();
private Map<String, String> _mimeTypeToFileExtension = Collections.emptyMap();
- private volatile boolean _hasPullOnlyConsumers;
+ private AdvanceConsumersTask _queueHouseKeepingTask;
+
+ void setNotifyWorkDesired(final QueueConsumer consumer, final boolean desired)
+ {
+ if (_queueConsumerManager.setInterest(consumer, desired))
+ {
+ if (desired)
+ {
+ _activeSubscriberCount.incrementAndGet();
+ notifyConsumer(consumer);
+ }
+ else
+ {
+ _activeSubscriberCount.decrementAndGet();
+
+ // iterate over interested and notify one as long as its priority is higher than any notified
+ final Iterator<QueueConsumer<?>> consumerIterator = _queueConsumerManager.getInterestedIterator();
+ final int highestNotifiedPriority = _queueConsumerManager.getHighestNotifiedPriority();
+ while (consumerIterator.hasNext())
+ {
+ QueueConsumer<?> queueConsumer = consumerIterator.next();
+ if (queueConsumer.getPriority() < highestNotifiedPriority || notifyConsumer(queueConsumer))
+ {
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ private boolean notifyConsumer(final QueueConsumer<?> consumer)
+ {
+ if(_queueConsumerManager.setNotified(consumer, true))
+ {
+ consumer.notifyWork();
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
private interface HoldMethod
{
@@ -284,12 +316,7 @@ public abstract class AbstractQueue<X ex
{
super(parentsMap(virtualHost), attributes);
-
_virtualHost = virtualHost;
- _immediateDeliveryContext = getSystemTaskControllerContext("Immediate Delivery", virtualHost.getPrincipal());
-
- _queueRunner = new QueueRunner(this, getSystemTaskControllerContext("Queue Delivery",
- virtualHost.getPrincipal()));
}
@Override
@@ -355,8 +382,9 @@ public abstract class AbstractQueue<X ex
_arguments = Collections.synchronizedMap(arguments);
+ _queueConsumerManager = new QueueConsumerManagerImpl(this);
_logSubject = new QueueLogSubject(this);
-
+ _queueHouseKeepingTask = new AdvanceConsumersTask();
Subject activeSubject = Subject.getSubject(AccessController.getContext());
Set<SessionPrincipal> sessionPrincipals = activeSubject == null ? Collections.<SessionPrincipal>emptySet() : activeSubject.getPrincipals(SessionPrincipal.class);
AMQSessionModel<?> sessionModel;
@@ -478,7 +506,6 @@ public abstract class AbstractQueue<X ex
}
_estimatedAverageMessageHeaderSize = getContextValue(Long.class, QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD);
- _maxAsyncDeliveries = getContextValue(Integer.class, Queue.MAX_ASYNCHRONOUS_DELIVERIES);
_mimeTypeToFileExtension = getContextValue(Map.class, MAP_OF_STRING_STRING, MIME_TYPE_TO_FILE_EXTENSION);
if(_defaultFilters != null)
@@ -712,12 +739,12 @@ public abstract class AbstractQueue<X ex
final EnumSet<ConsumerImpl.Option> optionSet,
final Integer priority)
throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive,
- ConsumerAccessRefused
+ ConsumerAccessRefused, QueueDeleted
{
try
{
- return getTaskExecutor().run(new Task<QueueConsumerImpl, Exception>()
+ final QueueConsumerImpl queueConsumer = getTaskExecutor().run(new Task<QueueConsumerImpl, Exception>()
{
@Override
public QueueConsumerImpl execute() throws Exception
@@ -743,9 +770,19 @@ public abstract class AbstractQueue<X ex
return "target=" + target + ", consumerName=" + consumerName + ", optionSet=" + optionSet;
}
});
- }
- catch (ExistingExclusiveConsumer | ConsumerAccessRefused |
- ExistingConsumerPreventsExclusive | RuntimeException e)
+
+ target.consumerAdded(queueConsumer);
+ if(isEmpty())
+ {
+ target.queueEmpty();
+ }
+ target.updateNotifyWorkDesired();
+ target.notifyWork();
+ return queueConsumer;
+ }
+ catch (ExistingExclusiveConsumer | ConsumerAccessRefused
+ | ExistingConsumerPreventsExclusive | QueueDeleted
+ | RuntimeException e)
{
throw e;
}
@@ -765,8 +802,13 @@ public abstract class AbstractQueue<X ex
EnumSet<ConsumerImpl.Option> optionSet,
final Integer priority)
throws ExistingExclusiveConsumer, ConsumerAccessRefused,
- ExistingConsumerPreventsExclusive
+ ExistingConsumerPreventsExclusive, QueueDeleted
{
+ if (isDeleted())
+ {
+ throw new QueueDeleted();
+ }
+
if (hasExclusiveConsumer())
{
throw new ExistingExclusiveConsumer();
@@ -902,20 +944,12 @@ public abstract class AbstractQueue<X ex
priority);
_exclusiveOwner = exclusiveOwner;
- target.consumerAdded(consumer);
-
if (exclusive && !isTransient)
{
_exclusiveSubscriber = consumer;
}
- if(consumer.isActive())
- {
- _activeSubscriberCount.incrementAndGet();
- }
-
- consumer.setStateListener(this);
QueueContext queueContext;
if(filters == null || !filters.startAtTail())
{
@@ -927,36 +961,15 @@ public abstract class AbstractQueue<X ex
}
consumer.setQueueContext(queueContext);
- if (!isDeleted())
+ _queueConsumerManager.addConsumer(consumer);
+ if (consumer.isNotifyWorkDesired())
{
- if(consumer.isPullOnly())
- {
- _hasPullOnlyConsumers = true;
- }
- _consumerList.add(consumer);
-
- if (isDeleted())
- {
- consumer.queueDeleted();
- }
- }
- else
- {
- // TODO
+ _activeSubscriberCount.incrementAndGet();
}
childAdded(consumer);
consumer.addChangeListener(_deletedChildListener);
- if(consumer.isPullOnly())
- {
- consumer.getSessionModel().getAMQPConnection().notifyWork();
- }
- else
- {
- deliverAsync();
- }
-
return consumer;
}
@@ -976,7 +989,7 @@ public abstract class AbstractQueue<X ex
throw new NullPointerException("consumer argument is null");
}
- boolean removed = _consumerList.remove(consumer);
+ boolean removed = _queueConsumerManager.removeConsumer(consumer);
if (removed)
{
@@ -996,18 +1009,6 @@ public abstract class AbstractQueue<X ex
resetSubPointersForGroups(consumer);
}
- if(consumer.isPullOnly())
- {
- boolean hasOnlyPushConsumers = true;
- ConsumerNode consumerNode = _consumerList.getHead().findNext();
- while (consumerNode != null && hasOnlyPushConsumers)
- {
- hasOnlyPushConsumers = !consumerNode.getConsumer().isPullOnly();
- consumerNode = consumerNode.findNext();
- }
- _hasPullOnlyConsumers = !hasOnlyPushConsumers;
- }
-
// auto-delete queues must be deleted if there are no remaining subscribers
if(!consumer.isTransient()
@@ -1042,16 +1043,10 @@ public abstract class AbstractQueue<X ex
@Override
public Collection<QueueConsumer<?>> getConsumers()
{
- List<QueueConsumer<?>> consumers = new ArrayList<QueueConsumer<?>>();
- ConsumerNodeIterator iter = _consumerList.iterator();
- while(iter.advance())
- {
- consumers.add(iter.getNode().getConsumer());
- }
- return consumers;
-
+ return Lists.newArrayList(_queueConsumerManager.getAllIterator());
}
+
public void resetSubPointersForGroups(QueueConsumer<?> consumer)
{
QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(consumer);
@@ -1063,24 +1058,6 @@ public abstract class AbstractQueue<X ex
}
}
- @Override
- public void resetSubPointersForGroups(final QueueEntry entry)
- {
- ConsumerNodeIterator subscriberIter = _consumerList.iterator();
- // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
- while (subscriberIter.advance())
- {
- QueueConsumer<?> sub = subscriberIter.getNode().getConsumer();
-
- // we don't make browsers send the same stuff twice
- if (sub.seesRequeues())
- {
- updateSubRequeueEntry(sub, entry);
- }
- }
- notifyPullOnlyConsumers();
- deliverAsync();
- }
public void addBinding(final Binding<?> binding)
{
@@ -1152,7 +1129,6 @@ public abstract class AbstractQueue<X ex
public final void recover(ServerMessage message, final MessageEnqueueRecord enqueueRecord)
{
-
doEnqueue(message, null, enqueueRecord);
}
@@ -1197,25 +1173,10 @@ public abstract class AbstractQueue<X ex
try
{
- if (action != null || (exclusiveSub == null && _queueRunner.isIdle()))
- {
- AccessController.doPrivileged(
- new PrivilegedAction<Void>()
- {
- @Override
- public Void run()
- {
- tryDeliverStraightThrough(entry);
- return null;
- }
- }, _immediateDeliveryContext);
- }
-
if (entry.isAvailable())
{
checkConsumersNotAheadOfDelivery(entry);
- notifyPullOnlyConsumers();
- deliverAsync();
+ notifyConsumers(entry);
}
checkForNotificationOnNewMessage(entry.getMessage());
@@ -1264,125 +1225,6 @@ public abstract class AbstractQueue<X ex
}
}
- /**
- * iterate over consumers and if any is at the end of the queue and can deliver this message,
- * then deliver the message
- */
- private void tryDeliverStraightThrough(final QueueEntry entry)
- {
- try
- {
- ConsumerNode node = _consumerList.getMarkedNode();
- ConsumerNode nextNode = node.findNext();
- if (nextNode == null)
- {
- nextNode = _consumerList.getHead().findNext();
- }
- while (nextNode != null)
- {
- if (_consumerList.updateMarkedNode(node, nextNode))
- {
- break;
- }
- else
- {
- node = _consumerList.getMarkedNode();
- nextNode = node.findNext();
- if (nextNode == null)
- {
- nextNode = _consumerList.getHead().findNext();
- }
- }
- }
- // always do one extra loop after we believe we've finished
- // this catches the case where we *just* miss an update
- int loops = 2;
-
- while (entry.isAvailable() && loops != 0)
- {
- if (nextNode == null)
- {
- loops--;
- nextNode = _consumerList.getHead();
- }
- else
- {
- // if consumer at end, and active, offer
- final QueueConsumer<?> sub = nextNode.getConsumer();
-
- if(sub.getPriority() == Integer.MAX_VALUE)
- {
- deliverToConsumer(sub, entry);
- }
-
- }
- nextNode = nextNode.findNext();
-
- }
- }
- catch (ConnectionScopedRuntimeException | TransportException e)
- {
- String errorMessage = "Suppressing " + e.getClass().getSimpleName() +
- " during straight through delivery, as this" +
- " can only indicate an issue with a consumer.";
- if(_logger.isDebugEnabled())
- {
- _logger.debug(errorMessage, e);
- }
- else
- {
- _logger.info(errorMessage + ' ' + e.getMessage());
- }
- }
- }
-
- private void deliverToConsumer(final QueueConsumer<?> sub, final QueueEntry entry)
- {
-
- if(sub.trySendLock())
- {
- try
- {
- // get available queue entry first in order to avoid referring old deleted queue entry in sub._queueContext._lastSeen
- if ((getNextAvailableEntry(sub) == entry)
- && !sub.isSuspended()
- && sub.hasInterest(entry)
- && mightAssign(sub, entry)
- && !sub.wouldSuspend(entry))
- {
-
- MessageReference messageReference = null;
- try
- {
-
- if ((sub.acquires() && !assign(sub, entry))
- || (!sub.acquires() && (messageReference = entry.newMessageReference()) == null))
- {
- // restore credit here that would have been taken away by wouldSuspend since we didn't manage
- // to acquire the entry for this consumer
- sub.restoreCredit(entry);
- }
- else
- {
- deliverMessage(sub, entry, false, true);
- }
- }
- finally
- {
- if (messageReference != null)
- {
- messageReference.release();
- }
- }
- }
- }
- finally
- {
- sub.releaseSendLock();
- }
- }
- }
-
private boolean assign(final QueueConsumer<?> sub, final QueueEntry entry)
{
if(_messageGroupManager == null)
@@ -1427,20 +1269,6 @@ public abstract class AbstractQueue<X ex
return _queueStatistics.getEnqueueCount();
}
- private void deliverMessage(final QueueConsumer<?> sub,
- final QueueEntry entry,
- boolean batch,
- final boolean updateLastSeen)
- {
- if(updateLastSeen)
- {
- setLastSeenEntry(sub, entry);
- }
-
- sub.send(entry, batch);
- }
-
-
private void setLastSeenEntry(final QueueConsumer<?> sub, final QueueEntry entry)
{
QueueContext subContext = sub.getQueueContext();
@@ -1458,7 +1286,6 @@ public abstract class AbstractQueue<X ex
private void updateSubRequeueEntry(final QueueConsumer<?> sub, final QueueEntry entry)
{
-
QueueContext subContext = sub.getQueueContext();
if(subContext != null)
{
@@ -1468,19 +1295,33 @@ public abstract class AbstractQueue<X ex
{
if(QueueContext._releasedUpdater.compareAndSet(subContext, oldEntry, entry))
{
+ notifyConsumer(sub);
break;
}
}
}
}
+
+ @Override
+ public void resetSubPointersForGroups(final QueueEntry entry)
+ {
+ resetSubPointers(entry, true);
+ }
+
+ @Override
public void requeue(QueueEntry entry)
{
- ConsumerNodeIterator subscriberIter = _consumerList.iterator();
+ resetSubPointers(entry, false);
+ }
+
+ private void resetSubPointers(final QueueEntry entry, final boolean ignoreAvailable)
+ {
+ Iterator<QueueConsumer<?>> consumerIterator = _queueConsumerManager.getAllIterator();
// iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
- while (subscriberIter.advance() && entry.isAvailable())
+ while (consumerIterator.hasNext() && (ignoreAvailable || entry.isAvailable()))
{
- QueueConsumer<?> sub = subscriberIter.getNode().getConsumer();
+ QueueConsumer<?> sub = consumerIterator.next();
// we don't make browsers send the same stuff twice
if (sub.seesRequeues())
@@ -1488,40 +1329,11 @@ public abstract class AbstractQueue<X ex
updateSubRequeueEntry(sub, entry);
}
}
- notifyPullOnlyConsumers();
- deliverAsync();
-
- }
-
- public boolean resend(final QueueEntry entry, final QueueConsumer<?> consumer)
- {
- /* TODO : This is wrong as the consumer may be suspended, we should instead change the state of the message
- entry to resend and move back the consumer pointer. */
-
- consumer.getSendLock();
- try
- {
- if (!consumer.isClosed())
- {
- deliverMessage(consumer, entry, false, false);
- return true;
- }
- else
- {
- return false;
- }
- }
- finally
- {
- consumer.releaseSendLock();
- }
}
-
-
public int getConsumerCount()
{
- return _consumerList.size();
+ return _queueConsumerManager.getAllSize();
}
public int getConsumerCountWithCredit()
@@ -1620,31 +1432,6 @@ public abstract class AbstractQueue<X ex
}
- public void stateChanged(QueueConsumer<?> sub, State oldState, State newState)
- {
- if (oldState == State.ACTIVE && newState != State.ACTIVE)
- {
- _activeSubscriberCount.decrementAndGet();
-
- }
- else if (newState == State.ACTIVE)
- {
- if (oldState != State.ACTIVE)
- {
- _activeSubscriberCount.incrementAndGet();
- if(sub.isPullOnly())
- {
- sub.getSessionModel().getAMQPConnection().notifyWork();
- }
-
- }
- if(!sub.isPullOnly())
- {
- deliverAsync();
- }
- }
- }
-
public int compareTo(final X o)
{
return getName().compareTo(o.getName());
@@ -1660,11 +1447,6 @@ public abstract class AbstractQueue<X ex
_exclusiveSubscriber = exclusiveSubscriber;
}
- long getStateChangeCount()
- {
- return _stateChangeCount.get();
- }
-
/** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */
abstract QueueEntryList getEntries();
@@ -1673,9 +1455,9 @@ public abstract class AbstractQueue<X ex
return _queueStatistics;
}
- protected QueueConsumerList getConsumerList()
+ protected final QueueConsumerManagerImpl getQueueConsumerManager()
{
- return _consumerList;
+ return _queueConsumerManager;
}
public EventLogger getEventLogger()
@@ -1898,14 +1680,16 @@ public abstract class AbstractQueue<X ex
{
try
{
- final ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
- while (consumerNodeIterator.advance())
+ Iterator<QueueConsumer<?>> consumerIterator = _queueConsumerManager.getAllIterator();
+
+ while (consumerIterator.hasNext())
{
- final QueueConsumer s = consumerNodeIterator.getNode().getConsumer();
- if (s != null)
+ QueueConsumer<?> consumer = consumerIterator.next();
+
+ if (consumer != null)
{
- s.queueDeleted();
+ consumer.queueDeleted();
}
}
@@ -1971,11 +1755,12 @@ public abstract class AbstractQueue<X ex
}
@Override
- protected void onClose()
+ protected ListenableFuture<Void> onClose()
{
- super.onClose();
_stopped.set(true);
_closing = false;
+ _queueHouseKeepingTask.cancel();
+ return Futures.immediateFuture(null);
}
@Override
@@ -2038,110 +1823,122 @@ public abstract class AbstractQueue<X ex
}
}
- public void deliverAsync()
+ void notifyConsumers(QueueEntry entry)
{
- _stateChangeCount.incrementAndGet();
-
- _queueRunner.execute();
- }
+ Iterator<QueueConsumer<?>> nonAcquiringIterator = _queueConsumerManager.getNonAcquiringIterator();
+ while (nonAcquiringIterator.hasNext())
+ {
+ QueueConsumer<?> consumer = nonAcquiringIterator.next();
+ if(consumer.hasInterest(entry))
+ {
+ notifyConsumer(consumer);
+ }
+ }
- void notifyPullOnlyConsumers()
- {
- if(_hasPullOnlyConsumers)
+ final Iterator<QueueConsumer<?>> interestedIterator = _queueConsumerManager.getInterestedIterator();
+ while (entry.isAvailable() && interestedIterator.hasNext())
{
- ConsumerNode consumerNode = _consumerList.getHead().findNext();
- while (consumerNode != null)
+ QueueConsumer<?> consumer = interestedIterator.next();
+ if(consumer.hasInterest(entry))
{
- QueueConsumer<?> consumer = consumerNode.getConsumer();
- if (consumer.isActive() && consumer.isPullOnly() && getNextAvailableEntry(consumer) != null)
+ if(notifyConsumer(consumer))
+ {
+ break;
+ }
+ else if(!noHigherPriorityWithCredit(consumer, entry))
{
- consumer.getSessionModel().getAMQPConnection().notifyWork();
+ // there exists a higher priority consumer that would take this message, therefore no point in
+ // continuing to loop
+ break;
}
- consumerNode = consumerNode.findNext();
}
}
}
- void flushConsumer(QueueConsumer<?> sub)
+ void notifyOtherConsumers(final QueueConsumer<?> excludedConsumer)
{
+ final Iterator<QueueConsumer<?>> interestedIterator = _queueConsumerManager.getInterestedIterator();
+ while (hasAvailableMessages() && interestedIterator.hasNext())
+ {
+ QueueConsumer<?> consumer = interestedIterator.next();
- flushConsumer(sub, Long.MAX_VALUE);
+ if (excludedConsumer != consumer)
+ {
+ if (notifyConsumer(consumer))
+ {
+ break;
+ }
+ }
+ }
}
- boolean flushConsumer(QueueConsumer<?> sub, long iterations)
+
+ MessageContainer deliverSingleMessage(QueueConsumer<?> consumer)
{
- boolean atTail = false;
- final boolean keepSendLockHeld = iterations <= getMaxAsyncDeliveries();
boolean queueEmpty = false;
- boolean deliveryAttempted = false;
+ MessageContainer messageContainer = null;
+
+ _queueConsumerManager.setNotified(consumer, false);
try
{
- if(keepSendLockHeld)
+ if (!consumer.isSuspended())
{
- sub.getSendLock();
- }
- while (!sub.isSuspended() && !atTail && iterations != 0)
- {
- try
+ messageContainer = attemptDelivery(consumer);
+ if(messageContainer != null)
{
- if(!keepSendLockHeld)
- {
- sub.getSendLock();
- }
+ _queueConsumerManager.setNotified(consumer, true);
+ }
- atTail = attemptDelivery(sub, true);
- deliveryAttempted = true;
- if (atTail && getNextAvailableEntry(sub) == null)
- {
- queueEmpty = true;
- }
- else if (!atTail)
- {
- iterations--;
- }
+ if (messageContainer == null && getNextAvailableEntry(consumer) == null)
+ {
+ queueEmpty = true;
}
- finally
+
+ if(messageContainer == null && consumer.acquires())
{
- if(!keepSendLockHeld)
+ if(hasAvailableMessages())
{
- sub.releaseSendLock();
+ notifyOtherConsumers(consumer);
}
}
}
-
- if (!deliveryAttempted )
+ else
{
// avoid referring old deleted queue entry in sub._queueContext._lastSeen
- getNextAvailableEntry(sub);
+ getNextAvailableEntry(consumer);
}
}
finally
{
- if(keepSendLockHeld)
- {
- sub.releaseSendLock();
- }
if(queueEmpty)
{
- sub.queueEmpty();
+ consumer.queueEmpty();
}
- sub.flushBatched();
-
+ consumer.flushBatched();
}
+ return messageContainer;
+ }
+
+ private boolean hasAvailableMessages()
+ {
+ return _queueStatistics.getAvailableCount() != 0;
+ }
- // if there's (potentially) more than one consumer the others will potentially not have been advanced to the
- // next entry they are interested in yet. This would lead to holding on to references to expired messages, etc
- // which would give us memory "leak".
+ public static class MessageContainer
+ {
+ public final MessageInstance _messageInstance;
+ public final MessageReference<?> _messageReference;
- if (!hasExclusiveConsumer())
+ public MessageContainer(final MessageInstance messageInstance,
+ final MessageReference<?> messageReference)
{
- advanceAllConsumers();
+ _messageInstance = messageInstance;
+ _messageReference = messageReference;
}
- return atTail;
}
/**
@@ -2151,18 +1948,16 @@ public abstract class AbstractQueue<X ex
*
*
* @param sub the consumer
- * @param batch true if processing can be batched
* @return true if we have completed all possible deliveries for this sub.
*/
- private boolean attemptDelivery(QueueConsumer<?> sub, boolean batch)
+ private MessageContainer attemptDelivery(QueueConsumer<?> sub)
{
- boolean atTail = false;
-
+ MessageContainer messageContainer = null;
// avoid referring old deleted queue entry in sub._queueContext._lastSeen
QueueEntry node = getNextAvailableEntry(sub);
boolean subActive = sub.isActive() && !sub.isSuspended();
- if (subActive && (sub.getPriority() == Integer.MAX_VALUE || noHigherPriorityWithCredit(sub)))
+ if (node != null && subActive && (sub.getPriority() == Integer.MAX_VALUE || noHigherPriorityWithCredit(sub, node)))
{
if (_virtualHost.getState() != State.ACTIVE)
@@ -2171,11 +1966,11 @@ public abstract class AbstractQueue<X ex
"virtualhost state " + _virtualHost.getState());
}
- if (node != null && node.isAvailable())
+ if (node.isAvailable())
{
if (sub.hasInterest(node) && mightAssign(sub, node))
{
- if (!sub.wouldSuspend(node))
+ if (sub.allocateCredit(node))
{
MessageReference messageReference = null;
try
@@ -2190,7 +1985,8 @@ public abstract class AbstractQueue<X ex
}
else
{
- deliverMessage(sub, node, batch, true);
+ setLastSeenEntry(sub, node);
+ messageContainer = new MessageContainer(node, messageReference);
}
}
finally
@@ -2201,59 +1997,43 @@ public abstract class AbstractQueue<X ex
}
}
}
- else // Not enough Credit for message and wouldSuspend
+ else
{
- //QPID-1187 - Treat the consumer as suspended for this message
- // and wait for the message to be removed to continue delivery.
- subActive = false;
sub.awaitCredit(node);
-
}
}
-
}
- atTail = (node == null) || (getNextAvailableEntry(sub) == null);
}
- return atTail || !subActive;
+ return messageContainer;
}
- private boolean noHigherPriorityWithCredit(final QueueConsumer<?> sub)
+ boolean noHigherPriorityWithCredit(final QueueConsumer<?> sub, final QueueEntry queueEntry)
{
- ConsumerNodeIterator iterator = _consumerList.iterator();
- while(iterator.advance())
+ Iterator<QueueConsumer<?>> consumerIterator = _queueConsumerManager.getAllIterator();
+
+ while (consumerIterator.hasNext())
{
- final ConsumerNode node = iterator.getNode();
- QueueConsumer consumer = node.getConsumer();
+ QueueConsumer<?> consumer = consumerIterator.next();
if(consumer.getPriority() > sub.getPriority())
{
- if(getNextAvailableEntry(consumer) != null && consumer.hasCredit())
+ if(consumer.isNotifyWorkDesired()
+ && consumer.acquires()
+ && consumer.hasInterest(queueEntry)
+ && getNextAvailableEntry(consumer) != null)
{
return false;
}
}
- }
- return true;
- }
-
- protected void advanceAllConsumers()
- {
- ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
- while (consumerNodeIterator.advance())
- {
- ConsumerNode subNode = consumerNodeIterator.getNode();
- QueueConsumer sub = subNode.getConsumer();
- if(sub.acquires())
- {
- getNextAvailableEntry(sub);
- }
else
{
- // TODO
+ break;
}
}
+ return true;
}
- private QueueEntry getNextAvailableEntry(final QueueConsumer sub)
+
+ QueueEntry getNextAvailableEntry(final QueueConsumer sub)
{
QueueContext context = sub.getQueueContext();
if(context != null)
@@ -2310,156 +2090,6 @@ public abstract class AbstractQueue<X ex
}
- boolean hasAvailableMessages(final QueueConsumer queueConsumer)
- {
- return getNextAvailableEntry(queueConsumer) != null;
- }
-
- /**
- * Used by queue Runners to asynchronously deliver messages to consumers.
- *
- * A queue Runner is started whenever a state change occurs, e.g when a new
- * message arrives on the queue and cannot be immediately delivered to a
- * consumer (i.e. asynchronous delivery is required).
- *
- * processQueue should be running while there are messages on the queue AND
- * there are consumers that can deliver them. If there are no
- * consumers capable of delivering the remaining messages on the queue
- * then processQueue should stop to prevent spinning.
- *
- * Since processQueue is runs in a fixed size Executor, it should not run
- * indefinitely to prevent starving other tasks of CPU (e.g jobs to process
- * incoming messages may not be able to be scheduled in the thread pool
- * because all threads are working on clearing down large queues). To solve
- * this problem, after an arbitrary number of message deliveries the
- * processQueue job stops iterating, resubmits itself to the executor, and
- * ends the current instance
- *
- * @param runner the Runner to schedule
- */
- public long processQueue(QueueRunner runner)
- {
- long stateChangeCount;
- long previousStateChangeCount = Long.MIN_VALUE;
- long rVal = Long.MIN_VALUE;
- boolean deliveryIncomplete = true;
-
- boolean lastLoop = false;
- int iterations = getMaxAsyncDeliveries();
-
- final int numSubs = _consumerList.size();
-
- final int perSub = Math.max(iterations / Math.max(numSubs,1), 1);
-
- // For every message enqueue/requeue the we fire deliveryAsync() which
- // increases _stateChangeCount. If _sCC changes whilst we are in our loop
- // (detected by setting previousStateChangeCount to stateChangeCount in the loop body)
- // then we will continue to run for a maximum of iterations.
- // So whilst delivery/rejection is going on a processQueue thread will be running
- while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete))
- {
- // we want to have one extra loop after every consumer has reached the point where it cannot move
- // further, just in case the advance of one consumer in the last loop allows a different consumer to
- // move forward in the next iteration
-
- if (previousStateChangeCount != stateChangeCount)
- {
- //further asynchronous delivery is required since the
- //previous loop. keep going if iteration slicing allows.
- lastLoop = false;
- rVal = stateChangeCount;
- }
-
- previousStateChangeCount = stateChangeCount;
- boolean allConsumersDone = true;
- boolean consumerDone;
-
- ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
- //iterate over the subscribers and try to advance their pointer
- while (consumerNodeIterator.advance())
- {
-
- QueueConsumer<?> sub = consumerNodeIterator.getNode().getConsumer();
-
- if(!sub.isPullOnly())
- {
- sub.getSendLock();
-
- try
- {
- for (int i = 0; i < perSub; i++)
- {
- //attempt delivery. returns true if no further delivery currently possible to this sub
- consumerDone = attemptDelivery(sub, true);
- if (consumerDone)
- {
- sub.flushBatched();
- boolean noMore = getNextAvailableEntry(sub) == null;
- if (lastLoop && noMore)
- {
- sub.queueEmpty();
- }
- break;
- }
- else
- {
- //this consumer can accept additional deliveries, so we must
- //keep going after this (if iteration slicing allows it)
- allConsumersDone = false;
- lastLoop = false;
- if (--iterations == 0)
- {
- sub.flushBatched();
- break;
- }
- }
- }
-
- sub.flushBatched();
- }
- finally
- {
- sub.releaseSendLock();
- }
- }
- }
-
- if(allConsumersDone && lastLoop)
- {
- //We have done an extra loop already and there are again
- //again no further delivery attempts possible, only
- //keep going if state change demands it.
- deliveryIncomplete = false;
- }
- else if(allConsumersDone)
- {
- //All consumers reported being done, but we have to do
- //an extra loop if the iterations are not exhausted and
- //there is still any work to be done
- deliveryIncomplete = _consumerList.size() != 0;
- lastLoop = true;
- }
- else
- {
- //some consumers can still accept more messages,
- //keep going if iteration count allows.
- lastLoop = false;
- deliveryIncomplete = true;
- }
-
- }
-
- // If iterations == 0 then the limiting factor was the time-slicing rather than available messages or credit
- // therefore we should schedule this runner again (unless someone beats us to it :-) ).
- if (iterations == 0)
- {
- _logger.debug("Rescheduling runner: {}", runner);
- return 0L;
- }
- return rVal;
-
- }
-
public void checkMessageStatus()
{
QueueEntryIterator queueListIterator = getEntries().iterator();
@@ -3067,7 +2697,12 @@ public abstract class AbstractQueue<X ex
switch (getConsumerCount())
{
case 1:
- _exclusiveSubscriber = getConsumerList().getHead().getConsumer();
+ Iterator<QueueConsumer<?>> consumerIterator = _queueConsumerManager.getAllIterator();
+
+ if (consumerIterator.hasNext())
+ {
+ _exclusiveSubscriber = consumerIterator.next();
+ }
// deliberate fall through
case 0:
_exclusiveOwner = null;
@@ -3088,8 +2723,11 @@ public abstract class AbstractQueue<X ex
case CONTAINER:
case CONNECTION:
AMQSessionModel session = null;
- for(ConsumerImpl c : getConsumers())
+ Iterator<QueueConsumer<?>> queueConsumerIterator = _queueConsumerManager.getAllIterator();
+ while(queueConsumerIterator.hasNext())
{
+ QueueConsumer<?> c = queueConsumerIterator.next();
+
if(session == null)
{
session = c.getSessionModel();
@@ -3114,8 +2752,10 @@ public abstract class AbstractQueue<X ex
case CONTAINER:
case PRINCIPAL:
AMQPConnection con = null;
- for(ConsumerImpl c : getConsumers())
+ Iterator<QueueConsumer<?>> queueConsumerIterator = _queueConsumerManager.getAllIterator();
+ while(queueConsumerIterator.hasNext())
{
+ QueueConsumer<?> c = queueConsumerIterator.next();
if(con == null)
{
con = c.getSessionModel().getAMQPConnection();
@@ -3142,8 +2782,10 @@ public abstract class AbstractQueue<X ex
case NONE:
case PRINCIPAL:
String containerID = null;
- for(ConsumerImpl c : getConsumers())
+ Iterator<QueueConsumer<?>> queueConsumerIterator = _queueConsumerManager.getAllIterator();
+ while(queueConsumerIterator.hasNext())
{
+ QueueConsumer<?> c = queueConsumerIterator.next();
if(containerID == null)
{
containerID = c.getSessionModel().getAMQPConnection().getRemoteContainerName();
@@ -3173,8 +2815,10 @@ public abstract class AbstractQueue<X ex
case NONE:
case CONTAINER:
Principal principal = null;
- for(ConsumerImpl c : getConsumers())
+ Iterator<QueueConsumer<?>> queueConsumerIterator = _queueConsumerManager.getAllIterator();
+ while(queueConsumerIterator.hasNext())
{
+ QueueConsumer<?> c = queueConsumerIterator.next();
if(principal == null)
{
principal = c.getSessionModel().getAMQPConnection().getAuthorizedPrincipal();
@@ -3232,6 +2876,7 @@ public abstract class AbstractQueue<X ex
@StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE)
private ListenableFuture<Void> activate()
{
+ _virtualHost.scheduleHouseKeepingTask(_virtualHost.getHousekeepingCheckPeriod(), _queueHouseKeepingTask);
setState(State.ACTIVE);
return Futures.immediateFuture(null);
}
@@ -3311,7 +2956,9 @@ public abstract class AbstractQueue<X ex
}
else if(clazz == org.apache.qpid.server.model.Consumer.class)
{
- return (Collection<C>) getConsumers();
+ return _queueConsumerManager == null
+ ? Collections.<C>emptySet()
+ : (Collection<C>) Lists.newArrayList(_queueConsumerManager.getAllIterator());
}
else return Collections.emptySet();
}
@@ -3368,13 +3015,6 @@ public abstract class AbstractQueue<X ex
}
- int getMaxAsyncDeliveries()
- {
- return _maxAsyncDeliveries;
- }
-
-
-
private static final String[] NON_NEGATIVE_NUMBERS = {
ALERT_REPEAT_GAP,
ALERT_THRESHOLD_MESSAGE_AGE,
@@ -3763,4 +3403,33 @@ public abstract class AbstractQueue<X ex
}
}
}
+
+ private class AdvanceConsumersTask extends HouseKeepingTask
+ {
+
+ AdvanceConsumersTask()
+ {
+ super("Queue Housekeeping: " + AbstractQueue.this.getName(),
+ _virtualHost, getSystemTaskControllerContext("Queue Housekeeping", _virtualHost.getPrincipal()));
+ }
+
+ @Override
+ public void execute()
+ {
+ // if there's (potentially) more than one consumer the others will potentially not have been advanced to the
+ // next entry they are interested in yet. This would lead to holding on to references to expired messages, etc
+ // which would give us memory "leak".
+
+ Iterator<QueueConsumer<?>> consumerIterator = _queueConsumerManager.getAllIterator();
+
+ while (consumerIterator.hasNext() && !isDeleted())
+ {
+ QueueConsumer<?> sub = consumerIterator.next();
+ if(sub.acquires())
+ {
+ getNextAvailableEntry(sub);
+ }
+ }
+ }
+ }
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java Tue Nov 15 14:16:10 2016
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.queue;
+import static org.apache.qpid.server.model.Queue.QUEUE_SCAVANGE_COUNT;
+
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -47,7 +49,7 @@ public abstract class OrderedQueueEntryL
_nextUpdater = OrderedQueueEntry._nextUpdater;
private AtomicLong _scavenges = new AtomicLong(0L);
- private final long _scavengeCount = Integer.getInteger("qpid.queue.scavenge_count", 50);
+ private final long _scavengeCount;
private final AtomicReference<QueueEntry> _unscavengedHWM = new AtomicReference<QueueEntry>();
@@ -57,6 +59,7 @@ public abstract class OrderedQueueEntryL
{
super(queue, queueStatistics);
_queue = queue;
+ _scavengeCount = _queue.getContextValue(Integer.class, QUEUE_SCAVANGE_COUNT);
_head = headCreator.createHead(this);
_tail = _head;
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java Tue Nov 15 14:16:10 2016
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.queue;
+import java.util.Iterator;
import java.util.Map;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
@@ -36,10 +37,12 @@ public abstract class OutOfOrderQueue<X
protected void checkConsumersNotAheadOfDelivery(final QueueEntry entry)
{
// check that all consumers are not in advance of the entry
- ConsumerNodeIterator subIter = getConsumerList().iterator();
- while(subIter.advance() && !entry.isAcquired())
+ Iterator<QueueConsumer<?>> consumerIterator = getQueueConsumerManager().getAllIterator();
+
+ while (consumerIterator.hasNext() && !entry.isAcquired())
{
- final QueueConsumer<?> consumer = subIter.getNode().getConsumer();
+ QueueConsumer<?> consumer = consumerIterator.next();
+
if(!consumer.isClosed())
{
QueueContext context = consumer.getQueueContext();
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java Tue Nov 15 14:16:10 2016
@@ -32,27 +32,27 @@ public interface QueueConsumer<X extends
boolean hasInterest(QueueEntry node);
- boolean wouldSuspend(QueueEntry entry);
+ boolean allocateCredit(QueueEntry entry);
void restoreCredit(QueueEntry entry);
- void send(QueueEntry entry, boolean batch);
-
void acquisitionRemoved(QueueEntry node);
+ QueueConsumerNode getQueueConsumerNode();
+
void queueDeleted();
Queue<?> getQueue();
- boolean resend(QueueEntry e);
-
MessageInstance.StealableConsumerAcquiredState<X> getOwningState();
QueueContext getQueueContext();
void awaitCredit(QueueEntry entry);
- boolean hasCredit();
+ boolean isNotifyWorkDesired();
+
+ void notifyWork();
- boolean isPullOnly();
+ void setQueueConsumerNode(QueueConsumerNode node);
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Tue Nov 15 14:16:10 2016
@@ -23,7 +23,6 @@ package org.apache.qpid.server.queue;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT;
import java.text.MessageFormat;
-import java.util.EnumMap;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
@@ -33,6 +32,8 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,15 +52,12 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.AbstractConfiguredObject;
-import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.model.State;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.security.access.Operation;
-import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.util.StateChangeListener;
class QueueConsumerImpl
@@ -67,7 +65,6 @@ class QueueConsumerImpl
implements QueueConsumer<QueueConsumerImpl>, LogSubject
{
private final static Logger LOGGER = LoggerFactory.getLogger(QueueConsumerImpl.class);
- private final AtomicBoolean _targetClosed = new AtomicBoolean(false);
private final AtomicBoolean _closed = new AtomicBoolean(false);
private final long _consumerNumber;
private final long _createTime = System.currentTimeMillis();
@@ -84,28 +81,9 @@ class QueueConsumerImpl
private final Object _sessionReference;
private final AbstractQueue _queue;
- private final SuspendedConsumerLoggingTicker _suspendedConsumerLoggingTicker;
-
- static final EnumMap<ConsumerTarget.State, State> STATE_MAP =
- new EnumMap<ConsumerTarget.State, State>(ConsumerTarget.State.class);
-
- static
- {
- STATE_MAP.put(ConsumerTarget.State.ACTIVE, State.ACTIVE);
- STATE_MAP.put(ConsumerTarget.State.SUSPENDED, State.QUIESCED);
- STATE_MAP.put(ConsumerTarget.State.CLOSED, State.DELETED);
- }
-
private final ConsumerTarget _target;
- private final StateChangeListener<ConsumerTarget, ConsumerTarget.State> _listener;
private volatile QueueContext _queueContext;
- private volatile StateChangeListener<? super QueueConsumerImpl, State> _stateListener = new StateChangeListener<QueueConsumerImpl, State>()
- {
- public void stateChanged(QueueConsumerImpl sub, State oldState, State newState)
- {
- // no-op
- }
- };
+
@ManagedAttributeField
private boolean _exclusive;
@ManagedAttributeField
@@ -119,6 +97,8 @@ class QueueConsumerImpl
@ManagedAttributeField
private int _priority;
+ private QueueConsumerNode _queueConsumerNode;
+
QueueConsumerImpl(final AbstractQueue<?> queue,
ConsumerTarget target,
final String consumerName,
@@ -145,29 +125,6 @@ class QueueConsumerImpl
open();
setupLogging();
-
- _listener = new StateChangeListener<ConsumerTarget, ConsumerTarget.State>()
- {
- @Override
- public void stateChanged(final ConsumerTarget object,
- final ConsumerTarget.State oldState,
- final ConsumerTarget.State newState)
- {
- targetStateChanged(oldState, newState);
- }
- };
- _target.addStateListener(_listener);
-
- _suspendedConsumerLoggingTicker = target.isMultiQueue()
- ? null
- : new SuspendedConsumerLoggingTicker(queue.getContextValue(Long.class, Consumer.SUSPEND_NOTIFICATION_PERIOD))
- {
- @Override
- protected void log(final long period)
- {
- getEventLogger().message(getLogSubject(), SubscriptionMessages.STATE(period));
- }
- };
}
private static Map<String, Object> createAttributeMap(String name,
@@ -204,43 +161,6 @@ class QueueConsumerImpl
return attributes;
}
- private void targetStateChanged(final ConsumerTarget.State oldState, final ConsumerTarget.State newState)
- {
- if(oldState != newState)
- {
- if(newState == ConsumerTarget.State.CLOSED)
- {
- if(_targetClosed.compareAndSet(false,true))
- {
- getEventLogger().message(getLogSubject(), SubscriptionMessages.CLOSE());
- }
- }
-
- if(_suspendedConsumerLoggingTicker != null)
- {
- if (newState == ConsumerTarget.State.SUSPENDED)
- {
- _suspendedConsumerLoggingTicker.setStartTime(System.currentTimeMillis());
- getSessionModel().addTicker(_suspendedConsumerLoggingTicker);
- }
- else
- {
- getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
- }
- }
- }
-
- if(newState == ConsumerTarget.State.CLOSED && oldState != newState && !_closed.get())
- {
- closeAsync();
- }
- final StateChangeListener<? super QueueConsumerImpl, State> stateListener = getStateListener();
- if(stateListener != null)
- {
- stateListener.stateChanged(this, STATE_MAP.get(oldState), STATE_MAP.get(newState));
- }
- }
-
@Override
public ConsumerTarget getTarget()
{
@@ -254,28 +174,15 @@ class QueueConsumerImpl
}
@Override
- public boolean hasCredit()
+ public boolean isNotifyWorkDesired()
{
- return _target.hasCredit();
+ return _target.isNotifyWorkDesired();
}
@Override
public void externalStateChange()
{
- if(isPullOnly())
- {
- getSessionModel().getAMQPConnection().notifyWork();
- }
- else
- {
- _queue.deliverAsync();
- }
- }
-
- @Override
- public boolean hasAvailableMessages()
- {
- return !_queue.isEmpty() && _queue.hasAvailableMessages(this);
+ _target.notifyWork();
}
@Override
@@ -309,28 +216,29 @@ class QueueConsumerImpl
}
@Override
- protected void onClose()
+ protected ListenableFuture<Void> onClose()
{
if(_closed.compareAndSet(false,true))
{
- _target.getSendLock();
- try
- {
- _waitingOnCreditMessageListener.remove();
- _target.consumerRemoved(this);
- _target.removeStateChangeListener(_listener);
- _queue.unregisterConsumer(this);
- if(_suspendedConsumerLoggingTicker != null)
- {
- getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
- }
- deleted();
- }
- finally
- {
- _target.releaseSendLock();
- }
+ getEventLogger().message(getLogSubject(), SubscriptionMessages.CLOSE());
+
+ _waitingOnCreditMessageListener.remove();
+ return doAfter(_target.consumerRemoved(this),
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ _queue.unregisterConsumer(QueueConsumerImpl.this);
+
+ deleted();
+ }
+ });
+ }
+ else
+ {
+ return Futures.immediateFuture(null);
}
}
@@ -346,19 +254,31 @@ class QueueConsumerImpl
}
@Override
- public boolean isPullOnly()
+ public void notifyWork()
+ {
+ _target.notifyWork();
+ }
+
+ @Override
+ public void setQueueConsumerNode(final QueueConsumerNode node)
+ {
+ _queueConsumerNode = node;
+ }
+
+ @Override
+ public QueueConsumerNode getQueueConsumerNode()
{
- return _target.isPullOnly();
+ return _queueConsumerNode;
}
public void queueDeleted()
{
- _target.queueDeleted();
+ _target.consumerRemoved(this);
}
- public boolean wouldSuspend(final QueueEntry msg)
+ public boolean allocateCredit(final QueueEntry msg)
{
- return !_target.allocateCredit(msg.getMessage());
+ return _target.allocateCredit(msg.getMessage());
}
public void restoreCredit(final QueueEntry queueEntry)
@@ -372,12 +292,6 @@ class QueueConsumerImpl
}
@Override
- public State getState()
- {
- return STATE_MAP.get(_target.getState());
- }
-
- @Override
public final Queue<?> getQueue()
{
return _queue;
@@ -397,46 +311,21 @@ class QueueConsumerImpl
}
@Override
- public final void flush()
+ public AbstractQueue.MessageContainer pullMessage()
{
- AMQPConnection<?> connection = _target.getSessionModel().getAMQPConnection();
- try
+ AbstractQueue.MessageContainer messageContainer = _queue.deliverSingleMessage(this);
+ if (messageContainer != null)
{
- connection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(true);
- _queue.flushConsumer(this);
- _target.processPending();
+ _deliveredCount.incrementAndGet();
+ _deliveredBytes.addAndGet(messageContainer._messageInstance.getMessage().getSize());
}
- finally
- {
- connection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(false);
- }
-
+ return messageContainer;
}
@Override
- public void pullMessage()
+ public void setNotifyWorkDesired(final boolean desired)
{
- AMQPConnection<?> connection = _target.getSessionModel().getAMQPConnection();
- try
- {
- connection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(true);
- _queue.flushConsumer(this, 1);
- }
- finally
- {
- connection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(false);
- }
-
- }
-
- public boolean resend(final QueueEntry entry)
- {
- boolean messageWasResent = getQueue().resend(entry, this);
- if (messageWasResent)
- {
- _target.processPending();
- }
- return messageWasResent;
+ _queue.setNotifyWorkDesired(this, desired);
}
public final long getConsumerNumber()
@@ -444,16 +333,6 @@ class QueueConsumerImpl
return _consumerNumber;
}
- public final StateChangeListener<? super QueueConsumerImpl, State> getStateListener()
- {
- return _stateListener;
- }
-
- public final void setStateListener(StateChangeListener<? super QueueConsumerImpl, State> listener)
- {
- _stateListener = listener;
- }
-
public final QueueContext getQueueContext()
{
return _queueContext;
@@ -466,12 +345,12 @@ class QueueConsumerImpl
public final boolean isActive()
{
- return getState() == State.ACTIVE;
+ return _target.getState() == ConsumerTarget.State.OPEN;
}
public final boolean isClosed()
{
- return getState() == State.DELETED;
+ return _target.getState() == ConsumerTarget.State.CLOSED;
}
public final boolean hasInterest(QueueEntry entry)
@@ -563,21 +442,6 @@ class QueueConsumerImpl
return filterLogString.toString();
}
- public final boolean trySendLock()
- {
- return getTarget().trySendLock();
- }
-
- public final void getSendLock()
- {
- getTarget().getSendLock();
- }
-
- public final void releaseSendLock()
- {
- getTarget().releaseSendLock();
- }
-
public final long getCreateTime()
{
return _createTime;
@@ -613,13 +477,6 @@ class QueueConsumerImpl
return _deliveredCount.longValue();
}
- public final void send(final QueueEntry entry, final boolean batch)
- {
- _deliveredCount.incrementAndGet();
- long size = _target.send(this, entry, batch);
- _deliveredBytes.addAndGet(size);
- }
-
@Override
public void acquisitionRemoved(final QueueEntry node)
{
@@ -704,14 +561,7 @@ class QueueConsumerImpl
entry.addStateChangeListener(this);
if(!entry.isAvailable())
{
- if(isPullOnly())
- {
- getSessionModel().getAMQPConnection().notifyWork();
- }
- else
- {
- _queue.deliverAsync();
- }
+ _target.notifyWork();
remove();
}
}
@@ -731,14 +581,7 @@ class QueueConsumerImpl
{
entry.removeStateChangeListener(this);
_entry.compareAndSet(entry, null);
- if(isPullOnly())
- {
- getSessionModel().getAMQPConnection().notifyWork();
- }
- else
- {
- _queue.deliverAsync();
- }
+ _target.notifyWork();
}
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Tue Nov 15 14:16:10 2016
@@ -709,17 +709,6 @@ public abstract class QueueEntryImpl imp
}
@Override
- public boolean resend()
- {
- QueueConsumer sub = getDeliveredConsumer();
- if(sub != null)
- {
- return sub.resend(this);
- }
- return false;
- }
-
- @Override
public TransactionLogResource getOwningResource()
{
return getQueue();
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/AbstractKeyStore.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/AbstractKeyStore.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/AbstractKeyStore.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/AbstractKeyStore.java Tue Nov 15 14:16:10 2016
@@ -79,14 +79,14 @@ public abstract class AbstractKeyStore<X
}
@Override
- protected void onClose()
+ protected ListenableFuture<Void> onClose()
{
- super.onClose();
if(_checkExpiryTaskFuture != null)
{
_checkExpiryTaskFuture.cancel(false);
_checkExpiryTaskFuture = null;
}
+ return Futures.immediateFuture(null);
}
protected void initializeExpiryChecking()
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java Tue Nov 15 14:16:10 2016
@@ -97,7 +97,7 @@ public class TrustStoreMessageSource ext
final String consumerName,
final EnumSet<ConsumerImpl.Option> options, final Integer priority)
throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive,
- ConsumerAccessRefused
+ ConsumerAccessRefused, QueueDeleted
{
final Consumer consumer = super.addConsumer(target, filters, messageClass, consumerName, options, priority);
consumer.send(createMessage());
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java Tue Nov 15 14:16:10 2016
@@ -27,6 +27,8 @@ import java.util.Collection;
import javax.security.auth.Subject;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.logging.EventLoggerProvider;
import org.apache.qpid.server.model.Connection;
@@ -35,9 +37,6 @@ import org.apache.qpid.server.util.Delet
public interface AMQPConnection<C extends AMQPConnection<C>> extends Connection<C>, Deletable<C>, EventLoggerProvider
{
- boolean isMessageAssignmentSuspended();
-
- void alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(boolean override);
AccessControlContext getAccessControlContextFromSubject(Subject subject);
@@ -75,9 +74,8 @@ public interface AMQPConnection<C extend
void sendConnectionCloseAsync(AMQConstant connectionForced, String reason);
- void reserveOutboundMessageSpace(long size);
-
boolean isIOThread();
+ ListenableFuture<Void> doOnIOThreadAsync(final Runnable task);
void checkAuthorizedMessagePrincipal(String messageUserId);
@@ -91,4 +89,8 @@ public interface AMQPConnection<C extend
Collection<? extends AMQSessionModel<?>> getSessionModels();
void resetStatistics();
+
+ void notifyWork(AMQSessionModel<?> sessionModel);
+
+ boolean isTransportBlockedForWriting();
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java Tue Nov 15 14:16:10 2016
@@ -35,11 +35,11 @@ import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
import javax.security.auth.SubjectDomainCombiner;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.slf4j.Logger;
@@ -76,7 +76,7 @@ import org.apache.qpid.server.util.Fixed
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.Ticker;
-public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C>>
+public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,T>, T>
extends AbstractConfiguredObject<C>
implements ProtocolEngine, AMQPConnection<C>, EventLoggerProvider
@@ -96,8 +96,6 @@ public abstract class AbstractAMQPConnec
new CopyOnWriteArrayList<>();
private final LogSubject _logSubject;
- private final AtomicReference<Thread> _messageAssignmentAllowedThread = new AtomicReference<>();
- private final AtomicBoolean _messageAssignmentSuspended = new AtomicBoolean();
private volatile ContextProvider _contextProvider;
private volatile EventLoggerProvider _eventLoggerProvider;
private String _clientProduct;
@@ -489,60 +487,52 @@ public abstract class AbstractAMQPConnec
}
@Override
- public boolean isMessageAssignmentSuspended()
+ public void setIOThread(final Thread ioThread)
{
- Thread currentThread = Thread.currentThread();
- if (_messageAssignmentAllowedThread.get() == currentThread && currentThread == _ioThread)
- {
- return false;
- }
- return _messageAssignmentSuspended.get();
+ _ioThread = ioThread;
}
@Override
- public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended, final boolean notifyConsumers)
+ public boolean isIOThread()
{
- _messageAssignmentSuspended.set(messageAssignmentSuspended);
- if(notifyConsumers)
- {
- for (AMQSessionModel<?> session : getSessionModels())
- {
- if (messageAssignmentSuspended)
- {
- session.ensureConsumersNoticedStateChange();
- }
- else
- {
- session.notifyConsumerTargetCurrentStates();
- }
- }
- }
+ return Thread.currentThread() == _ioThread;
}
@Override
- public void alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(boolean allowed)
+ public ListenableFuture<Void> doOnIOThreadAsync(final Runnable task)
{
- if (allowed)
+ if (isIOThread())
{
- _messageAssignmentAllowedThread.set(Thread.currentThread());
+ task.run();
+ return Futures.immediateFuture(null);
}
else
{
- _messageAssignmentAllowedThread.set(null);
+ final SettableFuture<Void> future = SettableFuture.create();
+
+ addAsyncTask(
+ new Action<Object>()
+ {
+ @Override
+ public void performAction(final Object object)
+ {
+ try
+ {
+ task.run();
+ future.set(null);
+ }
+ catch (RuntimeException e)
+ {
+ future.setException(e);
+ }
+ }
+ });
+ return future;
}
}
- @Override
- public void setIOThread(final Thread ioThread)
- {
- _ioThread = ioThread;
- }
+ protected abstract void addAsyncTask(final Action<? super T> action);
- @Override
- public boolean isIOThread()
- {
- return Thread.currentThread() == _ioThread;
- }
protected <T> T runAsSubject(PrivilegedAction<T> action)
{
@@ -765,12 +755,6 @@ public abstract class AbstractAMQPConnec
return getSessionModels().size();
}
- @Override
- public void reserveOutboundMessageSpace(final long size)
- {
- _network.reserveOutboundMessageSpace(size);
- }
-
protected void markTransportClosed()
{
_transportClosedFuture.set(null);
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java Tue Nov 15 14:16:10 2016
@@ -91,18 +91,6 @@ public class MultiVersionProtocolEngine
_onCloseTask = onCloseTask;
}
- @Override
- public void setMessageAssignmentSuspended(final boolean value, final boolean notifyConsumers)
- {
- _delegate.setMessageAssignmentSuspended(value, notifyConsumers);
- }
-
- @Override
- public boolean isMessageAssignmentSuspended()
- {
- return _delegate.isMessageAssignmentSuspended();
- }
-
public void closed()
{
_logger.debug("Closed");
@@ -244,18 +232,6 @@ public class MultiVersionProtocolEngine
{
@Override
- public void setMessageAssignmentSuspended(final boolean value, final boolean notifyConsumers)
- {
-
- }
-
- @Override
- public boolean isMessageAssignmentSuspended()
- {
- return false;
- }
-
- @Override
public Iterator<Runnable> processPendingIterator()
{
return Collections.emptyIterator();
@@ -366,17 +342,6 @@ public class MultiVersionProtocolEngine
private final AtomicBoolean _hasWork = new AtomicBoolean();
@Override
- public void setMessageAssignmentSuspended(final boolean value, final boolean notifyConsumers)
- {
- }
-
- @Override
- public boolean isMessageAssignmentSuspended()
- {
- return false;
- }
-
- @Override
public Iterator<Runnable> processPendingIterator()
{
return Collections.emptyIterator();
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java Tue Nov 15 14:16:10 2016
@@ -61,8 +61,6 @@ public class NonBlockingConnection imple
private final AtomicBoolean _closed = new AtomicBoolean(false);
private final ProtocolEngine _protocolEngine;
private final Runnable _onTransportEncryptionAction;
- private final AtomicLong _usedOutboundMessageSpace = new AtomicLong();
- private final long _outboundMessageBufferLimit;
private volatile boolean _fullyWritten = true;
@@ -97,9 +95,6 @@ public class NonBlockingConnection imple
_port = port;
_threadName = SelectorThread.IO_THREAD_NAME_PREFIX + _remoteSocketAddress.toString();
- _outboundMessageBufferLimit = (long) _port.getContextValue(Long.class,
- AmqpPort.PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE);
-
protocolEngine.setWorkListener(new Action<ProtocolEngine>()
{
@Override
@@ -214,15 +209,6 @@ public class NonBlockingConnection imple
}
@Override
- public void reserveOutboundMessageSpace(long size)
- {
- if (_usedOutboundMessageSpace.addAndGet(size) > _outboundMessageBufferLimit)
- {
- _protocolEngine.setMessageAssignmentSuspended(true, false);
- }
- }
-
- @Override
public String getTransportInfo()
{
return _delegate.getTransportInfo();
@@ -274,7 +260,6 @@ public class NonBlockingConnection imple
}
_protocolEngine.setIOThread(Thread.currentThread());
- _protocolEngine.setMessageAssignmentSuspended(true, true);
boolean processPendingComplete = processPending();
@@ -290,10 +275,6 @@ public class NonBlockingConnection imple
_protocolEngine.notifyWork();
}
- if (_fullyWritten)
- {
- _protocolEngine.setMessageAssignmentSuspended(false, true);
- }
}
else
{
@@ -545,12 +526,7 @@ public class NonBlockingConnection imple
_buffers.poll();
buf.dispose();
}
- if (_fullyWritten)
- {
- _usedOutboundMessageSpace.set(0);
- }
return _fullyWritten;
-
}
protected int readFromNetwork() throws IOException
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java Tue Nov 15 14:16:10 2016
@@ -53,10 +53,6 @@ public interface ProtocolEngine extends
void setTransportBlockedForWriting(boolean blocked);
- void setMessageAssignmentSuspended(boolean value, final boolean notifyConsumers);
-
- boolean isMessageAssignmentSuspended();
-
Iterator<Runnable> processPendingIterator();
boolean hasWork();
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java Tue Nov 15 14:16:10 2016
@@ -23,8 +23,6 @@ import org.apache.qpid.transport.network
public interface ServerNetworkConnection extends NetworkConnection
{
- void reserveOutboundMessageSpace(long size);
-
String getTransportInfo();
long getScheduledTime();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org