You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/15 15:32:07 UTC

svn commit: r1769849 - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/consumer/ broker-core/src/main/java/org/apache/qpid/server/queue/ broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ broker...

Author: rgodfrey
Date: Tue Nov 15 15:32:07 2016
New Revision: 1769849

URL: http://svn.apache.org/viewvc?rev=1769849&view=rev
Log:
QPID-7514 : Make processing of consumer targets strictly follow round robin.  Tidy up responsibilities between AbstractQueue and QueueConsumerManagerImpl

Added:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ScheduledConsumerTargetSet.java   (with props)
Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1769849&r1=1769848&r2=1769849&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Tue Nov 15 15:32:07 2016
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.util.concurrent.Futures;
@@ -59,6 +60,7 @@ public abstract class AbstractConsumerTa
 
     private Iterator<ConsumerImpl> _pullIterator;
     private boolean _notifyWorkDesired;
+    private final AtomicBoolean _scheduled = new AtomicBoolean();
 
     protected AbstractConsumerTarget(final boolean isMultiQueue,
                                      final AMQPConnection<?> amqpConnection)
@@ -289,4 +291,14 @@ public abstract class AbstractConsumerTa
             return false;
         }
     }
+
+    final boolean setScheduled()
+    {
+        return _scheduled.compareAndSet(false, true);
+    }
+
+    final void clearScheduled()
+    {
+        _scheduled.set(false);
+    }
 }

Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ScheduledConsumerTargetSet.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ScheduledConsumerTargetSet.java?rev=1769849&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ScheduledConsumerTargetSet.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ScheduledConsumerTargetSet.java Tue Nov 15 15:32:07 2016
@@ -0,0 +1,189 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.consumer;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class ScheduledConsumerTargetSet<C extends AbstractConsumerTarget> implements Set<C>
+{
+    private final ConcurrentLinkedQueue<C> _underlying = new ConcurrentLinkedQueue<>();
+
+    @Override
+    public boolean add(final C c)
+    {
+        if(c.setScheduled())
+        {
+            return _underlying.add(c);
+        }
+        else
+        {
+            return false;
+        }
+    }
+
+    @Override
+    public boolean isEmpty()
+    {
+        return _underlying.isEmpty();
+    }
+
+    @Override
+    public int size()
+    {
+        return _underlying.size();
+    }
+
+    @Override
+    public boolean contains(final Object o)
+    {
+        return _underlying.contains(o);
+    }
+
+    @Override
+    public boolean remove(final Object o)
+    {
+        ((C)o).clearScheduled();
+        return _underlying.remove(o);
+    }
+
+    @Override
+    public boolean addAll(final Collection<? extends C> c)
+    {
+        boolean result = false;
+        for(C consumer : c)
+        {
+            result = _underlying.add(consumer) || result;
+        }
+        return result;
+    }
+
+    @Override
+    public Object[] toArray()
+    {
+        return _underlying.toArray();
+    }
+
+    @Override
+    public <T> T[] toArray(final T[] a)
+    {
+        return _underlying.toArray(a);
+    }
+
+    @Override
+    public Iterator<C> iterator()
+    {
+        return new ScheduledConsumerIterator();
+    }
+
+    @Override
+    public void clear()
+    {
+        for(C consumer : _underlying)
+        {
+            remove(consumer);
+        }
+    }
+
+    @Override
+    public boolean containsAll(final Collection<?> c)
+    {
+        return _underlying.containsAll(c);
+    }
+
+    @Override
+    public boolean removeAll(final Collection<?> c)
+    {
+        boolean result = false;
+        for(Object consumer : c)
+        {
+            result = _underlying.remove((C)consumer) || result;
+        }
+        return result;
+    }
+
+    @Override
+    public boolean retainAll(final Collection<?> c)
+    {
+        boolean modified = false;
+        Iterator<C> iterator = iterator();
+        while (iterator.hasNext())
+        {
+            if (!c.contains(iterator.next()))
+            {
+                iterator.remove();
+                modified = true;
+            }
+        }
+        return modified;
+    }
+
+    @Override
+    public String toString()
+    {
+        return _underlying.toString();
+    }
+
+    @Override
+    public boolean equals(final Object o)
+    {
+        return _underlying.equals(o);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return _underlying.hashCode();
+    }
+
+    private class ScheduledConsumerIterator implements Iterator<C>
+    {
+        private final Iterator<C> _underlyingIterator;
+        private C _current;
+
+        public ScheduledConsumerIterator()
+        {
+            _underlyingIterator = _underlying.iterator();
+        }
+
+        @Override
+        public boolean hasNext()
+        {
+            return _underlyingIterator.hasNext();
+        }
+
+        @Override
+        public C next()
+        {
+            _current = _underlyingIterator.next();
+            return _current;
+        }
+
+        @Override
+        public void remove()
+        {
+            _underlyingIterator.remove();
+            _current.clearScheduled();
+        }
+    }
+}

Propchange: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ScheduledConsumerTargetSet.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1769849&r1=1769848&r2=1769849&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Tue Nov 15 15:32:07 2016
@@ -266,47 +266,6 @@ public abstract class AbstractQueue<X ex
     private Map<String, String> _mimeTypeToFileExtension = Collections.emptyMap();
     private AdvanceConsumersTask _queueHouseKeepingTask;
 
-    void setNotifyWorkDesired(final QueueConsumer consumer, final boolean desired)
-    {
-        if (_queueConsumerManager.setInterest(consumer, desired))
-        {
-            if (desired)
-            {
-                _activeSubscriberCount.incrementAndGet();
-                notifyConsumer(consumer);
-            }
-            else
-            {
-                _activeSubscriberCount.decrementAndGet();
-
-                // iterate over interested and notify one as long as its priority is higher than any notified
-                final Iterator<QueueConsumer<?>> consumerIterator = _queueConsumerManager.getInterestedIterator();
-                final int highestNotifiedPriority = _queueConsumerManager.getHighestNotifiedPriority();
-                while (consumerIterator.hasNext())
-                {
-                    QueueConsumer<?> queueConsumer = consumerIterator.next();
-                    if (queueConsumer.getPriority() < highestNotifiedPriority || notifyConsumer(queueConsumer))
-                    {
-                        break;
-                    }
-                }
-            }
-        }
-    }
-
-    private boolean notifyConsumer(final QueueConsumer<?> consumer)
-    {
-        if(_queueConsumerManager.setNotified(consumer, true))
-        {
-            consumer.notifyWork();
-            return true;
-        }
-        else
-        {
-            return false;
-        }
-    }
-
     private interface HoldMethod
     {
         boolean isHeld(MessageReference<?> message, long evalutaionTime);
@@ -1167,7 +1126,6 @@ public abstract class AbstractQueue<X ex
 
     protected void doEnqueue(final ServerMessage message, final Action<? super MessageInstance> action, MessageEnqueueRecord enqueueRecord)
     {
-        final QueueConsumer<?> exclusiveSub = _exclusiveSubscriber;
         final QueueEntry entry = getEntries().add(message, enqueueRecord);
         updateExpiration(entry);
 
@@ -1888,7 +1846,10 @@ public abstract class AbstractQueue<X ex
                 messageContainer = attemptDelivery(consumer);
                 if(messageContainer != null)
                 {
-                    _queueConsumerManager.setNotified(consumer, true);
+                    if(consumerHasAvailableMessages(consumer))
+                    {
+                        _queueConsumerManager.setNotified(consumer, true);
+                    }
                 }
 
                 if (messageContainer == null && getNextAvailableEntry(consumer) == null)
@@ -2172,6 +2133,54 @@ public abstract class AbstractQueue<X ex
 
     }
 
+    private boolean consumerHasAvailableMessages(final QueueConsumer consumer)
+    {
+        final QueueEntry queueEntry;
+        return !consumer.acquires() || ((queueEntry = getNextAvailableEntry(consumer)) != null
+                                        && noHigherPriorityWithCredit(consumer, queueEntry));
+    }
+
+    void setNotifyWorkDesired(final QueueConsumer consumer, final boolean desired)
+    {
+        if (_queueConsumerManager.setInterest(consumer, desired))
+        {
+            if (desired)
+            {
+                _activeSubscriberCount.incrementAndGet();
+                notifyConsumer(consumer);
+            }
+            else
+            {
+                _activeSubscriberCount.decrementAndGet();
+
+                // iterate over interested and notify one as long as its priority is higher than any notified
+                final Iterator<QueueConsumer<?>> consumerIterator = _queueConsumerManager.getInterestedIterator();
+                final int highestNotifiedPriority = _queueConsumerManager.getHighestNotifiedPriority();
+                while (consumerIterator.hasNext())
+                {
+                    QueueConsumer<?> queueConsumer = consumerIterator.next();
+                    if (queueConsumer.getPriority() < highestNotifiedPriority || notifyConsumer(queueConsumer))
+                    {
+                        break;
+                    }
+                }
+            }
+        }
+    }
+
+    private boolean notifyConsumer(final QueueConsumer<?> consumer)
+    {
+        if(consumerHasAvailableMessages(consumer) && _queueConsumerManager.setNotified(consumer, true))
+        {
+            consumer.notifyWork();
+            return true;
+        }
+        else
+        {
+            return false;
+        }
+    }
+
     @Override
     public long getPotentialMemoryFootprint()
     {

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java?rev=1769849&r1=1769848&r2=1769849&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java Tue Nov 15 15:32:07 2016
@@ -138,17 +138,7 @@ public class QueueConsumerManagerImpl im
         {
             if (notified)
             {
-                // TODO - Fix responsibility
-                QueueEntry queueEntry;
-                if ((queueEntry = _queue.getNextAvailableEntry(consumer)) != null
-                    && _queue.noHigherPriorityWithCredit(consumer, queueEntry))
-                {
-                    return node.moveFromTo(NodeState.INTERESTED, NodeState.NOTIFIED);
-                }
-                else
-                {
-                    return false;
-                }
+                return node.moveFromTo(NodeState.INTERESTED, NodeState.NOTIFIED);
             }
             else
             {

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1769849&r1=1769848&r2=1769849&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Tue Nov 15 15:32:07 2016
@@ -58,6 +58,7 @@ import org.apache.qpid.protocol.AMQConst
 import org.apache.qpid.server.connection.SessionPrincipal;
 import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.consumer.ScheduledConsumerTargetSet;
 import org.apache.qpid.server.logging.LogMessage;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.messages.ChannelMessages;
@@ -143,9 +144,8 @@ public class ServerSession extends Sessi
     private long _blockTime;
     private long _blockingTimeout;
     private boolean _wireBlockingState;
-    private final Set<ConsumerTarget> _consumersWithPendingWork =
-            Collections.newSetFromMap(new ConcurrentHashMap<ConsumerTarget, Boolean>());
-    private Iterator<ConsumerTarget> _processPendingIterator;
+    private final Set<ConsumerTarget_0_10> _consumersWithPendingWork = new ScheduledConsumerTargetSet<>();
+    private Iterator<ConsumerTarget_0_10> _processPendingIterator;
 
     private final PublishAuthorisationCache _publishAuthCache;
 
@@ -1218,7 +1218,7 @@ public class ServerSession extends Sessi
 
             if (_processPendingIterator.hasNext())
             {
-                ConsumerTarget target = _processPendingIterator.next();
+                ConsumerTarget_0_10 target = _processPendingIterator.next();
                 _processPendingIterator.remove();
                 if (target.processPending())
                 {
@@ -1247,7 +1247,7 @@ public class ServerSession extends Sessi
     @Override
     public void notifyWork(final ConsumerTarget target)
     {
-        if(_consumersWithPendingWork.add(target))
+        if(_consumersWithPendingWork.add((ConsumerTarget_0_10) target))
         {
             getAMQPConnection().notifyWork(this);
         }

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1769849&r1=1769848&r2=1769849&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Tue Nov 15 15:32:07 2016
@@ -39,7 +39,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -60,6 +59,7 @@ import org.apache.qpid.protocol.AMQConst
 import org.apache.qpid.server.connection.SessionPrincipal;
 import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.consumer.ScheduledConsumerTargetSet;
 import org.apache.qpid.server.filter.AMQInvalidArgumentException;
 import org.apache.qpid.server.filter.ArrivalTimeFilter;
 import org.apache.qpid.server.filter.FilterManager;
@@ -164,9 +164,9 @@ public class AMQChannel
     /** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */
     private final Map<AMQShortString, ConsumerTarget_0_8> _tag2SubscriptionTargetMap = new HashMap<AMQShortString, ConsumerTarget_0_8>();
 
-    private final Set<ConsumerTarget> _consumersWithPendingWork =
-            Collections.newSetFromMap(new ConcurrentHashMap<ConsumerTarget, Boolean>());
-    private Iterator<ConsumerTarget> _processPendingIterator;
+    private final Set<ConsumerTarget_0_8> _consumersWithPendingWork = new ScheduledConsumerTargetSet<>();
+
+    private Iterator<ConsumerTarget_0_8> _processPendingIterator;
 
     private final MessageStore _messageStore;
 
@@ -3744,7 +3744,7 @@ public class AMQChannel
 
             if(_processPendingIterator.hasNext())
             {
-                ConsumerTarget target = _processPendingIterator.next();
+                ConsumerTarget_0_8 target = _processPendingIterator.next();
                 _processPendingIterator.remove();
                 if (target.processPending())
                 {
@@ -3773,7 +3773,7 @@ public class AMQChannel
     @Override
     public void notifyWork(final ConsumerTarget target)
     {
-        if(_consumersWithPendingWork.add(target))
+        if(_consumersWithPendingWork.add((ConsumerTarget_0_8) target))
         {
             getAMQPConnection().notifyWork(this);
         }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1769849&r1=1769848&r2=1769849&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Tue Nov 15 15:32:07 2016
@@ -38,7 +38,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -52,6 +51,7 @@ import org.apache.qpid.protocol.AMQConst
 import org.apache.qpid.server.connection.SessionPrincipal;
 import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.consumer.ScheduledConsumerTargetSet;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageSource;
@@ -130,9 +130,8 @@ public class Session_1_0 implements AMQS
     private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
     private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
     private Session<?> _modelObject;
-    private final Set<ConsumerTarget> _consumersWithPendingWork =
-            Collections.newSetFromMap(new ConcurrentHashMap<ConsumerTarget, Boolean>());
-    private Iterator<ConsumerTarget> _processPendingIterator;
+    private final Set<ConsumerTarget_1_0> _consumersWithPendingWork = new ScheduledConsumerTargetSet<>();
+    private Iterator<ConsumerTarget_1_0> _processPendingIterator;
 
     private SessionState _state ;
 
@@ -1548,7 +1547,7 @@ public class Session_1_0 implements AMQS
 
             if(_processPendingIterator.hasNext())
             {
-                ConsumerTarget target = _processPendingIterator.next();
+                ConsumerTarget_1_0 target = _processPendingIterator.next();
                 _processPendingIterator.remove();
                 if (target.processPending())
                 {
@@ -1577,7 +1576,7 @@ public class Session_1_0 implements AMQS
     @Override
     public void notifyWork(final ConsumerTarget target)
     {
-        if(_consumersWithPendingWork.add(target))
+        if(_consumersWithPendingWork.add((ConsumerTarget_1_0) target))
         {
             getAMQPConnection().notifyWork(this);
         }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org