You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/07/06 13:08:28 UTC
svn commit: r1689364 - in
/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue:
AbstractQueue.java AssignedConsumerMessageGroupManager.java
DefinedGroupMessageGroupManager.java MessageGroupManager.java
Author: rgodfrey
Date: Mon Jul 6 11:08:28 2015
New Revision: 1689364
URL: http://svn.apache.org/r1689364
Log:
QPID-6621 : [Java Broker] improve the performance of shared groups on large queues
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1689364&r1=1689363&r2=1689364&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Mon Jul 6 11:08:28 2015
@@ -917,7 +917,7 @@ public abstract class AbstractQueue<X ex
if(_messageGroupManager != null)
{
- resetSubPointersForGroups(consumer, true);
+ resetSubPointersForGroups(consumer);
}
synchronized (_consumerListeners)
@@ -990,32 +990,33 @@ public abstract class AbstractQueue<X ex
}
}
- public void resetSubPointersForGroups(QueueConsumer<?> consumer, boolean clearAssignments)
+ public void resetSubPointersForGroups(QueueConsumer<?> consumer)
{
QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(consumer);
- if(clearAssignments)
+ _messageGroupManager.clearAssignments(consumer);
+
+ if(entry != null)
{
- _messageGroupManager.clearAssignments(consumer);
+ resetSubPointersForGroups(entry);
}
+ }
- if(entry != null)
+ public void resetSubPointersForGroups(final QueueEntry entry)
+ {
+ QueueConsumerList.ConsumerNodeIterator subscriberIter = _consumerList.iterator();
+ // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
+ while (subscriberIter.advance())
{
- QueueConsumerList.ConsumerNodeIterator subscriberIter = _consumerList.iterator();
- // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
- while (subscriberIter.advance())
- {
- QueueConsumer<?> sub = subscriberIter.getNode().getConsumer();
+ QueueConsumer<?> sub = subscriberIter.getNode().getConsumer();
- // we don't make browsers send the same stuff twice
- if (sub.seesRequeues())
- {
- updateSubRequeueEntry(sub, entry);
- }
+ // we don't make browsers send the same stuff twice
+ if (sub.seesRequeues())
+ {
+ updateSubRequeueEntry(sub, entry);
}
-
- deliverAsync();
-
}
+
+ deliverAsync();
}
public void addBinding(final BindingImpl binding)
@@ -1338,12 +1339,7 @@ public abstract class AbstractQueue<X ex
private boolean mightAssign(final QueueConsumer sub, final QueueEntry entry)
{
- if(_messageGroupManager == null || !sub.acquires())
- {
- return true;
- }
- QueueConsumer assigned = _messageGroupManager.getAssignedConsumer(entry);
- return (assigned == null) || (assigned == sub);
+ return _messageGroupManager == null || !sub.acquires() || _messageGroupManager.mightAssign(entry, sub);
}
protected void checkConsumersNotAheadOfDelivery(final QueueEntry entry)
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java?rev=1689364&r1=1689363&r2=1689364&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java Mon Jul 6 11:08:28 2015
@@ -53,10 +53,19 @@ public class AssignedConsumerMessageGrou
return val;
}
- public QueueConsumer<?> getAssignedConsumer(final QueueEntry entry)
+ public boolean mightAssign(final QueueEntry entry, QueueConsumer sub)
{
Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId);
- return groupVal == null ? null : _groupMap.get(groupVal.hashCode() & _groupMask);
+
+ if(groupVal == null)
+ {
+ return true;
+ }
+ else
+ {
+ QueueConsumer assignedSub = _groupMap.get(groupVal.hashCode() & _groupMask);
+ return assignedSub == null || assignedSub == sub;
+ }
}
public boolean acceptMessage(QueueConsumer<?> sub, QueueEntry entry)
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java?rev=1689364&r1=1689363&r2=1689364&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java Mon Jul 6 11:08:28 2015
@@ -31,6 +31,8 @@ import org.apache.qpid.server.message.Se
import java.util.HashMap;
import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
public class DefinedGroupMessageGroupManager implements MessageGroupManager
{
@@ -46,6 +48,7 @@ public class DefinedGroupMessageGroupMan
private final Object _group;
private QueueConsumer<?> _consumer;
private int _activeCount;
+ private final SortedSet<QueueEntry> _skippedEntries = new TreeSet<>();
private Group(final Object key, final QueueConsumer<?> consumer)
{
@@ -66,11 +69,19 @@ public class DefinedGroupMessageGroupMan
}
}
- public void subtract()
+ public void subtract(final QueueEntry entry, final boolean released)
{
+ if(!released)
+ {
+ _skippedEntries.remove(entry);
+ }
if(--_activeCount == 0)
{
- _resetHelper.resetSubPointersForGroups(_consumer, false);
+ if(!_skippedEntries.isEmpty())
+ {
+ _resetHelper.resetSubPointersForGroups(_skippedEntries.first());
+ _skippedEntries.clear();
+ }
_consumer = null;
_groupMap.remove(_group);
}
@@ -118,6 +129,11 @@ public class DefinedGroupMessageGroupMan
", _activeCount=" + _activeCount +
'}';
}
+
+ public void addSkippedEntry(final QueueEntry entry)
+ {
+ _skippedEntries.add(entry);
+ }
}
public DefinedGroupMessageGroupManager(final String groupId, String defaultGroup, ConsumerResetHelper resetHelper)
@@ -127,12 +143,17 @@ public class DefinedGroupMessageGroupMan
_resetHelper = resetHelper;
}
- public synchronized QueueConsumer<?> getAssignedConsumer(final QueueEntry entry)
+ public synchronized boolean mightAssign(final QueueEntry entry, final QueueConsumer sub)
{
Object groupId = getKey(entry);
Group group = _groupMap.get(groupId);
- return group == null || !group.isValid() ? null : group.getConsumer();
+ final boolean possibleAssginment = group == null || !group.isValid() || group.getConsumer() == sub;
+ if(!possibleAssginment)
+ {
+ group.addSkippedEntry(entry);
+ }
+ return possibleAssginment;
}
public synchronized boolean acceptMessage(final QueueConsumer<?> sub, final QueueEntry entry)
@@ -169,6 +190,7 @@ public class DefinedGroupMessageGroupMan
}
else
{
+ group.addSkippedEntry(entry);
return false;
}
}
@@ -259,7 +281,7 @@ public class DefinedGroupMessageGroupMan
}
else if(oldState == QueueEntry.State.ACQUIRED)
{
- _group.subtract();
+ _group.subtract((QueueEntry) entry, newState == MessageInstance.State.AVAILABLE);
}
}
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java?rev=1689364&r1=1689363&r2=1689364&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java Mon Jul 6 11:08:28 2015
@@ -24,12 +24,12 @@ public interface MessageGroupManager
{
public interface ConsumerResetHelper
{
- public void resetSubPointersForGroups(QueueConsumer<?> consumer, boolean clearAssignments);
+ void resetSubPointersForGroups(QueueEntry entry);
boolean isEntryAheadOfConsumer(QueueEntry entry, QueueConsumer<?> sub);
}
- QueueConsumer getAssignedConsumer(QueueEntry entry);
+ boolean mightAssign(QueueEntry entry, final QueueConsumer sub);
boolean acceptMessage(QueueConsumer<?> sub, QueueEntry entry);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org