You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/15 15:32:07 UTC
svn commit: r1769849 - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/consumer/
broker-core/src/main/java/org/apache/qpid/server/queue/
broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/
broker...
Author: rgodfrey
Date: Tue Nov 15 15:32:07 2016
New Revision: 1769849
URL: http://svn.apache.org/viewvc?rev=1769849&view=rev
Log:
QPID-7514 : Make processing of consumer targets strictly follow round robin. Tidy up responsibilities between AbstractQueue and QueueConsumerManagerImpl
Added:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ScheduledConsumerTargetSet.java (with props)
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1769849&r1=1769848&r2=1769849&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Tue Nov 15 15:32:07 2016
@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.util.concurrent.Futures;
@@ -59,6 +60,7 @@ public abstract class AbstractConsumerTa
private Iterator<ConsumerImpl> _pullIterator;
private boolean _notifyWorkDesired;
+ private final AtomicBoolean _scheduled = new AtomicBoolean();
protected AbstractConsumerTarget(final boolean isMultiQueue,
final AMQPConnection<?> amqpConnection)
@@ -289,4 +291,14 @@ public abstract class AbstractConsumerTa
return false;
}
}
+
+ final boolean setScheduled()
+ {
+ return _scheduled.compareAndSet(false, true);
+ }
+
+ final void clearScheduled()
+ {
+ _scheduled.set(false);
+ }
}
Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ScheduledConsumerTargetSet.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ScheduledConsumerTargetSet.java?rev=1769849&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ScheduledConsumerTargetSet.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ScheduledConsumerTargetSet.java Tue Nov 15 15:32:07 2016
@@ -0,0 +1,189 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.consumer;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class ScheduledConsumerTargetSet<C extends AbstractConsumerTarget> implements Set<C>
+{
+ private final ConcurrentLinkedQueue<C> _underlying = new ConcurrentLinkedQueue<>();
+
+ @Override
+ public boolean add(final C c)
+ {
+ if(c.setScheduled())
+ {
+ return _underlying.add(c);
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ @Override
+ public boolean isEmpty()
+ {
+ return _underlying.isEmpty();
+ }
+
+ @Override
+ public int size()
+ {
+ return _underlying.size();
+ }
+
+ @Override
+ public boolean contains(final Object o)
+ {
+ return _underlying.contains(o);
+ }
+
+ @Override
+ public boolean remove(final Object o)
+ {
+ ((C)o).clearScheduled();
+ return _underlying.remove(o);
+ }
+
+ @Override
+ public boolean addAll(final Collection<? extends C> c)
+ {
+ boolean result = false;
+ for(C consumer : c)
+ {
+ result = _underlying.add(consumer) || result;
+ }
+ return result;
+ }
+
+ @Override
+ public Object[] toArray()
+ {
+ return _underlying.toArray();
+ }
+
+ @Override
+ public <T> T[] toArray(final T[] a)
+ {
+ return _underlying.toArray(a);
+ }
+
+ @Override
+ public Iterator<C> iterator()
+ {
+ return new ScheduledConsumerIterator();
+ }
+
+ @Override
+ public void clear()
+ {
+ for(C consumer : _underlying)
+ {
+ remove(consumer);
+ }
+ }
+
+ @Override
+ public boolean containsAll(final Collection<?> c)
+ {
+ return _underlying.containsAll(c);
+ }
+
+ @Override
+ public boolean removeAll(final Collection<?> c)
+ {
+ boolean result = false;
+ for(Object consumer : c)
+ {
+ result = _underlying.remove((C)consumer) || result;
+ }
+ return result;
+ }
+
+ @Override
+ public boolean retainAll(final Collection<?> c)
+ {
+ boolean modified = false;
+ Iterator<C> iterator = iterator();
+ while (iterator.hasNext())
+ {
+ if (!c.contains(iterator.next()))
+ {
+ iterator.remove();
+ modified = true;
+ }
+ }
+ return modified;
+ }
+
+ @Override
+ public String toString()
+ {
+ return _underlying.toString();
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ return _underlying.equals(o);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return _underlying.hashCode();
+ }
+
+ private class ScheduledConsumerIterator implements Iterator<C>
+ {
+ private final Iterator<C> _underlyingIterator;
+ private C _current;
+
+ public ScheduledConsumerIterator()
+ {
+ _underlyingIterator = _underlying.iterator();
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ return _underlyingIterator.hasNext();
+ }
+
+ @Override
+ public C next()
+ {
+ _current = _underlyingIterator.next();
+ return _current;
+ }
+
+ @Override
+ public void remove()
+ {
+ _underlyingIterator.remove();
+ _current.clearScheduled();
+ }
+ }
+}
Propchange: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ScheduledConsumerTargetSet.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1769849&r1=1769848&r2=1769849&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Tue Nov 15 15:32:07 2016
@@ -266,47 +266,6 @@ public abstract class AbstractQueue<X ex
private Map<String, String> _mimeTypeToFileExtension = Collections.emptyMap();
private AdvanceConsumersTask _queueHouseKeepingTask;
- void setNotifyWorkDesired(final QueueConsumer consumer, final boolean desired)
- {
- if (_queueConsumerManager.setInterest(consumer, desired))
- {
- if (desired)
- {
- _activeSubscriberCount.incrementAndGet();
- notifyConsumer(consumer);
- }
- else
- {
- _activeSubscriberCount.decrementAndGet();
-
- // iterate over interested and notify one as long as its priority is higher than any notified
- final Iterator<QueueConsumer<?>> consumerIterator = _queueConsumerManager.getInterestedIterator();
- final int highestNotifiedPriority = _queueConsumerManager.getHighestNotifiedPriority();
- while (consumerIterator.hasNext())
- {
- QueueConsumer<?> queueConsumer = consumerIterator.next();
- if (queueConsumer.getPriority() < highestNotifiedPriority || notifyConsumer(queueConsumer))
- {
- break;
- }
- }
- }
- }
- }
-
- private boolean notifyConsumer(final QueueConsumer<?> consumer)
- {
- if(_queueConsumerManager.setNotified(consumer, true))
- {
- consumer.notifyWork();
- return true;
- }
- else
- {
- return false;
- }
- }
-
private interface HoldMethod
{
boolean isHeld(MessageReference<?> message, long evalutaionTime);
@@ -1167,7 +1126,6 @@ public abstract class AbstractQueue<X ex
protected void doEnqueue(final ServerMessage message, final Action<? super MessageInstance> action, MessageEnqueueRecord enqueueRecord)
{
- final QueueConsumer<?> exclusiveSub = _exclusiveSubscriber;
final QueueEntry entry = getEntries().add(message, enqueueRecord);
updateExpiration(entry);
@@ -1888,7 +1846,10 @@ public abstract class AbstractQueue<X ex
messageContainer = attemptDelivery(consumer);
if(messageContainer != null)
{
- _queueConsumerManager.setNotified(consumer, true);
+ if(consumerHasAvailableMessages(consumer))
+ {
+ _queueConsumerManager.setNotified(consumer, true);
+ }
}
if (messageContainer == null && getNextAvailableEntry(consumer) == null)
@@ -2172,6 +2133,54 @@ public abstract class AbstractQueue<X ex
}
+ private boolean consumerHasAvailableMessages(final QueueConsumer consumer)
+ {
+ final QueueEntry queueEntry;
+ return !consumer.acquires() || ((queueEntry = getNextAvailableEntry(consumer)) != null
+ && noHigherPriorityWithCredit(consumer, queueEntry));
+ }
+
+ void setNotifyWorkDesired(final QueueConsumer consumer, final boolean desired)
+ {
+ if (_queueConsumerManager.setInterest(consumer, desired))
+ {
+ if (desired)
+ {
+ _activeSubscriberCount.incrementAndGet();
+ notifyConsumer(consumer);
+ }
+ else
+ {
+ _activeSubscriberCount.decrementAndGet();
+
+ // iterate over interested and notify one as long as its priority is higher than any notified
+ final Iterator<QueueConsumer<?>> consumerIterator = _queueConsumerManager.getInterestedIterator();
+ final int highestNotifiedPriority = _queueConsumerManager.getHighestNotifiedPriority();
+ while (consumerIterator.hasNext())
+ {
+ QueueConsumer<?> queueConsumer = consumerIterator.next();
+ if (queueConsumer.getPriority() < highestNotifiedPriority || notifyConsumer(queueConsumer))
+ {
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ private boolean notifyConsumer(final QueueConsumer<?> consumer)
+ {
+ if(consumerHasAvailableMessages(consumer) && _queueConsumerManager.setNotified(consumer, true))
+ {
+ consumer.notifyWork();
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
@Override
public long getPotentialMemoryFootprint()
{
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java?rev=1769849&r1=1769848&r2=1769849&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java Tue Nov 15 15:32:07 2016
@@ -138,17 +138,7 @@ public class QueueConsumerManagerImpl im
{
if (notified)
{
- // TODO - Fix responsibility
- QueueEntry queueEntry;
- if ((queueEntry = _queue.getNextAvailableEntry(consumer)) != null
- && _queue.noHigherPriorityWithCredit(consumer, queueEntry))
- {
- return node.moveFromTo(NodeState.INTERESTED, NodeState.NOTIFIED);
- }
- else
- {
- return false;
- }
+ return node.moveFromTo(NodeState.INTERESTED, NodeState.NOTIFIED);
}
else
{
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1769849&r1=1769848&r2=1769849&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Tue Nov 15 15:32:07 2016
@@ -58,6 +58,7 @@ import org.apache.qpid.protocol.AMQConst
import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.consumer.ScheduledConsumerTargetSet;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.ChannelMessages;
@@ -143,9 +144,8 @@ public class ServerSession extends Sessi
private long _blockTime;
private long _blockingTimeout;
private boolean _wireBlockingState;
- private final Set<ConsumerTarget> _consumersWithPendingWork =
- Collections.newSetFromMap(new ConcurrentHashMap<ConsumerTarget, Boolean>());
- private Iterator<ConsumerTarget> _processPendingIterator;
+ private final Set<ConsumerTarget_0_10> _consumersWithPendingWork = new ScheduledConsumerTargetSet<>();
+ private Iterator<ConsumerTarget_0_10> _processPendingIterator;
private final PublishAuthorisationCache _publishAuthCache;
@@ -1218,7 +1218,7 @@ public class ServerSession extends Sessi
if (_processPendingIterator.hasNext())
{
- ConsumerTarget target = _processPendingIterator.next();
+ ConsumerTarget_0_10 target = _processPendingIterator.next();
_processPendingIterator.remove();
if (target.processPending())
{
@@ -1247,7 +1247,7 @@ public class ServerSession extends Sessi
@Override
public void notifyWork(final ConsumerTarget target)
{
- if(_consumersWithPendingWork.add(target))
+ if(_consumersWithPendingWork.add((ConsumerTarget_0_10) target))
{
getAMQPConnection().notifyWork(this);
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1769849&r1=1769848&r2=1769849&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Tue Nov 15 15:32:07 2016
@@ -39,7 +39,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -60,6 +59,7 @@ import org.apache.qpid.protocol.AMQConst
import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.consumer.ScheduledConsumerTargetSet;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.filter.ArrivalTimeFilter;
import org.apache.qpid.server.filter.FilterManager;
@@ -164,9 +164,9 @@ public class AMQChannel
/** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */
private final Map<AMQShortString, ConsumerTarget_0_8> _tag2SubscriptionTargetMap = new HashMap<AMQShortString, ConsumerTarget_0_8>();
- private final Set<ConsumerTarget> _consumersWithPendingWork =
- Collections.newSetFromMap(new ConcurrentHashMap<ConsumerTarget, Boolean>());
- private Iterator<ConsumerTarget> _processPendingIterator;
+ private final Set<ConsumerTarget_0_8> _consumersWithPendingWork = new ScheduledConsumerTargetSet<>();
+
+ private Iterator<ConsumerTarget_0_8> _processPendingIterator;
private final MessageStore _messageStore;
@@ -3744,7 +3744,7 @@ public class AMQChannel
if(_processPendingIterator.hasNext())
{
- ConsumerTarget target = _processPendingIterator.next();
+ ConsumerTarget_0_8 target = _processPendingIterator.next();
_processPendingIterator.remove();
if (target.processPending())
{
@@ -3773,7 +3773,7 @@ public class AMQChannel
@Override
public void notifyWork(final ConsumerTarget target)
{
- if(_consumersWithPendingWork.add(target))
+ if(_consumersWithPendingWork.add((ConsumerTarget_0_8) target))
{
getAMQPConnection().notifyWork(this);
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1769849&r1=1769848&r2=1769849&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Tue Nov 15 15:32:07 2016
@@ -38,7 +38,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -52,6 +51,7 @@ import org.apache.qpid.protocol.AMQConst
import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.consumer.ScheduledConsumerTargetSet;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageSource;
@@ -130,9 +130,8 @@ public class Session_1_0 implements AMQS
private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
private Session<?> _modelObject;
- private final Set<ConsumerTarget> _consumersWithPendingWork =
- Collections.newSetFromMap(new ConcurrentHashMap<ConsumerTarget, Boolean>());
- private Iterator<ConsumerTarget> _processPendingIterator;
+ private final Set<ConsumerTarget_1_0> _consumersWithPendingWork = new ScheduledConsumerTargetSet<>();
+ private Iterator<ConsumerTarget_1_0> _processPendingIterator;
private SessionState _state ;
@@ -1548,7 +1547,7 @@ public class Session_1_0 implements AMQS
if(_processPendingIterator.hasNext())
{
- ConsumerTarget target = _processPendingIterator.next();
+ ConsumerTarget_1_0 target = _processPendingIterator.next();
_processPendingIterator.remove();
if (target.processPending())
{
@@ -1577,7 +1576,7 @@ public class Session_1_0 implements AMQS
@Override
public void notifyWork(final ConsumerTarget target)
{
- if(_consumersWithPendingWork.add(target))
+ if(_consumersWithPendingWork.add((ConsumerTarget_1_0) target))
{
getAMQPConnection().notifyWork(this);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org