You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/15 14:16:10 UTC
svn commit: r1769837 [3/4] - in /qpid/java/trunk: ./
bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/
broker-core/src/main/java/org/apache/qpid/server/consumer/
broker-core/src/main/java/org/apache/qpid/server/flow/ broker-core...
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java Tue Nov 15 14:16:10 2016
@@ -40,10 +40,10 @@ import org.apache.qpid.server.message.Se
import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.queue.AbstractQueue;
import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.StateChangeListener;
@@ -88,7 +88,7 @@ public abstract class AbstractSystemMess
final String consumerName,
final EnumSet<ConsumerImpl.Option> options, final Integer priority)
throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive,
- ConsumerAccessRefused
+ ConsumerAccessRefused, QueueDeleted
{
final Consumer consumer = new Consumer(consumerName, target);
target.consumerAdded(consumer);
@@ -116,21 +116,21 @@ public abstract class AbstractSystemMess
Collections.synchronizedList(new ArrayList<PropertiesMessageInstance>());
private final ConsumerTarget _target;
private final String _name;
- private final StateChangeListener<ConsumerTarget, ConsumerTarget.State> _targetChangeListener =
- new Consumer.TargetChangeListener();
public Consumer(final String consumerName, ConsumerTarget target)
{
_name = consumerName;
_target = target;
- target.addStateListener(_targetChangeListener);
}
@Override
public void externalStateChange()
{
-
+ if(!_queue.isEmpty())
+ {
+ _target.notifyWork();
+ }
}
@Override
@@ -140,43 +140,27 @@ public abstract class AbstractSystemMess
}
@Override
- public boolean hasAvailableMessages()
+ public AbstractQueue.MessageContainer pullMessage()
{
- return !_queue.isEmpty();
- }
-
- @Override
- public void pullMessage()
- {
- AMQPConnection<?> connection = _target.getSessionModel().getAMQPConnection();
- _target.getSendLock();
- try
+ if (!_queue.isEmpty())
{
- connection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(true);
-
- try
- {
- if (!_queue.isEmpty())
- {
- final PropertiesMessageInstance propertiesMessageInstance = _queue.get(0);
- if (!_target.isSuspended() && _target.allocateCredit(propertiesMessageInstance.getMessage()))
- {
- _queue.remove(0);
- _target.send(this, propertiesMessageInstance, false);
- }
- }
- }
- finally
+ final PropertiesMessageInstance propertiesMessageInstance = _queue.get(0);
+ if (!_target.isSuspended() && _target.allocateCredit(propertiesMessageInstance.getMessage()))
{
- connection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(false);
+ _queue.remove(0);
+ return new AbstractQueue.MessageContainer(propertiesMessageInstance, null);
}
}
- finally
+ return null;
+ }
+
+ @Override
+ public void setNotifyWorkDesired(final boolean desired)
+ {
+ if (desired && !_queue.isEmpty())
{
- _target.releaseSendLock();
+ _target.notifyWork();
}
-
-
}
@Override
@@ -224,7 +208,7 @@ public abstract class AbstractSystemMess
@Override
public boolean isSuspended()
{
- return false;
+ return !isActive();
}
@Override
@@ -252,27 +236,9 @@ public abstract class AbstractSystemMess
}
@Override
- public boolean trySendLock()
- {
- return _target.trySendLock();
- }
-
- @Override
- public void getSendLock()
- {
- _target.getSendLock();
- }
-
- @Override
- public void releaseSendLock()
- {
- _target.releaseSendLock();
- }
-
- @Override
public boolean isActive()
{
- return false;
+ return _target.isNotifyWorkDesired();
}
@Override
@@ -281,85 +247,11 @@ public abstract class AbstractSystemMess
return _name;
}
- @Override
- public void flush()
- {
- AMQPConnection<?> connection = getSessionModel().getAMQPConnection();
- try
- {
- connection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(true);
- deliverMessages();
- _target.processPending();
- }
- finally
- {
- connection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(false);
- }
- }
-
-
public void send(final InternalMessage response)
{
- _target.getSendLock();
- try
- {
- final PropertiesMessageInstance
- responseEntry = new PropertiesMessageInstance(this, response);
- if (_queue.isEmpty() && _target.allocateCredit(response))
- {
- _target.send(this, responseEntry, false);
- }
- else
- {
- _queue.add(responseEntry);
- }
- }
- finally
- {
- _target.releaseSendLock();
- }
+ _queue.add(new PropertiesMessageInstance(this, response));
+ _target.notifyWork();
}
-
- private class TargetChangeListener implements StateChangeListener<ConsumerTarget, ConsumerTarget.State>
- {
- @Override
- public void stateChanged(final ConsumerTarget object,
- final ConsumerTarget.State oldState,
- final ConsumerTarget.State newState)
- {
- if (newState == ConsumerTarget.State.ACTIVE)
- {
- deliverMessages();
- }
- }
- }
-
- private void deliverMessages()
- {
- _target.getSendLock();
- try
- {
- while (!_queue.isEmpty())
- {
-
- final PropertiesMessageInstance propertiesMessageInstance = _queue.get(0);
- if (!_target.isSuspended() && _target.allocateCredit(propertiesMessageInstance.getMessage()))
- {
- _queue.remove(0);
- _target.send(this, propertiesMessageInstance, false);
- }
- else
- {
- break;
- }
- }
- }
- finally
- {
- _target.releaseSendLock();
- }
- }
-
}
class PropertiesMessageInstance implements MessageInstance
@@ -549,12 +441,6 @@ public abstract class AbstractSystemMess
}
@Override
- public boolean resend()
- {
- return false;
- }
-
- @Override
public void delete()
{
_isDeleted = true;
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Tue Nov 15 14:16:10 2016
@@ -1173,9 +1173,10 @@ public abstract class AbstractVirtualHos
*/
public void scheduleHouseKeepingTask(long period, HouseKeepingTask task)
{
- _houseKeepingTaskExecutor.scheduleAtFixedRate(task, period / 2, period, TimeUnit.MILLISECONDS);
+ task.setFuture(_houseKeepingTaskExecutor.scheduleAtFixedRate(task, period / 2, period, TimeUnit.MILLISECONDS));
}
+
public ScheduledFuture<?> scheduleTask(long delay, Runnable task)
{
return _houseKeepingTaskExecutor.schedule(task, delay, TimeUnit.MILLISECONDS);
@@ -1444,7 +1445,7 @@ public abstract class AbstractVirtualHos
}
@Override
- protected void onClose()
+ protected ListenableFuture<Void> onClose()
{
_dtxRegistry.close();
shutdownHouseKeeping();
@@ -1455,6 +1456,7 @@ public abstract class AbstractVirtualHos
_eventLogger.message(VirtualHostMessages.CLOSED(getName()));
stopLogging(_virtualHostLoggersToClose);
+ return Futures.immediateFuture(null);
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java Tue Nov 15 14:16:10 2016
@@ -23,6 +23,7 @@ package org.apache.qpid.server.virtualho
import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.PrivilegedAction;
+import java.util.concurrent.ScheduledFuture;
import org.apache.qpid.server.model.VirtualHost;
@@ -30,6 +31,7 @@ public abstract class HouseKeepingTask i
{
private final String _name;
private final AccessControlContext _accessControlContext;
+ private ScheduledFuture<?> _future;
public HouseKeepingTask(String name, VirtualHost vhost, AccessControlContext context)
{
@@ -65,4 +67,17 @@ public abstract class HouseKeepingTask i
/** Execute the plugin. */
public abstract void execute();
+ void setFuture(final ScheduledFuture<?> future)
+ {
+ _future = future;
+ }
+
+ public synchronized void cancel()
+ {
+ if(_future != null)
+ {
+ _future.cancel(false);
+ _future = null;
+ }
+ }
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java Tue Nov 15 14:16:10 2016
@@ -53,7 +53,7 @@ public class VirtualHostPropertiesNode e
final String consumerName,
final EnumSet<ConsumerImpl.Option> options, final Integer priority)
throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive,
- ConsumerAccessRefused
+ ConsumerAccessRefused, QueueDeleted
{
final Consumer consumer = super.addConsumer(target, filters, messageClass, consumerName, options, priority);
consumer.send(createMessage());
@@ -66,10 +66,10 @@ public class VirtualHostPropertiesNode e
Map<String, Object> headers = new HashMap<>();
- final List<String> globalAddresseDomains = _addressSpace.getGlobalAddressDomains();
- if (globalAddresseDomains != null && !globalAddresseDomains.isEmpty())
+ final List<String> globalAddressDomains = _addressSpace.getGlobalAddressDomains();
+ if (globalAddressDomains != null && !globalAddressDomains.isEmpty())
{
- String primaryDomain = globalAddresseDomains.get(0);
+ String primaryDomain = globalAddressDomains.get(0);
if(primaryDomain != null)
{
primaryDomain = primaryDomain.trim();
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java Tue Nov 15 14:16:10 2016
@@ -405,10 +405,11 @@ public abstract class AbstractVirtualHos
}
@Override
- protected void onClose()
+ protected ListenableFuture<Void> onClose()
{
closeConfigurationStore();
_virtualHostExecutor.stop();
+ return Futures.immediateFuture(null);
}
private void closeConfigurationStore()
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java Tue Nov 15 14:16:10 2016
@@ -142,8 +142,9 @@ public class ManagementModeStoreHandlerT
}
@Override
- protected void onClose()
+ protected ListenableFuture<Void> onClose()
{
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = State.UNINITIALIZED, desiredState = State.QUIESCED)
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java Tue Nov 15 14:16:10 2016
@@ -48,7 +48,7 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.consumer.ConsumerImpl;
-import org.apache.qpid.server.consumer.MockConsumer;
+import org.apache.qpid.server.consumer.TestConsumerTarget;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.InstanceProperties;
@@ -61,6 +61,7 @@ import org.apache.qpid.server.model.Brok
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.QueueNotificationListener;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.AbstractQueue.QueueEntryFilter;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.util.Action;
@@ -71,14 +72,13 @@ import org.apache.qpid.test.utils.QpidTe
abstract class AbstractQueueTestBase extends QpidTestCase
{
private static final Logger _logger = LoggerFactory.getLogger(AbstractQueueTestBase.class);
- private long _queueRunnerWaitTime;
private Queue<?> _queue;
private QueueManagingVirtualHost<?> _virtualHost;
private String _qname = "qname";
private String _owner = "owner";
private String _routingKey = "routing key";
private DirectExchange _exchange;
- private MockConsumer _consumerTarget = new MockConsumer();
+ private TestConsumerTarget _consumerTarget = new TestConsumerTarget();
private QueueConsumer<?> _consumer;
private Map<String,Object> _arguments = Collections.emptyMap();
@@ -97,8 +97,6 @@ abstract class AbstractQueueTestBase ext
_queue = _virtualHost.createChild(Queue.class, attributes);
_exchange = (DirectExchange) _virtualHost.getChildByName(Exchange.class, ExchangeDefaults.DIRECT_EXCHANGE_NAME);
- _queueRunnerWaitTime = Long.getLong("AbstractQueueTestBase.queueRunnerWaitTime", 150L);
- _logger.debug("Using AbstractQueueTestBase.queueRunnerWaitTime {}", _queueRunnerWaitTime);
}
@Override
@@ -182,7 +180,7 @@ abstract class AbstractQueueTestBase ext
// Check sending a message ends up with the subscriber
_queue.enqueue(messageA, null, null);
- Thread.sleep(_queueRunnerWaitTime);
+ while(_consumerTarget.processPending());
assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage());
assertNull(_consumer.getQueueContext().getReleasedEntry());
@@ -207,7 +205,7 @@ abstract class AbstractQueueTestBase ext
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
EnumSet.of(ConsumerImpl.Option.ACQUIRES,
ConsumerImpl.Option.SEES_REQUEUES), 0);
- Thread.sleep(_queueRunnerWaitTime);
+ while(_consumerTarget.processPending());
assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage());
assertNull("There should be no releasedEntry after an enqueue",
_consumer.getQueueContext().getReleasedEntry());
@@ -225,7 +223,7 @@ abstract class AbstractQueueTestBase ext
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
EnumSet.of(ConsumerImpl.Option.ACQUIRES,
ConsumerImpl.Option.SEES_REQUEUES), 0);
- Thread.sleep(_queueRunnerWaitTime);
+ while(_consumerTarget.processPending());
assertEquals(messageB, _consumer.getQueueContext().getLastSeenEntry().getMessage());
assertNull("There should be no releasedEntry after enqueues",
_consumer.getQueueContext().getReleasedEntry());
@@ -248,12 +246,12 @@ abstract class AbstractQueueTestBase ext
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
EnumSet.of(ConsumerImpl.Option.ACQUIRES,
ConsumerImpl.Option.SEES_REQUEUES), 0);
- Thread.sleep(_queueRunnerWaitTime);
+ while(_consumerTarget.processPending());
assertEquals("Message which was not yet valid was received", 0, _consumerTarget.getMessages().size());
when(messageHeader.getNotValidBefore()).thenReturn(System.currentTimeMillis()-100L);
_queue.checkMessageStatus();
- Thread.sleep(_queueRunnerWaitTime);
+ while(_consumerTarget.processPending());
assertEquals("Message which was valid was not received", 1, _consumerTarget.getMessages().size());
}
@@ -274,7 +272,7 @@ abstract class AbstractQueueTestBase ext
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
EnumSet.of(ConsumerImpl.Option.ACQUIRES,
ConsumerImpl.Option.SEES_REQUEUES), 0);
- Thread.sleep(_queueRunnerWaitTime);
+ while(_consumerTarget.processPending());
assertEquals("Message was held despite queue not having holding enabled", 1, _consumerTarget.getMessages().size());
@@ -300,14 +298,14 @@ abstract class AbstractQueueTestBase ext
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
EnumSet.of(ConsumerImpl.Option.ACQUIRES,
ConsumerImpl.Option.SEES_REQUEUES), 0);
- Thread.sleep(_queueRunnerWaitTime);
+ while(_consumerTarget.processPending());
assertEquals("Expect one message (message B)", 1, _consumerTarget.getMessages().size());
assertEquals("Wrong message received", messageB.getMessageHeader().getMessageId(), _consumerTarget.getMessages().get(0).getMessage().getMessageHeader().getMessageId());
when(messageHeader.getNotValidBefore()).thenReturn(System.currentTimeMillis()-100L);
_queue.checkMessageStatus();
- Thread.sleep(_queueRunnerWaitTime);
+ while(_consumerTarget.processPending());
assertEquals("Message which was valid was not received", 2, _consumerTarget.getMessages().size());
assertEquals("Wrong message received", messageA.getMessageHeader().getMessageId(), _consumerTarget.getMessages().get(1).getMessage().getMessageHeader().getMessageId());
@@ -338,7 +336,7 @@ abstract class AbstractQueueTestBase ext
_queue.enqueue(messageB, postEnqueueAction, null);
_queue.enqueue(messageC, postEnqueueAction, null);
- Thread.sleep(_queueRunnerWaitTime); // Work done by QueueRunner Thread
+ while(_consumerTarget.processPending());
assertEquals("Unexpected total number of messages sent to consumer",
3,
@@ -351,7 +349,7 @@ abstract class AbstractQueueTestBase ext
queueEntries.get(0).release();
- Thread.sleep(_queueRunnerWaitTime); // Work done by QueueRunner Thread
+ while(_consumerTarget.processPending());
assertEquals("Unexpected total number of messages sent to consumer",
4,
@@ -372,8 +370,15 @@ abstract class AbstractQueueTestBase ext
{
ServerMessage messageA = createMessage(new Long(24));
final CountDownLatch sendIndicator = new CountDownLatch(1);
- _consumerTarget = new MockConsumer()
+ _consumerTarget = new TestConsumerTarget()
{
+
+ @Override
+ public void notifyWork()
+ {
+ while(processPending());
+ }
+
@Override
public long send(ConsumerImpl consumer, MessageInstance entry, boolean batch)
{
@@ -397,7 +402,7 @@ abstract class AbstractQueueTestBase ext
/* Enqueue one message with expiration set for a short time in the future */
- final long expiration = System.currentTimeMillis() + _queueRunnerWaitTime;
+ final long expiration = System.currentTimeMillis() + 100L;
when(messageA.getExpiration()).thenReturn(expiration);
_queue.enqueue(messageA, postEnqueueAction, null);
@@ -472,7 +477,7 @@ abstract class AbstractQueueTestBase ext
_queue.enqueue(messageB, postEnqueueAction, null);
_queue.enqueue(messageC, postEnqueueAction, null);
- Thread.sleep(_queueRunnerWaitTime); // Work done by QueueRunner Thread
+ while(_consumerTarget.processPending());
assertEquals("Unexpected total number of messages sent to consumer",
3,
@@ -486,7 +491,7 @@ abstract class AbstractQueueTestBase ext
queueEntries.get(2).release();
queueEntries.get(0).release();
- Thread.sleep(_queueRunnerWaitTime); // Work done by QueueRunner Thread
+ while(_consumerTarget.processPending());
assertEquals("Unexpected total number of messages sent to consumer",
5,
@@ -508,8 +513,8 @@ abstract class AbstractQueueTestBase ext
ServerMessage messageA = createMessage(new Long(24));
ServerMessage messageB = createMessage(new Long(25));
- MockConsumer target1 = new MockConsumer();
- MockConsumer target2 = new MockConsumer();
+ TestConsumerTarget target1 = new TestConsumerTarget();
+ TestConsumerTarget target2 = new TestConsumerTarget();
QueueConsumer consumer1 = (QueueConsumer) _queue.addConsumer(target1, null, messageA.getClass(), "test",
@@ -529,7 +534,8 @@ abstract class AbstractQueueTestBase ext
_queue.enqueue(messageA, postEnqueueAction, null);
_queue.enqueue(messageB, postEnqueueAction, null);
- Thread.sleep(_queueRunnerWaitTime); // Work done by QueueRunner Thread
+ while(target1.processPending());
+ while(target2.processPending());
assertEquals("Unexpected total number of messages sent to both after enqueue",
2,
@@ -538,7 +544,8 @@ abstract class AbstractQueueTestBase ext
/* Now release the first message only, causing it to be requeued */
queueEntries.get(0).release();
- Thread.sleep(_queueRunnerWaitTime); // Work done by QueueRunner Thread
+ while(target1.processPending());
+ while(target2.processPending());
assertEquals("Unexpected total number of messages sent to both consumers after release",
3,
@@ -566,20 +573,13 @@ abstract class AbstractQueueTestBase ext
// Check sending a message ends up with the subscriber
_queue.enqueue(messageA, null, null);
- final long timeout = System.currentTimeMillis() + _queueRunnerWaitTime;
-
- QueueEntry lastSeen = null;
- while (timeout > System.currentTimeMillis() &&
- ((lastSeen = _consumer.getQueueContext().getLastSeenEntry()) == null || lastSeen.getMessage() == null))
- {
- Thread.sleep(10);
- }
+ while(_consumerTarget.processPending());
- assertEquals("Queue context did not see expected message within timeout",
+ assertEquals("Queue context did not see expected message",
messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage());
// Check we cannot add a second subscriber to the queue
- MockConsumer subB = new MockConsumer();
+ TestConsumerTarget subB = new TestConsumerTarget();
Exception ex = null;
try
{
@@ -616,32 +616,6 @@ abstract class AbstractQueueTestBase ext
assertNotNull(ex);
}
-
- public void testResend() throws Exception
- {
- Long id = new Long(26);
- ServerMessage message = createMessage(id);
-
- _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, message.getClass(), "test",
- EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES),
- 0);
-
- _queue.enqueue(message, new Action<MessageInstance>()
- {
- @Override
- public void performAction(final MessageInstance object)
- {
- QueueEntryImpl entry = (QueueEntryImpl) object;
- entry.setRedelivered();
- _consumer.resend(entry);
-
- }
- }, null);
-
-
-
- }
-
public void testGetFirstMessageId() throws Exception
{
// Create message
@@ -1062,14 +1036,7 @@ abstract class AbstractQueueTestBase ext
queue.enqueue(message,null, null);
}
- try
- {
- Thread.sleep(2000L);
- }
- catch (InterruptedException e)
- {
- _logger.error("Thread interrupted", e);
- }
+
}
/**
@@ -1113,7 +1080,7 @@ abstract class AbstractQueueTestBase ext
_queue = queue;
}
- public MockConsumer getConsumer()
+ public TestConsumerTarget getConsumer()
{
return _consumerTarget;
}
@@ -1230,14 +1197,9 @@ abstract class AbstractQueueTestBase ext
return _exchange;
}
- public MockConsumer getConsumerTarget()
+ public TestConsumerTarget getConsumerTarget()
{
return _consumerTarget;
}
- public long getQueueRunnerWaitTime()
- {
- return _queueRunnerWaitTime;
- }
-
}
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java Tue Nov 15 14:16:10 2016
@@ -162,12 +162,6 @@ public class MockMessageInstance impleme
{
}
- @Override
- public boolean resend()
- {
- return false;
- }
-
public void setRedelivered()
{
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java Tue Nov 15 14:16:10 2016
@@ -64,7 +64,8 @@ public class PriorityQueueTest extends A
// Register subscriber
queue.addConsumer(getConsumer(), null, null, "test", EnumSet.noneOf(ConsumerImpl.Option.class), 0);
- Thread.sleep(getQueueRunnerWaitTime());
+
+ while(getConsumer().processPending());
ArrayList<MessageInstance> msgs = getConsumer().getMessages();
try
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java Tue Nov 15 14:16:10 2016
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.queue;
+import static org.apache.qpid.server.model.Queue.QUEUE_SCAVANGE_COUNT;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -48,15 +49,11 @@ public class StandardQueueEntryListTest
private StandardQueueImpl _testQueue;
private StandardQueueEntryList _sqel;
- private static final String SCAVENGE_PROP = "qpid.queue.scavenge_count";
- private String oldScavengeValue = null;
private ConfiguredObjectFactoryImpl _factory;
@Override
protected void setUp()
{
- oldScavengeValue = System.setProperty(SCAVENGE_PROP, "9");
-
Map<String,Object> queueAttributes = new HashMap<String, Object>();
queueAttributes.put(Queue.ID, UUID.randomUUID());
queueAttributes.put(Queue.NAME, getName());
@@ -87,19 +84,6 @@ public class StandardQueueEntryListTest
}
@Override
- protected void tearDown()
- {
- if(oldScavengeValue != null)
- {
- System.setProperty(SCAVENGE_PROP, oldScavengeValue);
- }
- else
- {
- System.clearProperty(SCAVENGE_PROP);
- }
- }
-
- @Override
public StandardQueueEntryList getTestList()
{
return getTestList(false);
@@ -165,7 +149,9 @@ public class StandardQueueEntryListTest
public void testScavenge() throws Exception
{
- OrderedQueueEntryList sqel = new StandardQueueEntryList(mock(StandardQueue.class), new QueueStatistics());
+ StandardQueueImpl mockQueue = mock(StandardQueueImpl.class);
+ when(mockQueue.getContextValue(Integer.class, QUEUE_SCAVANGE_COUNT)).thenReturn(9);
+ OrderedQueueEntryList sqel = new StandardQueueEntryList(mockQueue, new QueueStatistics());
ConcurrentMap<Integer,QueueEntry> entriesMap = new ConcurrentHashMap<Integer,QueueEntry>();
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java Tue Nov 15 14:16:10 2016
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.queue;
-import java.security.AccessController;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
@@ -31,7 +30,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
-import org.apache.qpid.server.consumer.MockConsumer;
+import org.apache.qpid.server.consumer.TestConsumerTarget;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.LifetimePolicy;
@@ -66,74 +65,6 @@ public class StandardQueueTest extends A
getQueue().isDeleted());
}
- public void testActiveConsumerCount() throws Exception
- {
-
- Map<String,Object> queueAttributes = new HashMap<>();
- queueAttributes.put(Queue.NAME, "testActiveConsumerCount");
- queueAttributes.put(Queue.OWNER, "testOwner");
- final StandardQueueImpl queue = new StandardQueueImpl(queueAttributes, getVirtualHost());
- queue.open();
- //verify adding an active consumer increases the count
- final MockConsumer consumer1 = new MockConsumer();
- consumer1.setActive(true);
- consumer1.setState(ConsumerTarget.State.ACTIVE);
- assertEquals("Unexpected active consumer count", 0, queue.getConsumerCountWithCredit());
- queue.addConsumer(consumer1,
- null,
- createMessage(-1l).getClass(),
- "test",
- EnumSet.of(ConsumerImpl.Option.ACQUIRES,
- ConsumerImpl.Option.SEES_REQUEUES), 0);
- assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit());
-
- //verify adding an inactive consumer doesn't increase the count
- final MockConsumer consumer2 = new MockConsumer();
- consumer2.setActive(false);
- consumer2.setState(ConsumerTarget.State.SUSPENDED);
- assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit());
- queue.addConsumer(consumer2,
- null,
- createMessage(-1l).getClass(),
- "test",
- EnumSet.of(ConsumerImpl.Option.ACQUIRES,
- ConsumerImpl.Option.SEES_REQUEUES), 0);
- assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit());
-
- //verify behaviour in face of expected state changes:
-
- //verify a consumer going suspended->active increases the count
- consumer2.setState(ConsumerTarget.State.ACTIVE);
- assertEquals("Unexpected active consumer count", 2, queue.getConsumerCountWithCredit());
-
- //verify a consumer going active->suspended decreases the count
- consumer2.setState(ConsumerTarget.State.SUSPENDED);
- assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit());
-
- //verify a consumer going suspended->closed doesn't change the count
- consumer2.setState(ConsumerTarget.State.CLOSED);
- assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit());
-
- //verify a consumer going active->active doesn't change the count
- consumer1.setState(ConsumerTarget.State.ACTIVE);
- assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit());
-
- consumer1.setState(ConsumerTarget.State.SUSPENDED);
- assertEquals("Unexpected active consumer count", 0, queue.getConsumerCountWithCredit());
-
- //verify a consumer going suspended->suspended doesn't change the count
- consumer1.setState(ConsumerTarget.State.SUSPENDED);
- assertEquals("Unexpected active consumer count", 0, queue.getConsumerCountWithCredit());
-
- consumer1.setState(ConsumerTarget.State.ACTIVE);
- assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit());
-
- //verify a consumer going active->closed decreases the count
- consumer1.setState(ConsumerTarget.State.CLOSED);
- assertEquals("Unexpected active consumer count", 0, queue.getConsumerCountWithCredit());
-
- }
-
/**
* Tests that entry in dequeued state are not enqueued and not delivered to consumer
@@ -144,7 +75,7 @@ public class StandardQueueTest extends A
AbstractQueue queue = new DequeuedQueue(getVirtualHost());
queue.create();
// create a consumer
- MockConsumer consumer = new MockConsumer();
+ TestConsumerTarget consumer = new TestConsumerTarget();
// register consumer
queue.addConsumer(consumer,
@@ -156,6 +87,7 @@ public class StandardQueueTest extends A
// put test messages into a queue
putGivenNumberOfMessages(queue, 4);
+ while(consumer.processPending());
// assert received messages
List<MessageInstance> messages = consumer.getMessages();
@@ -167,8 +99,7 @@ public class StandardQueueTest extends A
}
/**
- * Tests whether dequeued entry is sent to subscriber in result of
- * invocation of {@link AbstractQueue#processQueue(QueueRunner)}
+ * Tests whether dequeued entry is sent to subscriber
*/
public void testProcessQueueWithDequeuedEntry() throws Exception
{
@@ -193,8 +124,15 @@ public class StandardQueueTest extends A
final CountDownLatch latch = new CountDownLatch(messageNumber -1);
// create a consumer
- MockConsumer consumer = new MockConsumer()
+ TestConsumerTarget consumer = new TestConsumerTarget()
{
+
+ @Override
+ public void notifyWork()
+ {
+ while(processPending());
+ }
+
/**
* Send a message and decrement latch
* @param consumer
@@ -217,14 +155,6 @@ public class StandardQueueTest extends A
EnumSet.of(ConsumerImpl.Option.ACQUIRES,
ConsumerImpl.Option.SEES_REQUEUES), 0);
- // process queue
- testQueue.processQueue(new QueueRunner(testQueue, AccessController.getContext())
- {
- public void run()
- {
- // do nothing
- }
- });
// wait up to 1 minute for message receipt
try
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java Tue Nov 15 14:16:10 2016
@@ -19,6 +19,7 @@
package org.apache.qpid.server.security;
import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.mock;
@@ -42,11 +43,14 @@ import org.apache.qpid.server.consumer.C
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.TrustStore;
import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.queue.AbstractQueue;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestMemoryMessageStore;
+import org.apache.qpid.server.virtualhost.AbstractSystemMessageSource;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -78,11 +82,10 @@ public class TrustStoreMessageSourceTest
final ConsumerTarget target = mock(ConsumerTarget.class);
when(target.allocateCredit(any(ServerMessage.class))).thenReturn(true);
- _trustStoreMessageSource.addConsumer(target, null, ServerMessage.class, getTestName(), options, 0);
-
- ArgumentCaptor<MessageInstance> argumentCaptor = ArgumentCaptor.forClass(MessageInstance.class);
- verify(target).send(any(ConsumerImpl.class), argumentCaptor.capture(), anyBoolean());
- final ServerMessage message = argumentCaptor.getValue().getMessage();
+ ConsumerImpl consumer = _trustStoreMessageSource.addConsumer(target, null, ServerMessage.class, getTestName(), options, 0);
+ final AbstractQueue.MessageContainer messageContainer = consumer.pullMessage();
+ assertNotNull("Could not pull message of TrustStore", messageContainer);
+ final ServerMessage message = messageContainer._messageInstance.getMessage();
assertCertificates(getCertificatesFromMessage(message));
}
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java Tue Nov 15 14:16:10 2016
@@ -149,7 +149,6 @@ public class TCPandSSLTransportTest exte
when(port.getNumberOfSelectors()).thenReturn(1);
when(port.getSSLContext()).thenReturn(sslContext);
when(port.getContextValue(Long.class, AmqpPort.PORT_AMQP_THREAD_POOL_KEEP_ALIVE_TIMEOUT)).thenReturn(1l);
- when(port.getContextValue(Long.class, AmqpPort.PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE)).thenReturn(AmqpPort.DEFAULT_PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE);
when(port.getContextValue(Integer.class, AmqpPort.PORT_AMQP_ACCEPT_BACKLOG)).thenReturn(AmqpPort.DEFAULT_PORT_AMQP_ACCEPT_BACKLOG);
when(port.getProtocolHandshakeTimeout()).thenReturn(AmqpPort.DEFAULT_PROTOCOL_HANDSHAKE_TIMEOUT);
ObjectMapper mapper = new ObjectMapper();
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java Tue Nov 15 14:16:10 2016
@@ -31,6 +31,7 @@ import org.apache.qpid.server.consumer.C
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.queue.AbstractQueue;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestMemoryMessageStore;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -56,7 +57,8 @@ public class VirtualHostPropertiesNodeTe
final ConsumerTarget target = mock(ConsumerTarget.class);
when(target.allocateCredit(any(ServerMessage.class))).thenReturn(true);
- _virtualHostPropertiesNode.addConsumer(target, null, ServerMessage.class, getTestName(), options, 0);
- verify(target).send(any(ConsumerImpl.class), any(MessageInstance.class), anyBoolean());
+ ConsumerImpl consumer = _virtualHostPropertiesNode.addConsumer(target, null, ServerMessage.class, getTestName(), options, 0);
+ final AbstractQueue.MessageContainer messageContainer = consumer.pullMessage();
+ assertNotNull("Could not pull message from VirtualHostPropertyNode", messageContainer);
}
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java Tue Nov 15 14:16:10 2016
@@ -27,6 +27,8 @@ import java.security.PrivilegedAction;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -55,7 +57,7 @@ import org.apache.qpid.transport.Constan
import org.apache.qpid.server.transport.AggregateTicker;
-public class AMQPConnection_0_10 extends AbstractAMQPConnection<AMQPConnection_0_10>
+public class AMQPConnection_0_10 extends AbstractAMQPConnection<AMQPConnection_0_10, ServerConnection>
{
private static final Logger _logger = LoggerFactory.getLogger(AMQPConnection_0_10.class);
private final ServerInputHandler _inputHandler;
@@ -68,6 +70,9 @@ public class AMQPConnection_0_10 extends
private final AtomicReference<Action<ProtocolEngine>> _workListener = new AtomicReference<>();
private ServerDisassembler _disassembler;
+ private final Set<AMQSessionModel<?>> _sessionsWithWork =
+ Collections.newSetFromMap(new ConcurrentHashMap<AMQSessionModel<?>, Boolean>());
+
public AMQPConnection_0_10(final Broker<?> broker,
ServerNetworkConnection network,
@@ -251,7 +256,7 @@ public class AMQPConnection_0_10 extends
{
if (isIOThread())
{
- return _connection.processPendingIterator();
+ return _connection.processPendingIterator(_sessionsWithWork);
}
else
{
@@ -277,6 +282,13 @@ public class AMQPConnection_0_10 extends
}
}
+ @Override
+ public void notifyWork(final AMQSessionModel<?> sessionModel)
+ {
+ _sessionsWithWork.add(sessionModel);
+ notifyWork();
+ }
+
public void clearWork()
{
_stateChanged.set(false);
@@ -294,6 +306,7 @@ public class AMQPConnection_0_10 extends
public void sendConnectionCloseAsync(final AMQConstant cause, final String message)
{
+ stopConnection();
_connection.sendConnectionCloseAsync(cause, message);
}
@@ -303,6 +316,12 @@ public class AMQPConnection_0_10 extends
_connection.closeSessionAsync((ServerSession) session, cause, message);
}
+ @Override
+ protected void addAsyncTask(final Action<? super ServerConnection> action)
+ {
+ _connection.addAsyncTask(action);
+ }
+
public void block()
{
_connection.block();
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Tue Nov 15 14:16:10 2016
@@ -32,7 +32,6 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.consumer.ConsumerImpl;
-import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.message.MessageInstance;
@@ -62,13 +61,12 @@ import org.apache.qpid.transport.Option;
import org.apache.qpid.util.ByteBufferUtils;
import org.apache.qpid.util.GZIPUtils;
-public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowCreditManager.FlowCreditManagerListener
+public class ConsumerTarget_0_10 extends AbstractConsumerTarget
{
private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerTarget_0_10.class);
private static final Option[] BATCHED = new Option[] { Option.BATCH };
- private final AtomicBoolean _deleted = new AtomicBoolean(false);
private final String _name;
private final String _targetAddress;
@@ -116,14 +114,13 @@ public class ConsumerTarget_0_10 extends
Map<String, Object> arguments,
boolean multiQueue)
{
- super(State.SUSPENDED, isPullOnly(arguments), multiQueue, session.getAMQPConnection());
+ super(multiQueue, session.getAMQPConnection());
_session = session;
_postIdSettingAction = new AddMessageDispositionListenerAction(session);
_acceptMode = acceptMode;
_acquireMode = acquireMode;
_creditManager = creditManager;
_flowMode = flowMode;
- _creditManager.addStateListener(this);
_name = name;
if(arguments != null && arguments.containsKey("local-address"))
{
@@ -135,41 +132,15 @@ public class ConsumerTarget_0_10 extends
}
}
- private static boolean isPullOnly(Map<String, Object> arguments)
- {
- return arguments != null
- && arguments.containsKey(PULL_ONLY_CONSUMER)
- && Boolean.valueOf(String.valueOf(arguments.get(PULL_ONLY_CONSUMER)));
- }
-
- @Override
- public boolean isFlowSuspended()
- {
- return getState()!=State.ACTIVE || _deleted.get() || _session.isClosing() || _session.getAMQPConnection().isConnectionStopped();
- // TODO check for Session suspension
- }
-
@Override
- protected void doCloseInternal()
+ public void updateNotifyWorkDesired()
{
- _creditManager.removeListener(this);
- }
+ final AMQPConnection_0_10 amqpConnection = _session.getAMQPConnection();
- public void creditStateChanged(boolean hasCredit)
- {
+ boolean state = !amqpConnection.isTransportBlockedForWriting()
+ && getCreditManager().hasCredit();
- if(hasCredit)
- {
- if(!updateState(State.SUSPENDED, State.ACTIVE))
- {
- // this is a hack to get round the issue of increasing bytes credit
- notifyCurrentState();
- }
- }
- else
- {
- updateState(State.ACTIVE, State.SUSPENDED);
- }
+ setNotifyWorkDesired(state);
}
public String getName()
@@ -180,6 +151,7 @@ public class ConsumerTarget_0_10 extends
public void transportStateChanged()
{
_creditManager.restoreCredit(0, 0);
+ updateNotifyWorkDesired();
}
public static class AddMessageDispositionListenerAction implements Runnable
@@ -404,11 +376,12 @@ public class ConsumerTarget_0_10 extends
public void flushCreditState(boolean strict)
{
- if(strict || !isFlowSuspended() || _deferredMessageCredit >= 200
- || !(_creditManager instanceof WindowCreditManager)
- || ((WindowCreditManager)_creditManager).getMessageCreditLimit() < 400 )
+ if(strict || !isSuspended() || _deferredMessageCredit >= 200
+ || !(_creditManager instanceof WindowCreditManager)
+ || ((WindowCreditManager)_creditManager).getMessageCreditLimit() < 400 )
{
- _creditManager.restoreCredit(_deferredMessageCredit, _deferredSizeCredit);
+ restoreCredit(_deferredMessageCredit, _deferredSizeCredit);
+
_deferredMessageCredit = 0;
_deferredSizeCredit = 0l;
}
@@ -521,21 +494,26 @@ public class ConsumerTarget_0_10 extends
return (maxDeliveryLimit > 0 && entry.getDeliveryCount() >= maxDeliveryLimit);
}
- public void queueDeleted()
+ public boolean allocateCredit(ServerMessage message)
{
- _deleted.set(true);
+ boolean creditAllocated = _creditManager.useCreditForMessage(message.getSize());
+ updateNotifyWorkDesired();
+ return creditAllocated;
}
- public boolean allocateCredit(ServerMessage message)
+ public void restoreCredit(ServerMessage message)
{
- return _creditManager.useCreditForMessage(message.getSize());
+ restoreCredit(1, message.getSize());
}
- public void restoreCredit(ServerMessage message)
+ void restoreCredit(int count, long size)
{
- _creditManager.restoreCredit(1, message.getSize());
+ _creditManager.restoreCredit(count, size);
+ updateNotifyWorkDesired();
}
+
+
public FlowCreditManager_0_10 getCreditManager()
{
return _creditManager;
@@ -543,25 +521,13 @@ public class ConsumerTarget_0_10 extends
public void stop()
{
- try
- {
- getSendLock();
-
- updateState(State.ACTIVE, State.SUSPENDED);
- _stopped.set(true);
- FlowCreditManager_0_10 creditManager = getCreditManager();
- creditManager.clearCredit();
- }
- finally
- {
- releaseSendLock();
- }
+ getCreditManager().clearCredit();
+ updateNotifyWorkDesired();
}
public void addCredit(MessageCreditUnit unit, long value)
{
FlowCreditManager_0_10 creditManager = getCreditManager();
-
switch (unit)
{
case MESSAGE:
@@ -572,22 +538,11 @@ public class ConsumerTarget_0_10 extends
creditManager.addCredit(0l, value);
break;
}
-
- _stopped.set(false);
-
- if(creditManager.hasCredit())
- {
- updateState(State.SUSPENDED, State.ACTIVE);
- }
-
+ updateNotifyWorkDesired();
}
public void setFlowMode(MessageFlowMode flowMode)
{
-
-
- _creditManager.removeListener(this);
-
switch(flowMode)
{
case CREDIT:
@@ -601,24 +556,18 @@ public class ConsumerTarget_0_10 extends
throw new ConnectionScopedRuntimeException("Unknown message flow mode: " + flowMode);
}
_flowMode = flowMode;
- updateState(State.ACTIVE, State.SUSPENDED);
-
- _creditManager.addStateListener(this);
-
+ updateNotifyWorkDesired();
}
- public boolean isStopped()
+ public boolean isFlowModeChangeAllowed()
{
- return _stopped.get();
+ return _creditManager.hasNeitherCredit();
}
public void flush()
{
flushCreditState(true);
- for(ConsumerImpl consumer : getConsumers())
- {
- consumer.flush();
- }
+ while(sendNextMessage());
stop();
}
@@ -657,30 +606,6 @@ public class ConsumerTarget_0_10 extends
}
@Override
- protected void processClosed()
- {
-
- }
-
- @Override
- protected void processStateChanged()
- {
-
- }
-
- @Override
- protected boolean hasStateChanged()
- {
- return false;
- }
-
- @Override
- protected boolean hasClosed()
- {
- return false;
- }
-
- @Override
public String toString()
{
return "ConsumerTarget_0_10[name=" + _name + ", session=" + _session.toLogString() + "]";
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java Tue Nov 15 14:16:10 2016
@@ -22,9 +22,8 @@ package org.apache.qpid.server.protocol.
import org.apache.qpid.server.transport.ProtocolEngine;
-import org.apache.qpid.server.flow.AbstractFlowCreditManager;
-public class CreditCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10
+public class CreditCreditManager implements FlowCreditManager_0_10
{
private final ProtocolEngine _protocolEngine;
private volatile long _bytesCredit;
@@ -35,13 +34,10 @@ public class CreditCreditManager extends
_protocolEngine = protocolEngine;
_bytesCredit = bytesCredit;
_messageCredit = messageCredit;
- setSuspended(!hasCredit());
-
}
public synchronized void restoreCredit(final long messageCredit, final long bytesCredit)
{
- setSuspended(!hasCredit());
}
@@ -52,26 +48,16 @@ public class CreditCreditManager extends
_messageCredit += messageCredit;
}
- boolean notifyIncrease = false;
-
if(_bytesCredit >= 0L && bytesCredit > 0L)
{
- notifyIncrease = _messageCredit != 0L && bytesCredit > 0L;
_bytesCredit += bytesCredit;
}
-
- if(!setSuspended(!hasCredit()) && notifyIncrease)
- {
- notifyIncreaseBytesCredit();
- }
-
}
public synchronized void clearCredit()
{
_bytesCredit = 0l;
_messageCredit = 0l;
- setSuspended(true);
}
@@ -81,11 +67,16 @@ public class CreditCreditManager extends
return (_bytesCredit != 0L && _messageCredit != 0L && !_protocolEngine.isTransportBlockedForWriting());
}
+ @Override
+ public boolean hasNeitherCredit()
+ {
+ return _bytesCredit == 0L && _messageCredit == 0L;
+ }
+
public synchronized boolean useCreditForMessage(long msgSize)
{
if (_protocolEngine.isTransportBlockedForWriting())
{
- setSuspended(true);
return false;
}
else if(_messageCredit >= 0L)
@@ -112,7 +103,6 @@ public class CreditCreditManager extends
}
else
{
- setSuspended(true);
return false;
}
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/FlowCreditManager_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/FlowCreditManager_0_10.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/FlowCreditManager_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/FlowCreditManager_0_10.java Tue Nov 15 14:16:10 2016
@@ -24,7 +24,9 @@ import org.apache.qpid.server.flow.FlowC
public interface FlowCreditManager_0_10 extends FlowCreditManager
{
- public void addCredit(long count, long bytes);
+ void addCredit(long count, long bytes);
void clearCredit();
+
+ boolean hasNeitherCredit();
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java Tue Nov 15 14:16:10 2016
@@ -56,7 +56,7 @@ public class MessageAcceptCompletionList
{
if(_restoreCredit)
{
- _sub.getCreditManager().restoreCredit(1l, _messageSize);
+ _sub.restoreCredit(1, _messageSize);
}
_session.acknowledge(_consumer, _sub, _entry);
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Tue Nov 15 14:16:10 2016
@@ -27,13 +27,12 @@ import java.security.AccessControlContex
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
-import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
+import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
@@ -43,13 +42,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.model.NamedAddressSpace;
-import org.apache.qpid.server.protocol.ConnectionClosingTicker;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.ConnectionClosingTicker;
import org.apache.qpid.server.transport.ServerNetworkConnection;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
@@ -133,7 +132,7 @@ public class ServerConnection extends Co
getAmqpConnection().getAggregateTicker().addTicker(new ConnectionClosingTicker(System.currentTimeMillis() + CLOSE_OK_TIMEOUT, (ServerNetworkConnection) getNetworkConnection()));
// trigger a wakeup to ensure the ticker will be taken into account
- notifyWork();
+ getAmqpConnection().notifyWork();
}
}
@@ -468,10 +467,10 @@ public class ServerConnection extends Co
super.doHeartBeat();
}
- private void addAsyncTask(final Action<ServerConnection> action)
+ void addAsyncTask(final Action<? super ServerConnection> action)
{
_asyncTaskList.add(action);
- notifyWork();
+ getAmqpConnection().notifyWork();
}
public int getMessageCompressionThreshold()
@@ -492,30 +491,26 @@ public class ServerConnection extends Co
}
}
- public void notifyWork()
- {
- _amqpConnection.notifyWork();
- }
-
- public Iterator<Runnable> processPendingIterator()
+ public Iterator<Runnable> processPendingIterator(final Set<AMQSessionModel<?>> sessionsWithWork)
{
- return new ProcessPendingIterator();
+ return new ProcessPendingIterator(sessionsWithWork);
}
private class ProcessPendingIterator implements Iterator<Runnable>
{
- private final Collection<? extends ServerSession> _sessionsWithPending;
+ private final Collection<AMQSessionModel<?>> _sessionsWithPending;
private Iterator<? extends AMQSessionModel<?>> _sessionIterator;
- private ProcessPendingIterator()
+ private ProcessPendingIterator(final Set<AMQSessionModel<?>> sessionsWithWork)
{
- _sessionsWithPending = new ArrayList<>(getSessionModels());
+ _sessionsWithPending = sessionsWithWork;
_sessionIterator = _sessionsWithPending.iterator();
}
@Override
public boolean hasNext()
{
- return !(_sessionsWithPending.isEmpty() && _asyncTaskList.isEmpty());
+ return (!_sessionsWithPending.isEmpty() && !isClosing() && !_amqpConnection.isConnectionStopped())
+ || !_asyncTaskList.isEmpty();
}
@Override
@@ -523,22 +518,39 @@ public class ServerConnection extends Co
{
if(!_sessionsWithPending.isEmpty())
{
- if(!_sessionIterator.hasNext())
+ if(isClosing() || _amqpConnection.isConnectionStopped())
{
- _sessionIterator = _sessionsWithPending.iterator();
+ // in case the connection was marked as closing between a call to hasNext() and
+ // a subsequent call to next()
+ return new Runnable()
+ {
+ @Override
+ public void run()
+ {
+
+ }
+ };
}
- final AMQSessionModel<?> session = _sessionIterator.next();
- return new Runnable()
+ else
{
- @Override
- public void run()
+ if (!_sessionIterator.hasNext())
+ {
+ _sessionIterator = _sessionsWithPending.iterator();
+ }
+ final AMQSessionModel<?> session = _sessionIterator.next();
+ return new Runnable()
{
- if(!session.processPending())
+ @Override
+ public void run()
{
_sessionIterator.remove();
+ if (session.processPending())
+ {
+ _sessionsWithPending.add(session);
+ }
}
- }
- };
+ };
+ }
}
else if(!_asyncTaskList.isEmpty())
{
@@ -556,6 +568,7 @@ public class ServerConnection extends Co
{
throw new NoSuchElementException();
}
+
}
@Override
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Tue Nov 15 14:16:10 2016
@@ -143,8 +143,11 @@ public class ServerSession extends Sessi
private long _blockTime;
private long _blockingTimeout;
private boolean _wireBlockingState;
- private final List<ConsumerTarget> _consumersWithPendingWork = new ArrayList<>();
- private final PublishAuthorisationCache _publishAuthCahe;
+ private final Set<ConsumerTarget> _consumersWithPendingWork =
+ Collections.newSetFromMap(new ConcurrentHashMap<ConsumerTarget, Boolean>());
+ private Iterator<ConsumerTarget> _processPendingIterator;
+
+ private final PublishAuthorisationCache _publishAuthCache;
public static interface MessageDispositionChangeListener
@@ -206,10 +209,10 @@ public class ServerSession extends Sessi
}
_blockingTimeout = serverConnection.getBroker().getContextValue(Long.class, Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT);
- _maxUncommittedInMemorySize = getConnection().getAmqpConnection().getContextProvider().getContextValue(Long.class, org.apache.qpid.server.model.Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE);
- _publishAuthCahe = new PublishAuthorisationCache(_token,
- amqpConnection.getContextValue(Long.class, org.apache.qpid.server.model.Session.PRODUCER_AUTH_CACHE_TIMEOUT),
- amqpConnection.getContextValue(Integer.class, org.apache.qpid.server.model.Session.PRODUCER_AUTH_CACHE_SIZE));
+ _maxUncommittedInMemorySize = getAMQPConnection().getContextProvider().getContextValue(Long.class, org.apache.qpid.server.model.Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE);
+ _publishAuthCache = new PublishAuthorisationCache(_token,
+ amqpConnection.getContextValue(Long.class, org.apache.qpid.server.model.Session.PRODUCER_AUTH_CACHE_TIMEOUT),
+ amqpConnection.getContextValue(Integer.class, org.apache.qpid.server.model.Session.PRODUCER_AUTH_CACHE_SIZE));
}
@@ -226,7 +229,7 @@ public class ServerSession extends Sessi
if (state == State.OPEN)
{
- getConnection().getAmqpConnection().getEventLogger().message(ChannelMessages.CREATE());
+ getAMQPConnection().getEventLogger().message(ChannelMessages.CREATE());
}
}
else
@@ -275,7 +278,7 @@ public class ServerSession extends Sessi
final boolean immediate,
final long currentTime)
{
- _publishAuthCahe.authorisePublish(destination, routingKey, immediate, currentTime);
+ _publishAuthCache.authorisePublish(destination, routingKey, immediate, currentTime);
}
@Override
@@ -320,8 +323,8 @@ public class ServerSession extends Sessi
handle.flowToDisk();
if(!_uncommittedMessages.isEmpty() || _uncommittedMessageSize == handle.getMetaData().getContentSize())
{
- getConnection().getAmqpConnection().getEventLogger()
- .message(_logSubject, ChannelMessages.LARGE_TRANSACTION_WARN(_uncommittedMessageSize));
+ getAMQPConnection().getEventLogger()
+ .message(_logSubject, ChannelMessages.LARGE_TRANSACTION_WARN(_uncommittedMessageSize));
}
if(!_uncommittedMessages.isEmpty())
@@ -528,7 +531,7 @@ public class ServerSession extends Sessi
{
operationalLoggingMessage = ChannelMessages.CLOSE();
}
- getConnection().getAmqpConnection().getEventLogger().message(getLogSubject(), operationalLoggingMessage);
+ getAMQPConnection().getEventLogger().message(getLogSubject(), operationalLoggingMessage);
}
@Override
@@ -853,10 +856,10 @@ public class ServerSession extends Sessi
if(_blocking.compareAndSet(false,true))
{
- getConnection().getAmqpConnection().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(name));
+ getAMQPConnection().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(name));
if(getState() == State.OPEN)
{
- getConnection().notifyWork();
+ getAMQPConnection().notifyWork(this);
}
}
@@ -881,8 +884,8 @@ public class ServerSession extends Sessi
{
if(_blocking.compareAndSet(true,false) && !isClosing())
{
- getConnection().getAmqpConnection().getEventLogger().message(_logSubject, ChannelMessages.FLOW_REMOVED());
- getConnection().notifyWork();
+ getAMQPConnection().getEventLogger().message(_logSubject, ChannelMessages.FLOW_REMOVED());
+ getAMQPConnection().notifyWork(this);
}
}
}
@@ -1185,7 +1188,7 @@ public class ServerSession extends Sessi
@Override
public boolean processPending()
{
- if (!getAMQPConnection().isIOThread())
+ if (!getAMQPConnection().isIOThread() || isClosing())
{
return false;
}
@@ -1206,40 +1209,31 @@ public class ServerSession extends Sessi
_blockTime = desiredBlockingState ? System.currentTimeMillis() : 0;
}
- boolean consumerListNeedsRefreshing;
- if(_consumersWithPendingWork.isEmpty())
- {
- _consumersWithPendingWork.addAll(getSubscriptions());
- consumerListNeedsRefreshing = false;
- }
- else
+ if (!_consumersWithPendingWork.isEmpty())
{
- consumerListNeedsRefreshing = true;
- }
-
- // QPID-7447: prevent unnecessary allocation of empty iterator
- Iterator<ConsumerTarget> iter = _consumersWithPendingWork.isEmpty() ? Collections.<ConsumerTarget>emptyIterator() : _consumersWithPendingWork.iterator();
+ if (_processPendingIterator == null || !_processPendingIterator.hasNext())
+ {
+ _processPendingIterator = _consumersWithPendingWork.iterator();
+ }
- boolean consumerHasMoreWork = false;
- while(iter.hasNext())
- {
- final ConsumerTarget target = iter.next();
- iter.remove();
- if(target.hasPendingWork())
+ if (_processPendingIterator.hasNext())
{
- consumerHasMoreWork = true;
- target.processPending();
- break;
+ ConsumerTarget target = _processPendingIterator.next();
+ _processPendingIterator.remove();
+ if (target.processPending())
+ {
+ _consumersWithPendingWork.add(target);
+ }
}
}
- return consumerHasMoreWork || consumerListNeedsRefreshing;
+ return !_consumersWithPendingWork.isEmpty();
}
@Override
public void addTicker(final Ticker ticker)
{
- getConnection().getAmqpConnection().getAggregateTicker().addTicker(ticker);
+ getAMQPConnection().getAggregateTicker().addTicker(ticker);
// trigger a wakeup to ensure the ticker will be taken into account
getAMQPConnection().notifyWork();
}
@@ -1247,36 +1241,15 @@ public class ServerSession extends Sessi
@Override
public void removeTicker(final Ticker ticker)
{
- getConnection().getAmqpConnection().getAggregateTicker().removeTicker(ticker);
- }
-
- @Override
- public void notifyConsumerTargetCurrentStates()
- {
- Collection<ConsumerTarget_0_10> consumerTargets = getSubscriptions();
- for(ConsumerTarget_0_10 consumerTarget: consumerTargets)
- {
- if(!consumerTarget.isPullOnly())
- {
- consumerTarget.notifyCurrentState();
- }
- }
+ getAMQPConnection().getAggregateTicker().removeTicker(ticker);
}
@Override
- public void ensureConsumersNoticedStateChange()
+ public void notifyWork(final ConsumerTarget target)
{
- Collection<ConsumerTarget_0_10> consumerTargets = getSubscriptions();
- for(ConsumerTarget_0_10 consumerTarget: consumerTargets)
+ if(_consumersWithPendingWork.add(target))
{
- try
- {
- consumerTarget.getSendLock();
- }
- finally
- {
- consumerTarget.releaseSendLock();
- }
+ getAMQPConnection().notifyWork(this);
}
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Tue Nov 15 14:16:10 2016
@@ -361,6 +361,7 @@ public class ServerSessionDelegate exten
options,
priority));
}
+ target.updateNotifyWorkDesired();
}
catch (Queue.ExistingExclusiveConsumer existing)
{
@@ -378,6 +379,10 @@ public class ServerSessionDelegate exten
{
exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an incompatible exclusivity policy");
}
+ catch (MessageSource.QueueDeleted queueDeleted)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue was deleted");
+ }
}
}
}
@@ -1676,10 +1681,14 @@ public class ServerSessionDelegate exten
{
exception(session, sfm, ExecutionErrorCode.NOT_FOUND, "not-found: destination '" + destination + "'");
}
- else if(sub.isStopped())
+ else if(sub.isFlowModeChangeAllowed())
{
sub.setFlowMode(sfm.getFlowMode());
}
+ else
+ {
+ exception(session, sfm, ExecutionErrorCode.PRECONDITION_FAILED, "destination '" + destination + "' has credit");
+ }
}
@Override
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java?rev=1769837&r1=1769836&r2=1769837&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java Tue Nov 15 14:16:10 2016
@@ -25,9 +25,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.transport.ProtocolEngine;
-import org.apache.qpid.server.flow.AbstractFlowCreditManager;
-public class WindowCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10
+public class WindowCreditManager implements FlowCreditManager_0_10
{
private static final Logger LOGGER = LoggerFactory.getLogger(WindowCreditManager.class);
private final ProtocolEngine _protocolEngine;
@@ -45,8 +44,6 @@ public class WindowCreditManager extends
_protocolEngine = protocolEngine;
_bytesCreditLimit = bytesCreditLimit;
_messageCreditLimit = messageCreditLimit;
- setSuspended(!hasCredit());
-
}
public synchronized long getMessageCreditLimit()
@@ -77,30 +74,14 @@ public class WindowCreditManager extends
_messageUsed = 0;
}
- boolean notifyIncrease = true;
-
- if (_messageCreditLimit > 0L)
- {
- notifyIncrease = (_messageUsed != _messageCreditLimit);
- }
-
_bytesUsed -= bytesCredit;
if (_bytesUsed < 0L)
{
LOGGER.error("Bytes credit used value was negative: " + _bytesUsed);
_bytesUsed = 0;
}
-
- notifyIncrease = notifyIncrease && bytesCredit > 0 && _bytesCreditLimit > 0L ;
-
- if (!setSuspended(!hasCredit()) && notifyIncrease)
- {
- notifyIncreaseBytesCredit();
- }
}
-
-
public synchronized boolean hasCredit()
{
return (_bytesCreditLimit < 0L || _bytesCreditLimit > _bytesUsed)
@@ -108,11 +89,16 @@ public class WindowCreditManager extends
&& !_protocolEngine.isTransportBlockedForWriting();
}
+ @Override
+ public boolean hasNeitherCredit()
+ {
+ return _bytesCreditLimit == 0L && _messageCreditLimit == 0L;
+ }
+
public synchronized boolean useCreditForMessage(final long msgSize)
{
if (_protocolEngine.isTransportBlockedForWriting())
{
- setSuspended(true);
return false;
}
else if(_messageCreditLimit >= 0L)
@@ -139,7 +125,6 @@ public class WindowCreditManager extends
}
else
{
- setSuspended(true);
return false;
}
}
@@ -191,6 +176,5 @@ public class WindowCreditManager extends
{
_bytesCreditLimit = 0l;
_messageCreditLimit = 0l;
- setSuspended(true);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org