You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/06/11 12:32:31 UTC

svn commit: r1684851 - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/consumer/ broker-core/src/main/java/org/apache/qpid/server/protocol/ broker-core/src/test/java/org/apache/qpid/server/consumer/ broker-plugins/amqp-0-10-protoc...

Author: rgodfrey
Date: Thu Jun 11 10:32:30 2015
New Revision: 1684851

URL: http://svn.apache.org/r1684851
Log:
QPID-6582 : Implement fairness amongst session/consumer on a single connection (work by Rob Godfrey and Lorenz Quack)

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1684851&r1=1684850&r2=1684851&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Thu Jun 11 10:32:30 2015
@@ -36,7 +36,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.util.StateChangeListener;
-import org.apache.qpid.thread.ThreadFactory;
 
 public abstract class AbstractConsumerTarget implements ConsumerTarget
 {
@@ -57,18 +56,31 @@ public abstract class AbstractConsumerTa
     }
 
     @Override
-    public void processPending()
+    public boolean processPending()
     {
-        while(hasMessagesToSend())
+        if(hasMessagesToSend())
         {
             sendNextMessage();
+            return true;
         }
+        else
+        {
+            processStateChanged();
+            processClosed();
+            return false;
+        }
+    }
 
-        processStateChanged();
-
-        processClosed();
+    @Override
+    public boolean hasPendingWork()
+    {
+        return hasMessagesToSend() || hasStateChanged() || hasClosed();
     }
 
+    protected abstract boolean hasStateChanged();
+
+    protected abstract boolean hasClosed();
+
     protected abstract void processStateChanged();
 
     protected abstract void processClosed();

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java?rev=1684851&r1=1684850&r2=1684851&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java Thu Jun 11 10:32:30 2015
@@ -33,7 +33,9 @@ public interface ConsumerTarget
 
     void removeStateChangeListener(StateChangeListener<ConsumerTarget, State> listener);
 
-    void processPending();
+    boolean processPending();
+
+    boolean hasPendingWork();
 
     enum State
     {

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1684851&r1=1684850&r2=1684851&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Thu Jun 11 10:32:30 2015
@@ -117,7 +117,7 @@ public interface AMQSessionModel<T exten
 
     void transportStateChanged();
 
-    void processPending();
+    boolean processPending();
 
     void addTicker(Ticker ticker);
     void removeTicker(Ticker ticker);

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java?rev=1684851&r1=1684850&r2=1684851&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java Thu Jun 11 10:32:30 2015
@@ -264,9 +264,15 @@ public class MockConsumer implements Con
     }
 
     @Override
-    public void processPending()
+    public boolean processPending()
     {
+        return false;
+    }
 
+    @Override
+    public boolean hasPendingWork()
+    {
+        return false;
     }
 
     public ArrayList<MessageInstance> getMessages()
@@ -505,9 +511,9 @@ public class MockConsumer implements Con
         }
 
         @Override
-        public void processPending()
+        public boolean processPending()
         {
-
+            return false;
         }
 
         @Override

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1684851&r1=1684850&r2=1684851&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Thu Jun 11 10:32:30 2015
@@ -672,4 +672,16 @@ public class ConsumerTarget_0_10 extends
     {
 
     }
+
+    @Override
+    protected boolean hasStateChanged()
+    {
+        return false;
+    }
+
+    @Override
+    protected boolean hasClosed()
+    {
+        return false;
+    }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java?rev=1684851&r1=1684850&r2=1684851&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java Thu Jun 11 10:32:30 2015
@@ -312,7 +312,6 @@ public class ProtocolEngine_0_10  extend
     public void processPending()
     {
         _connection.processPending();
-
     }
 
     @Override

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1684851&r1=1684850&r2=1684851&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Thu Jun 11 10:32:30 2015
@@ -30,6 +30,7 @@ import java.security.Principal;
 import java.security.PrivilegedAction;
 import java.text.MessageFormat;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -759,10 +760,19 @@ public class ServerConnection extends Co
 
     public void processPending()
     {
-
-        for (AMQSessionModel session : getSessionModels())
+        List<? extends AMQSessionModel<?,?>> sessionsWithPending = new ArrayList<>(getSessionModels());
+        while(!sessionsWithPending.isEmpty())
         {
-            session.processPending();
+            final Iterator<? extends AMQSessionModel<?, ?>> iter = sessionsWithPending.iterator();
+            AMQSessionModel<?, ?> session;
+            while(iter.hasNext())
+            {
+                session = iter.next();
+                if(!session.processPending())
+                {
+                    iter.remove();
+                }
+            }
         }
 
         while(_asyncTaskList.peek() != null)

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1684851&r1=1684850&r2=1684851&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Thu Jun 11 10:32:30 2015
@@ -139,6 +139,7 @@ public class ServerSession extends Sessi
     private long _blockTime;
     private long _blockingTimeout;
     private boolean _wireBlockingState;
+    private final List<ConsumerTarget> _consumersWithPendingWork = new ArrayList<>();
 
     public static interface MessageDispositionChangeListener
     {
@@ -1137,7 +1138,7 @@ public class ServerSession extends Sessi
     }
 
     @Override
-    public void processPending()
+    public boolean processPending()
     {
         boolean desiredBlockingState = _blocking.get();
         if (desiredBlockingState != _wireBlockingState)
@@ -1155,11 +1156,33 @@ public class ServerSession extends Sessi
             _blockTime = desiredBlockingState ? System.currentTimeMillis() : 0;
         }
 
+        boolean consumerListNeedsRefreshing;
+        if(_consumersWithPendingWork.isEmpty())
+        {
+            _consumersWithPendingWork.addAll(getSubscriptions());
+            consumerListNeedsRefreshing = false;
+        }
+        else
+        {
+            consumerListNeedsRefreshing = true;
+        }
 
-        for(ConsumerTarget target : getSubscriptions())
+        Iterator<ConsumerTarget> iter = _consumersWithPendingWork.iterator();
+
+        boolean consumerHasMoreWork = false;
+        while(iter.hasNext())
         {
-            target.processPending();
+            final ConsumerTarget target = iter.next();
+            iter.remove();
+            if(target.hasPendingWork())
+            {
+                consumerHasMoreWork = true;
+                target.processPending();
+                break;
+            }
         }
+
+        return consumerHasMoreWork || consumerListNeedsRefreshing;
     }
 
     @Override

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1684851&r1=1684850&r2=1684851&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Thu Jun 11 10:32:30 2015
@@ -31,6 +31,7 @@ import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -158,6 +159,8 @@ public class AMQChannel
     /** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */
     private final Map<AMQShortString, ConsumerTarget_0_8> _tag2SubscriptionTargetMap = new HashMap<AMQShortString, ConsumerTarget_0_8>();
 
+    private final List<ConsumerTarget_0_8> _consumersWithPendingWork = new ArrayList<>();
+
     private final MessageStore _messageStore;
 
     private final LinkedList<AsyncCommand> _unfinishedCommandsQueue = new LinkedList<AsyncCommand>();
@@ -3663,7 +3666,7 @@ public class AMQChannel
     }
 
     @Override
-    public void processPending()
+    public boolean processPending()
     {
 
         boolean desiredBlockingState = _blocking.get();
@@ -3674,10 +3677,33 @@ public class AMQChannel
             _blockTime = desiredBlockingState ? System.currentTimeMillis() : 0;
         }
 
-        for(ConsumerTarget target : _tag2SubscriptionTargetMap.values())
+        boolean consumerListNeedsRefreshing;
+        if(_consumersWithPendingWork.isEmpty())
+        {
+            _consumersWithPendingWork.addAll(_tag2SubscriptionTargetMap.values());
+            consumerListNeedsRefreshing = false;
+        }
+        else
         {
-            target.processPending();
+            consumerListNeedsRefreshing = true;
         }
+
+        Iterator<ConsumerTarget_0_8> iter = _consumersWithPendingWork.iterator();
+
+        boolean consumerHasMoreWork = false;
+        while(iter.hasNext())
+        {
+            final ConsumerTarget_0_8 target = iter.next();
+            iter.remove();
+            if(target.hasPendingWork())
+            {
+                consumerHasMoreWork = true;
+                target.processPending();
+                break;
+            }
+        }
+
+        return consumerHasMoreWork || consumerListNeedsRefreshing;
     }
 
     @Override

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1684851&r1=1684850&r2=1684851&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Thu Jun 11 10:32:30 2015
@@ -34,6 +34,7 @@ import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
@@ -2003,9 +2004,19 @@ public class AMQProtocolEngine implement
     @Override
     public void processPending()
     {
-        for (AMQSessionModel session : getSessionModels())
+        List<? extends AMQSessionModel<?,?>> sessionsWithPending = new ArrayList<>(getSessionModels());
+        while(!sessionsWithPending.isEmpty())
         {
-            session.processPending();
+            final Iterator<? extends AMQSessionModel<?, ?>> iter = sessionsWithPending.iterator();
+            AMQSessionModel<?, ?> session;
+            while(iter.hasNext())
+            {
+                session = iter.next();
+                if(!session.processPending())
+                {
+                    iter.remove();
+                }
+            }
         }
 
         while(_asyncTaskList.peek() != null)

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1684851&r1=1684850&r2=1684851&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Thu Jun 11 10:32:30 2015
@@ -517,7 +517,7 @@ public abstract class ConsumerTarget_0_8
     @Override
     protected void processClosed()
     {
-        if (_needToClose.get() && getState() != State.CLOSED)
+        if (hasClosed())
         {
             close();
             confirmAutoClose();
@@ -530,6 +530,18 @@ public abstract class ConsumerTarget_0_8
 
     }
 
+    @Override
+    protected boolean hasStateChanged()
+    {
+        return false;
+    }
+
+    @Override
+    protected boolean hasClosed()
+    {
+        return (_needToClose.get() && getState() != State.CLOSED);
+    }
+
     public void flushBatched()
     {
         _channel.getConnection().setDeferFlush(false);

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1684851&r1=1684850&r2=1684851&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Thu Jun 11 10:32:30 2015
@@ -29,6 +29,7 @@ import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Queue;
 import java.util.Set;
@@ -539,9 +540,19 @@ public class Connection_1_0 implements C
 
     public void processPending()
     {
-        for (AMQSessionModel session : getSessionModels())
+        List<? extends AMQSessionModel<?,?>> sessionsWithPending = new ArrayList<>(getSessionModels());
+        while(!sessionsWithPending.isEmpty())
         {
-            session.processPending();
+            final Iterator<? extends AMQSessionModel<?, ?>> iter = sessionsWithPending.iterator();
+            AMQSessionModel<?, ?> session;
+            while(iter.hasNext())
+            {
+                session = iter.next();
+                if(!session.processPending())
+                {
+                    iter.remove();
+                }
+            }
         }
 
         while(_asyncTaskList.peek() != null)

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1684851&r1=1684850&r2=1684851&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Thu Jun 11 10:32:30 2015
@@ -566,4 +566,19 @@ class ConsumerTarget_1_0 extends Abstrac
             }
         }
     }
+
+    @Override
+    protected boolean hasStateChanged()
+    {
+        synchronized (_link.getLock())
+        {
+            return _queueEmpty;
+        }
+    }
+
+    @Override
+    protected boolean hasClosed()
+    {
+        return false;
+    }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java?rev=1684851&r1=1684850&r2=1684851&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java Thu Jun 11 10:32:30 2015
@@ -601,11 +601,6 @@ public class ProtocolEngine_1_0_0_SASL i
 
     }
 
-    public void flushBatched()
-    {
-        _sender.flush();
-    }
-
     @Override
     public void processPending()
     {

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1684851&r1=1684850&r2=1684851&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Thu Jun 11 10:32:30 2015
@@ -72,6 +72,7 @@ import org.apache.qpid.amqp_1_0.type.tra
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.connection.SessionPrincipal;
 import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.exchange.ExchangeImpl;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.message.MessageDestination;
@@ -115,6 +116,7 @@ public class Session_1_0 implements Sess
     private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
     private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
     private Session<?> _modelObject;
+    private final List<ConsumerTarget_1_0> _consumersWithPendingWork = new ArrayList<>();
 
 
     public Session_1_0(final Connection_1_0 connection, final SessionEndpoint endpoint)
@@ -905,13 +907,37 @@ public class Session_1_0 implements Sess
     }
 
     @Override
-    public void processPending()
+    public boolean processPending()
     {
-        for(Consumer<?> consumer : getConsumers())
+        boolean consumerListNeedsRefreshing;
+        if(_consumersWithPendingWork.isEmpty())
         {
+            for(SendingLink_1_0 link : _sendingLinks)
+            {
+                _consumersWithPendingWork.add(link.getConsumerTarget());
+            }
+            consumerListNeedsRefreshing = false;
+        }
+        else
+        {
+            consumerListNeedsRefreshing = true;
+        }
 
-            ((ConsumerImpl)consumer).getTarget().processPending();
+        Iterator<ConsumerTarget_1_0> iter = _consumersWithPendingWork.iterator();
+        boolean consumerHasMoreWork = false;
+        while(iter.hasNext())
+        {
+            final ConsumerTarget target = iter.next();
+            iter.remove();
+            if(target.hasPendingWork())
+            {
+                consumerHasMoreWork = true;
+                target.processPending();
+                break;
+            }
         }
+
+        return consumerHasMoreWork || consumerListNeedsRefreshing;
     }
 
     @Override



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org