You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/06/11 12:32:31 UTC
svn commit: r1684851 - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/consumer/
broker-core/src/main/java/org/apache/qpid/server/protocol/
broker-core/src/test/java/org/apache/qpid/server/consumer/
broker-plugins/amqp-0-10-protoc...
Author: rgodfrey
Date: Thu Jun 11 10:32:30 2015
New Revision: 1684851
URL: http://svn.apache.org/r1684851
Log:
QPID-6582 : Implement fairness amongst session/consumer on a single connection (work by Rob Godfrey and Lorenz Quack)
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1684851&r1=1684850&r2=1684851&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Thu Jun 11 10:32:30 2015
@@ -36,7 +36,6 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.util.StateChangeListener;
-import org.apache.qpid.thread.ThreadFactory;
public abstract class AbstractConsumerTarget implements ConsumerTarget
{
@@ -57,18 +56,31 @@ public abstract class AbstractConsumerTa
}
@Override
- public void processPending()
+ public boolean processPending()
{
- while(hasMessagesToSend())
+ if(hasMessagesToSend())
{
sendNextMessage();
+ return true;
}
+ else
+ {
+ processStateChanged();
+ processClosed();
+ return false;
+ }
+ }
- processStateChanged();
-
- processClosed();
+ @Override
+ public boolean hasPendingWork()
+ {
+ return hasMessagesToSend() || hasStateChanged() || hasClosed();
}
+ protected abstract boolean hasStateChanged();
+
+ protected abstract boolean hasClosed();
+
protected abstract void processStateChanged();
protected abstract void processClosed();
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java?rev=1684851&r1=1684850&r2=1684851&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java Thu Jun 11 10:32:30 2015
@@ -33,7 +33,9 @@ public interface ConsumerTarget
void removeStateChangeListener(StateChangeListener<ConsumerTarget, State> listener);
- void processPending();
+ boolean processPending();
+
+ boolean hasPendingWork();
enum State
{
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1684851&r1=1684850&r2=1684851&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Thu Jun 11 10:32:30 2015
@@ -117,7 +117,7 @@ public interface AMQSessionModel<T exten
void transportStateChanged();
- void processPending();
+ boolean processPending();
void addTicker(Ticker ticker);
void removeTicker(Ticker ticker);
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java?rev=1684851&r1=1684850&r2=1684851&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java Thu Jun 11 10:32:30 2015
@@ -264,9 +264,15 @@ public class MockConsumer implements Con
}
@Override
- public void processPending()
+ public boolean processPending()
{
+ return false;
+ }
+ @Override
+ public boolean hasPendingWork()
+ {
+ return false;
}
public ArrayList<MessageInstance> getMessages()
@@ -505,9 +511,9 @@ public class MockConsumer implements Con
}
@Override
- public void processPending()
+ public boolean processPending()
{
-
+ return false;
}
@Override
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1684851&r1=1684850&r2=1684851&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Thu Jun 11 10:32:30 2015
@@ -672,4 +672,16 @@ public class ConsumerTarget_0_10 extends
{
}
+
+ @Override
+ protected boolean hasStateChanged()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasClosed()
+ {
+ return false;
+ }
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java?rev=1684851&r1=1684850&r2=1684851&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java Thu Jun 11 10:32:30 2015
@@ -312,7 +312,6 @@ public class ProtocolEngine_0_10 extend
public void processPending()
{
_connection.processPending();
-
}
@Override
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1684851&r1=1684850&r2=1684851&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Thu Jun 11 10:32:30 2015
@@ -30,6 +30,7 @@ import java.security.Principal;
import java.security.PrivilegedAction;
import java.text.MessageFormat;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -759,10 +760,19 @@ public class ServerConnection extends Co
public void processPending()
{
-
- for (AMQSessionModel session : getSessionModels())
+ List<? extends AMQSessionModel<?,?>> sessionsWithPending = new ArrayList<>(getSessionModels());
+ while(!sessionsWithPending.isEmpty())
{
- session.processPending();
+ final Iterator<? extends AMQSessionModel<?, ?>> iter = sessionsWithPending.iterator();
+ AMQSessionModel<?, ?> session;
+ while(iter.hasNext())
+ {
+ session = iter.next();
+ if(!session.processPending())
+ {
+ iter.remove();
+ }
+ }
}
while(_asyncTaskList.peek() != null)
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1684851&r1=1684850&r2=1684851&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Thu Jun 11 10:32:30 2015
@@ -139,6 +139,7 @@ public class ServerSession extends Sessi
private long _blockTime;
private long _blockingTimeout;
private boolean _wireBlockingState;
+ private final List<ConsumerTarget> _consumersWithPendingWork = new ArrayList<>();
public static interface MessageDispositionChangeListener
{
@@ -1137,7 +1138,7 @@ public class ServerSession extends Sessi
}
@Override
- public void processPending()
+ public boolean processPending()
{
boolean desiredBlockingState = _blocking.get();
if (desiredBlockingState != _wireBlockingState)
@@ -1155,11 +1156,33 @@ public class ServerSession extends Sessi
_blockTime = desiredBlockingState ? System.currentTimeMillis() : 0;
}
+ boolean consumerListNeedsRefreshing;
+ if(_consumersWithPendingWork.isEmpty())
+ {
+ _consumersWithPendingWork.addAll(getSubscriptions());
+ consumerListNeedsRefreshing = false;
+ }
+ else
+ {
+ consumerListNeedsRefreshing = true;
+ }
- for(ConsumerTarget target : getSubscriptions())
+ Iterator<ConsumerTarget> iter = _consumersWithPendingWork.iterator();
+
+ boolean consumerHasMoreWork = false;
+ while(iter.hasNext())
{
- target.processPending();
+ final ConsumerTarget target = iter.next();
+ iter.remove();
+ if(target.hasPendingWork())
+ {
+ consumerHasMoreWork = true;
+ target.processPending();
+ break;
+ }
}
+
+ return consumerHasMoreWork || consumerListNeedsRefreshing;
}
@Override
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1684851&r1=1684850&r2=1684851&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Thu Jun 11 10:32:30 2015
@@ -31,6 +31,7 @@ import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
@@ -158,6 +159,8 @@ public class AMQChannel
/** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */
private final Map<AMQShortString, ConsumerTarget_0_8> _tag2SubscriptionTargetMap = new HashMap<AMQShortString, ConsumerTarget_0_8>();
+ private final List<ConsumerTarget_0_8> _consumersWithPendingWork = new ArrayList<>();
+
private final MessageStore _messageStore;
private final LinkedList<AsyncCommand> _unfinishedCommandsQueue = new LinkedList<AsyncCommand>();
@@ -3663,7 +3666,7 @@ public class AMQChannel
}
@Override
- public void processPending()
+ public boolean processPending()
{
boolean desiredBlockingState = _blocking.get();
@@ -3674,10 +3677,33 @@ public class AMQChannel
_blockTime = desiredBlockingState ? System.currentTimeMillis() : 0;
}
- for(ConsumerTarget target : _tag2SubscriptionTargetMap.values())
+ boolean consumerListNeedsRefreshing;
+ if(_consumersWithPendingWork.isEmpty())
+ {
+ _consumersWithPendingWork.addAll(_tag2SubscriptionTargetMap.values());
+ consumerListNeedsRefreshing = false;
+ }
+ else
{
- target.processPending();
+ consumerListNeedsRefreshing = true;
}
+
+ Iterator<ConsumerTarget_0_8> iter = _consumersWithPendingWork.iterator();
+
+ boolean consumerHasMoreWork = false;
+ while(iter.hasNext())
+ {
+ final ConsumerTarget_0_8 target = iter.next();
+ iter.remove();
+ if(target.hasPendingWork())
+ {
+ consumerHasMoreWork = true;
+ target.processPending();
+ break;
+ }
+ }
+
+ return consumerHasMoreWork || consumerListNeedsRefreshing;
}
@Override
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1684851&r1=1684850&r2=1684851&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Thu Jun 11 10:32:30 2015
@@ -34,6 +34,7 @@ import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
@@ -2003,9 +2004,19 @@ public class AMQProtocolEngine implement
@Override
public void processPending()
{
- for (AMQSessionModel session : getSessionModels())
+ List<? extends AMQSessionModel<?,?>> sessionsWithPending = new ArrayList<>(getSessionModels());
+ while(!sessionsWithPending.isEmpty())
{
- session.processPending();
+ final Iterator<? extends AMQSessionModel<?, ?>> iter = sessionsWithPending.iterator();
+ AMQSessionModel<?, ?> session;
+ while(iter.hasNext())
+ {
+ session = iter.next();
+ if(!session.processPending())
+ {
+ iter.remove();
+ }
+ }
}
while(_asyncTaskList.peek() != null)
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1684851&r1=1684850&r2=1684851&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Thu Jun 11 10:32:30 2015
@@ -517,7 +517,7 @@ public abstract class ConsumerTarget_0_8
@Override
protected void processClosed()
{
- if (_needToClose.get() && getState() != State.CLOSED)
+ if (hasClosed())
{
close();
confirmAutoClose();
@@ -530,6 +530,18 @@ public abstract class ConsumerTarget_0_8
}
+ @Override
+ protected boolean hasStateChanged()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasClosed()
+ {
+ return (_needToClose.get() && getState() != State.CLOSED);
+ }
+
public void flushBatched()
{
_channel.getConnection().setDeferFlush(false);
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1684851&r1=1684850&r2=1684851&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Thu Jun 11 10:32:30 2015
@@ -29,6 +29,7 @@ import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.Set;
@@ -539,9 +540,19 @@ public class Connection_1_0 implements C
public void processPending()
{
- for (AMQSessionModel session : getSessionModels())
+ List<? extends AMQSessionModel<?,?>> sessionsWithPending = new ArrayList<>(getSessionModels());
+ while(!sessionsWithPending.isEmpty())
{
- session.processPending();
+ final Iterator<? extends AMQSessionModel<?, ?>> iter = sessionsWithPending.iterator();
+ AMQSessionModel<?, ?> session;
+ while(iter.hasNext())
+ {
+ session = iter.next();
+ if(!session.processPending())
+ {
+ iter.remove();
+ }
+ }
}
while(_asyncTaskList.peek() != null)
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1684851&r1=1684850&r2=1684851&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Thu Jun 11 10:32:30 2015
@@ -566,4 +566,19 @@ class ConsumerTarget_1_0 extends Abstrac
}
}
}
+
+ @Override
+ protected boolean hasStateChanged()
+ {
+ synchronized (_link.getLock())
+ {
+ return _queueEmpty;
+ }
+ }
+
+ @Override
+ protected boolean hasClosed()
+ {
+ return false;
+ }
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java?rev=1684851&r1=1684850&r2=1684851&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java Thu Jun 11 10:32:30 2015
@@ -601,11 +601,6 @@ public class ProtocolEngine_1_0_0_SASL i
}
- public void flushBatched()
- {
- _sender.flush();
- }
-
@Override
public void processPending()
{
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1684851&r1=1684850&r2=1684851&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Thu Jun 11 10:32:30 2015
@@ -72,6 +72,7 @@ import org.apache.qpid.amqp_1_0.type.tra
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.message.MessageDestination;
@@ -115,6 +116,7 @@ public class Session_1_0 implements Sess
private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
private Session<?> _modelObject;
+ private final List<ConsumerTarget_1_0> _consumersWithPendingWork = new ArrayList<>();
public Session_1_0(final Connection_1_0 connection, final SessionEndpoint endpoint)
@@ -905,13 +907,37 @@ public class Session_1_0 implements Sess
}
@Override
- public void processPending()
+ public boolean processPending()
{
- for(Consumer<?> consumer : getConsumers())
+ boolean consumerListNeedsRefreshing;
+ if(_consumersWithPendingWork.isEmpty())
{
+ for(SendingLink_1_0 link : _sendingLinks)
+ {
+ _consumersWithPendingWork.add(link.getConsumerTarget());
+ }
+ consumerListNeedsRefreshing = false;
+ }
+ else
+ {
+ consumerListNeedsRefreshing = true;
+ }
- ((ConsumerImpl)consumer).getTarget().processPending();
+ Iterator<ConsumerTarget_1_0> iter = _consumersWithPendingWork.iterator();
+ boolean consumerHasMoreWork = false;
+ while(iter.hasNext())
+ {
+ final ConsumerTarget target = iter.next();
+ iter.remove();
+ if(target.hasPendingWork())
+ {
+ consumerHasMoreWork = true;
+ target.processPending();
+ break;
+ }
}
+
+ return consumerHasMoreWork || consumerListNeedsRefreshing;
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org