You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/07/06 13:08:28 UTC

svn commit: r1689364 - in /qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue: AbstractQueue.java AssignedConsumerMessageGroupManager.java DefinedGroupMessageGroupManager.java MessageGroupManager.java

Author: rgodfrey
Date: Mon Jul  6 11:08:28 2015
New Revision: 1689364

URL: http://svn.apache.org/r1689364
Log:
QPID-6621 : [Java Broker] improve the performance of shared groups on large queues

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1689364&r1=1689363&r2=1689364&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Mon Jul  6 11:08:28 2015
@@ -917,7 +917,7 @@ public abstract class AbstractQueue<X ex
 
             if(_messageGroupManager != null)
             {
-                resetSubPointersForGroups(consumer, true);
+                resetSubPointersForGroups(consumer);
             }
 
             synchronized (_consumerListeners)
@@ -990,32 +990,33 @@ public abstract class AbstractQueue<X ex
         }
     }
 
-    public void resetSubPointersForGroups(QueueConsumer<?> consumer, boolean clearAssignments)
+    public void resetSubPointersForGroups(QueueConsumer<?> consumer)
     {
         QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(consumer);
-        if(clearAssignments)
+        _messageGroupManager.clearAssignments(consumer);
+
+        if(entry != null)
         {
-            _messageGroupManager.clearAssignments(consumer);
+            resetSubPointersForGroups(entry);
         }
+    }
 
-        if(entry != null)
+    public void resetSubPointersForGroups(final QueueEntry entry)
+    {
+        QueueConsumerList.ConsumerNodeIterator subscriberIter = _consumerList.iterator();
+        // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
+        while (subscriberIter.advance())
         {
-            QueueConsumerList.ConsumerNodeIterator subscriberIter = _consumerList.iterator();
-            // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
-            while (subscriberIter.advance())
-            {
-                QueueConsumer<?> sub = subscriberIter.getNode().getConsumer();
+            QueueConsumer<?> sub = subscriberIter.getNode().getConsumer();
 
-                // we don't make browsers send the same stuff twice
-                if (sub.seesRequeues())
-                {
-                    updateSubRequeueEntry(sub, entry);
-                }
+            // we don't make browsers send the same stuff twice
+            if (sub.seesRequeues())
+            {
+                updateSubRequeueEntry(sub, entry);
             }
-
-            deliverAsync();
-
         }
+
+        deliverAsync();
     }
 
     public void addBinding(final BindingImpl binding)
@@ -1338,12 +1339,7 @@ public abstract class AbstractQueue<X ex
 
     private boolean mightAssign(final QueueConsumer sub, final QueueEntry entry)
     {
-        if(_messageGroupManager == null || !sub.acquires())
-        {
-            return true;
-        }
-        QueueConsumer assigned = _messageGroupManager.getAssignedConsumer(entry);
-        return (assigned == null) || (assigned == sub);
+        return _messageGroupManager == null || !sub.acquires() || _messageGroupManager.mightAssign(entry, sub);
     }
 
     protected void checkConsumersNotAheadOfDelivery(final QueueEntry entry)

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java?rev=1689364&r1=1689363&r2=1689364&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java Mon Jul  6 11:08:28 2015
@@ -53,10 +53,19 @@ public class AssignedConsumerMessageGrou
         return val;
     }
 
-    public QueueConsumer<?> getAssignedConsumer(final QueueEntry entry)
+    public boolean mightAssign(final QueueEntry entry, QueueConsumer sub)
     {
         Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId);
-        return groupVal == null ? null : _groupMap.get(groupVal.hashCode() & _groupMask);
+
+        if(groupVal == null)
+        {
+            return true;
+        }
+        else
+        {
+            QueueConsumer assignedSub = _groupMap.get(groupVal.hashCode() & _groupMask);
+            return assignedSub == null || assignedSub == sub;
+        }
     }
 
     public boolean acceptMessage(QueueConsumer<?> sub, QueueEntry entry)

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java?rev=1689364&r1=1689363&r2=1689364&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java Mon Jul  6 11:08:28 2015
@@ -31,6 +31,8 @@ import org.apache.qpid.server.message.Se
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
 
 public class DefinedGroupMessageGroupManager implements MessageGroupManager
 {
@@ -46,6 +48,7 @@ public class DefinedGroupMessageGroupMan
         private final Object _group;
         private QueueConsumer<?> _consumer;
         private int _activeCount;
+        private final SortedSet<QueueEntry> _skippedEntries = new TreeSet<>();
 
         private Group(final Object key, final QueueConsumer<?> consumer)
         {
@@ -66,11 +69,19 @@ public class DefinedGroupMessageGroupMan
             }
         }
         
-        public void subtract()
+        public void subtract(final QueueEntry entry, final boolean released)
         {
+            if(!released)
+            {
+                _skippedEntries.remove(entry);
+            }
             if(--_activeCount == 0)
             {
-                _resetHelper.resetSubPointersForGroups(_consumer, false);
+                if(!_skippedEntries.isEmpty())
+                {
+                    _resetHelper.resetSubPointersForGroups(_skippedEntries.first());
+                    _skippedEntries.clear();
+                }
                 _consumer = null;
                 _groupMap.remove(_group);
             }
@@ -118,6 +129,11 @@ public class DefinedGroupMessageGroupMan
                     ", _activeCount=" + _activeCount +
                     '}';
         }
+
+        public void addSkippedEntry(final QueueEntry entry)
+        {
+            _skippedEntries.add(entry);
+        }
     }
 
     public DefinedGroupMessageGroupManager(final String groupId, String defaultGroup, ConsumerResetHelper resetHelper)
@@ -127,12 +143,17 @@ public class DefinedGroupMessageGroupMan
         _resetHelper = resetHelper;
     }
     
-    public synchronized QueueConsumer<?> getAssignedConsumer(final QueueEntry entry)
+    public synchronized boolean mightAssign(final QueueEntry entry, final QueueConsumer sub)
     {
         Object groupId = getKey(entry);
 
         Group group = _groupMap.get(groupId);
-        return group == null || !group.isValid() ? null : group.getConsumer();
+        final boolean possibleAssginment = group == null || !group.isValid() || group.getConsumer() == sub;
+        if(!possibleAssginment)
+        {
+            group.addSkippedEntry(entry);
+        }
+        return possibleAssginment;
     }
 
     public synchronized boolean acceptMessage(final QueueConsumer<?> sub, final QueueEntry entry)
@@ -169,6 +190,7 @@ public class DefinedGroupMessageGroupMan
         }
         else
         {
+            group.addSkippedEntry(entry);
             return false;            
         }
     }
@@ -259,7 +281,7 @@ public class DefinedGroupMessageGroupMan
                         }
                         else if(oldState == QueueEntry.State.ACQUIRED)
                         {
-                            _group.subtract();
+                            _group.subtract((QueueEntry) entry, newState == MessageInstance.State.AVAILABLE);
                         }
                     }
                 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java?rev=1689364&r1=1689363&r2=1689364&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java Mon Jul  6 11:08:28 2015
@@ -24,12 +24,12 @@ public interface MessageGroupManager
 {
     public interface ConsumerResetHelper
     {
-        public void resetSubPointersForGroups(QueueConsumer<?> consumer, boolean clearAssignments);
+        void resetSubPointersForGroups(QueueEntry entry);
 
         boolean isEntryAheadOfConsumer(QueueEntry entry, QueueConsumer<?> sub);
     }
 
-    QueueConsumer getAssignedConsumer(QueueEntry entry);
+    boolean mightAssign(QueueEntry entry, final QueueConsumer sub);
 
     boolean acceptMessage(QueueConsumer<?> sub, QueueEntry entry);
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org