You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/02 21:29:17 UTC
svn commit: r1767787 - in /qpid/java/branches/remove-queue-runner:
broker-core/src/main/java/org/apache/qpid/server/consumer/
broker-core/src/main/java/org/apache/qpid/server/model/
broker-core/src/main/java/org/apache/qpid/server/queue/ broker-core/sr...
Author: rgodfrey
Date: Wed Nov 2 21:29:16 2016
New Revision: 1767787
URL: http://svn.apache.org/viewvc?rev=1767787&view=rev
Log:
Remove queue runner, message assignment suspension
Removed:
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
Modified:
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java
qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
qpid/java/branches/remove-queue-runner/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1767787&r1=1767786&r2=1767787&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Wed Nov 2 21:29:16 2016
@@ -156,7 +156,7 @@ public abstract class AbstractConsumerTa
@Override
public final boolean isSuspended()
{
- return getSessionModel().getAMQPConnection().isMessageAssignmentSuspended() || isFlowSuspended();
+ return isFlowSuspended();
}
@Override
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1767787&r1=1767786&r2=1767787&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java Wed Nov 2 21:29:16 2016
@@ -401,8 +401,6 @@ public interface Queue<X extends Queue<X
Set<NotificationCheck> getNotificationChecks();
- void deliverAsync();
-
Collection<String> getAvailableAttributes();
void completeRecovery();
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1767787&r1=1767786&r2=1767787&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Wed Nov 2 21:29:16 2016
@@ -285,7 +285,6 @@ public abstract class AbstractQueue<X ex
private final ConcurrentLinkedQueue<EnqueueRequest> _postRecoveryQueue = new ConcurrentLinkedQueue<>();
- private final QueueRunner _queueRunner;
private boolean _closing;
private final ConcurrentMap<String, Callable<MessageFilter>> _defaultFiltersMap = new ConcurrentHashMap<>();
private final List<HoldMethod> _holdMethods = new CopyOnWriteArrayList<>();
@@ -305,8 +304,6 @@ public abstract class AbstractQueue<X ex
_virtualHost = virtualHost;
_immediateDeliveryContext = getSystemTaskControllerContext("Immediate Delivery", virtualHost.getPrincipal());
- _queueRunner = new QueueRunner(this, getSystemTaskControllerContext("Queue Delivery",
- virtualHost.getPrincipal()));
}
@Override
@@ -972,10 +969,6 @@ public abstract class AbstractQueue<X ex
{
consumer.notifyWork();
}
- else
- {
- deliverAsync();
- }
return consumer;
}
@@ -1099,7 +1092,6 @@ public abstract class AbstractQueue<X ex
}
}
notifyPullOnlyConsumers();
- deliverAsync();
}
public void addBinding(final Binding<?> binding)
@@ -1234,25 +1226,10 @@ public abstract class AbstractQueue<X ex
try
{
- if (action != null || (exclusiveSub == null && _queueRunner.isIdle()))
- {
- AccessController.doPrivileged(
- new PrivilegedAction<Void>()
- {
- @Override
- public Void run()
- {
- tryDeliverStraightThrough(entry);
- return null;
- }
- }, _immediateDeliveryContext);
- }
-
if (entry.isAvailable())
{
checkConsumersNotAheadOfDelivery(entry);
notifyPullOnlyConsumers();
- deliverAsync();
}
checkForNotificationOnNewMessage(entry.getMessage());
@@ -1546,7 +1523,6 @@ public abstract class AbstractQueue<X ex
}
}
notifyPullOnlyConsumers();
- deliverAsync();
}
@@ -1740,20 +1716,9 @@ public abstract class AbstractQueue<X ex
if (oldState != State.ACTIVE)
{
_activeSubscriberCount.incrementAndGet();
- if(sub.isPullOnly())
- {
- sub.notifyWork();
- }
-
- }
- if(!sub.isPullOnly())
- {
- deliverAsync();
- }
- else
- {
- sub.notifyWork();
}
+ sub.notifyWork();
+
}
}
@@ -2168,14 +2133,6 @@ public abstract class AbstractQueue<X ex
}
}
- public void deliverAsync()
- {
- _stateChangeCount.incrementAndGet();
-
- _queueRunner.execute();
-
- }
-
void notifyPullOnlyConsumers()
{
if(_hasPullOnlyConsumers)
@@ -2184,7 +2141,7 @@ public abstract class AbstractQueue<X ex
while (consumerNode != null)
{
QueueConsumer<?> consumer = consumerNode.getConsumer();
- if (consumer.isActive() && consumer.isPullOnly() && getNextAvailableEntry(consumer) != null)
+ if (consumer.isActive() && getNextAvailableEntry(consumer) != null)
{
consumer.notifyWork();
}
@@ -2445,151 +2402,6 @@ public abstract class AbstractQueue<X ex
return getNextAvailableEntry(queueConsumer) != null;
}
- /**
- * Used by queue Runners to asynchronously deliver messages to consumers.
- *
- * A queue Runner is started whenever a state change occurs, e.g when a new
- * message arrives on the queue and cannot be immediately delivered to a
- * consumer (i.e. asynchronous delivery is required).
- *
- * processQueue should be running while there are messages on the queue AND
- * there are consumers that can deliver them. If there are no
- * consumers capable of delivering the remaining messages on the queue
- * then processQueue should stop to prevent spinning.
- *
- * Since processQueue is runs in a fixed size Executor, it should not run
- * indefinitely to prevent starving other tasks of CPU (e.g jobs to process
- * incoming messages may not be able to be scheduled in the thread pool
- * because all threads are working on clearing down large queues). To solve
- * this problem, after an arbitrary number of message deliveries the
- * processQueue job stops iterating, resubmits itself to the executor, and
- * ends the current instance
- *
- * @param runner the Runner to schedule
- */
- public long processQueue(QueueRunner runner)
- {
- long stateChangeCount;
- long previousStateChangeCount = Long.MIN_VALUE;
- long rVal = Long.MIN_VALUE;
- boolean deliveryIncomplete = true;
-
- boolean lastLoop = false;
- int iterations = getMaxAsyncDeliveries();
-
- final int numSubs = _consumerList.size();
-
- final int perSub = Math.max(iterations / Math.max(numSubs,1), 1);
-
- // For every message enqueue/requeue the we fire deliveryAsync() which
- // increases _stateChangeCount. If _sCC changes whilst we are in our loop
- // (detected by setting previousStateChangeCount to stateChangeCount in the loop body)
- // then we will continue to run for a maximum of iterations.
- // So whilst delivery/rejection is going on a processQueue thread will be running
- while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete))
- {
- // we want to have one extra loop after every consumer has reached the point where it cannot move
- // further, just in case the advance of one consumer in the last loop allows a different consumer to
- // move forward in the next iteration
-
- if (previousStateChangeCount != stateChangeCount)
- {
- //further asynchronous delivery is required since the
- //previous loop. keep going if iteration slicing allows.
- lastLoop = false;
- rVal = stateChangeCount;
- }
-
- previousStateChangeCount = stateChangeCount;
- boolean allConsumersDone = true;
- boolean consumerDone;
-
- ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
- //iterate over the subscribers and try to advance their pointer
- while (consumerNodeIterator.advance())
- {
-
- QueueConsumer<?> sub = consumerNodeIterator.getNode().getConsumer();
-
- if(!sub.isPullOnly())
- {
- sub.getSendLock();
-
- try
- {
- for (int i = 0; i < perSub; i++)
- {
- //attempt delivery. returns true if no further delivery currently possible to this sub
- consumerDone = attemptDelivery(sub, true);
- if (consumerDone)
- {
- sub.flushBatched();
- boolean noMore = getNextAvailableEntry(sub) == null;
- if (lastLoop && noMore)
- {
- sub.queueEmpty();
- }
- break;
- }
- else
- {
- //this consumer can accept additional deliveries, so we must
- //keep going after this (if iteration slicing allows it)
- allConsumersDone = false;
- lastLoop = false;
- if (--iterations == 0)
- {
- sub.flushBatched();
- break;
- }
- }
- }
-
- sub.flushBatched();
- }
- finally
- {
- sub.releaseSendLock();
- }
- }
- }
-
- if(allConsumersDone && lastLoop)
- {
- //We have done an extra loop already and there are again
- //again no further delivery attempts possible, only
- //keep going if state change demands it.
- deliveryIncomplete = false;
- }
- else if(allConsumersDone)
- {
- //All consumers reported being done, but we have to do
- //an extra loop if the iterations are not exhausted and
- //there is still any work to be done
- deliveryIncomplete = _consumerList.size() != 0;
- lastLoop = true;
- }
- else
- {
- //some consumers can still accept more messages,
- //keep going if iteration count allows.
- lastLoop = false;
- deliveryIncomplete = true;
- }
-
- }
-
- // If iterations == 0 then the limiting factor was the time-slicing rather than available messages or credit
- // therefore we should schedule this runner again (unless someone beats us to it :-) ).
- if (iterations == 0)
- {
- _logger.debug("Rescheduling runner: {}", runner);
- return 0L;
- }
- return rVal;
-
- }
-
public void checkMessageStatus()
{
QueueEntryIterator queueListIterator = getEntries().iterator();
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1767787&r1=1767786&r2=1767787&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Wed Nov 2 21:29:16 2016
@@ -262,14 +262,7 @@ class QueueConsumerImpl
@Override
public void externalStateChange()
{
- if(isPullOnly())
- {
- _target.notifyWork();
- }
- else
- {
- _queue.deliverAsync();
- }
+ _target.notifyWork();
}
@Override
@@ -406,16 +399,8 @@ class QueueConsumerImpl
public final void flush()
{
AMQPConnection<?> connection = _target.getSessionModel().getAMQPConnection();
- try
- {
- connection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(true);
- _queue.flushConsumer(this);
- _target.processPending();
- }
- finally
- {
- connection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(false);
- }
+ _queue.flushConsumer(this);
+ _target.processPending();
}
@@ -423,15 +408,8 @@ class QueueConsumerImpl
public void pullMessage()
{
AMQPConnection<?> connection = _target.getSessionModel().getAMQPConnection();
- try
- {
- connection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(true);
- _queue.flushConsumer(this, 1);
- }
- finally
- {
- connection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(false);
- }
+ _queue.flushConsumer(this, 1);
+
}
@@ -711,14 +689,7 @@ class QueueConsumerImpl
entry.addStateChangeListener(this);
if(!entry.isAvailable())
{
- if(isPullOnly())
- {
- _target.notifyWork();
- }
- else
- {
- _queue.deliverAsync();
- }
+ _target.notifyWork();
remove();
}
}
@@ -738,14 +709,7 @@ class QueueConsumerImpl
{
entry.removeStateChangeListener(this);
_entry.compareAndSet(entry, null);
- if(isPullOnly())
- {
- _target.notifyWork();
- }
- else
- {
- _queue.deliverAsync();
- }
+ _target.notifyWork();
}
}
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java?rev=1767787&r1=1767786&r2=1767787&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java Wed Nov 2 21:29:16 2016
@@ -35,9 +35,6 @@ import org.apache.qpid.server.util.Delet
public interface AMQPConnection<C extends AMQPConnection<C>> extends Connection<C>, Deletable<C>, EventLoggerProvider
{
- boolean isMessageAssignmentSuspended();
-
- void alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(boolean override);
AccessControlContext getAccessControlContextFromSubject(Subject subject);
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1767787&r1=1767786&r2=1767787&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java Wed Nov 2 21:29:16 2016
@@ -35,7 +35,6 @@ import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
import javax.security.auth.SubjectDomainCombiner;
@@ -96,8 +95,6 @@ public abstract class AbstractAMQPConnec
new CopyOnWriteArrayList<>();
private final LogSubject _logSubject;
- private final AtomicReference<Thread> _messageAssignmentAllowedThread = new AtomicReference<>();
- private final AtomicBoolean _messageAssignmentSuspended = new AtomicBoolean();
private volatile ContextProvider _contextProvider;
private volatile EventLoggerProvider _eventLoggerProvider;
private String _clientProduct;
@@ -489,50 +486,6 @@ public abstract class AbstractAMQPConnec
}
@Override
- public boolean isMessageAssignmentSuspended()
- {
- Thread currentThread = Thread.currentThread();
- if (_messageAssignmentAllowedThread.get() == currentThread && currentThread == _ioThread)
- {
- return false;
- }
- return _messageAssignmentSuspended.get();
- }
-
- @Override
- public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended, final boolean notifyConsumers)
- {
- _messageAssignmentSuspended.set(messageAssignmentSuspended);
- if(notifyConsumers)
- {
- for (AMQSessionModel<?> session : getSessionModels())
- {
- if (messageAssignmentSuspended)
- {
- session.ensureConsumersNoticedStateChange();
- }
- else
- {
- session.notifyConsumerTargetCurrentStates();
- }
- }
- }
- }
-
- @Override
- public void alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(boolean allowed)
- {
- if (allowed)
- {
- _messageAssignmentAllowedThread.set(Thread.currentThread());
- }
- else
- {
- _messageAssignmentAllowedThread.set(null);
- }
- }
-
- @Override
public void setIOThread(final Thread ioThread)
{
_ioThread = ioThread;
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java?rev=1767787&r1=1767786&r2=1767787&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java Wed Nov 2 21:29:16 2016
@@ -91,18 +91,6 @@ public class MultiVersionProtocolEngine
_onCloseTask = onCloseTask;
}
- @Override
- public void setMessageAssignmentSuspended(final boolean value, final boolean notifyConsumers)
- {
- _delegate.setMessageAssignmentSuspended(value, notifyConsumers);
- }
-
- @Override
- public boolean isMessageAssignmentSuspended()
- {
- return _delegate.isMessageAssignmentSuspended();
- }
-
public void closed()
{
_logger.debug("Closed");
@@ -244,18 +232,6 @@ public class MultiVersionProtocolEngine
{
@Override
- public void setMessageAssignmentSuspended(final boolean value, final boolean notifyConsumers)
- {
-
- }
-
- @Override
- public boolean isMessageAssignmentSuspended()
- {
- return false;
- }
-
- @Override
public Iterator<Runnable> processPendingIterator()
{
return Collections.emptyIterator();
@@ -366,17 +342,6 @@ public class MultiVersionProtocolEngine
private final AtomicBoolean _hasWork = new AtomicBoolean();
@Override
- public void setMessageAssignmentSuspended(final boolean value, final boolean notifyConsumers)
- {
- }
-
- @Override
- public boolean isMessageAssignmentSuspended()
- {
- return false;
- }
-
- @Override
public Iterator<Runnable> processPendingIterator()
{
return Collections.emptyIterator();
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1767787&r1=1767786&r2=1767787&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java Wed Nov 2 21:29:16 2016
@@ -218,7 +218,7 @@ public class NonBlockingConnection imple
{
if (_usedOutboundMessageSpace.addAndGet(size) > _outboundMessageBufferLimit)
{
- _protocolEngine.setMessageAssignmentSuspended(true, false);
+ // RG - TODO
}
}
@@ -274,7 +274,6 @@ public class NonBlockingConnection imple
}
_protocolEngine.setIOThread(Thread.currentThread());
- _protocolEngine.setMessageAssignmentSuspended(true, true);
boolean processPendingComplete = processPending();
@@ -290,10 +289,6 @@ public class NonBlockingConnection imple
_protocolEngine.notifyWork();
}
- if (_fullyWritten)
- {
- _protocolEngine.setMessageAssignmentSuspended(false, true);
- }
}
else
{
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java?rev=1767787&r1=1767786&r2=1767787&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java Wed Nov 2 21:29:16 2016
@@ -53,10 +53,6 @@ public interface ProtocolEngine extends
void setTransportBlockedForWriting(boolean blocked);
- void setMessageAssignmentSuspended(boolean value, final boolean notifyConsumers);
-
- boolean isMessageAssignmentSuspended();
-
Iterator<Runnable> processPendingIterator();
boolean hasWork();
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java?rev=1767787&r1=1767786&r2=1767787&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java Wed Nov 2 21:29:16 2016
@@ -150,26 +150,18 @@ public abstract class AbstractSystemMess
{
AMQPConnection<?> connection = _target.getSessionModel().getAMQPConnection();
_target.getSendLock();
+
try
{
- connection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(true);
-
- try
+ if (!_queue.isEmpty())
{
- if (!_queue.isEmpty())
+ final PropertiesMessageInstance propertiesMessageInstance = _queue.get(0);
+ if (!_target.isSuspended() && _target.allocateCredit(propertiesMessageInstance.getMessage()))
{
- final PropertiesMessageInstance propertiesMessageInstance = _queue.get(0);
- if (!_target.isSuspended() && _target.allocateCredit(propertiesMessageInstance.getMessage()))
- {
- _queue.remove(0);
- _target.send(this, propertiesMessageInstance, false);
- }
+ _queue.remove(0);
+ _target.send(this, propertiesMessageInstance, false);
}
}
- finally
- {
- connection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(false);
- }
}
finally
{
@@ -285,16 +277,8 @@ public abstract class AbstractSystemMess
public void flush()
{
AMQPConnection<?> connection = getSessionModel().getAMQPConnection();
- try
- {
- connection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(true);
- deliverMessages();
- _target.processPending();
- }
- finally
- {
- connection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(false);
- }
+ deliverMessages();
+ _target.processPending();
}
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java?rev=1767787&r1=1767786&r2=1767787&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java Wed Nov 2 21:29:16 2016
@@ -63,6 +63,9 @@ public class MockConsumer implements Con
private final Lock _stateChangeLock = new ReentrantLock();
private boolean _isActive = true;
+ private ConsumerImpl _consumer;
+ private boolean _messageSent;
+ private MockSessionModel _sessionModel = new MockSessionModel();
public MockConsumer()
{
@@ -107,7 +110,7 @@ public class MockConsumer implements Con
public AMQSessionModel getSessionModel()
{
- return new MockSessionModel();
+ return _sessionModel;
}
public boolean isActive()
@@ -138,6 +141,7 @@ public class MockConsumer implements Con
public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
{
+ _messageSent = true;
long size = entry.getMessage().getSize();
if (messages.contains(entry))
{
@@ -190,6 +194,7 @@ public class MockConsumer implements Con
@Override
public void consumerAdded(final ConsumerImpl sub)
{
+ _consumer = sub;
}
@Override
@@ -232,7 +237,16 @@ public class MockConsumer implements Con
@Override
public boolean processPending()
{
- return false;
+ _consumer.pullMessage();
+ if(_messageSent)
+ {
+ _messageSent = false;
+ return true;
+ }
+ else
+ {
+ return false;
+ }
}
@Override
@@ -281,7 +295,7 @@ public class MockConsumer implements Con
@Override
public boolean isPullOnly()
{
- return false;
+ return true;
}
@Override
@@ -300,6 +314,7 @@ public class MockConsumer implements Con
{
private final UUID _id = UUID.randomUUID();
private Session _modelObject;
+ private AMQPConnection<?> _connection = mock(AMQPConnection.class);
private MockSessionModel()
{
@@ -322,7 +337,7 @@ public class MockConsumer implements Con
@Override
public AMQPConnection<?> getAMQPConnection()
{
- return null;
+ return _connection;
}
@Override
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java?rev=1767787&r1=1767786&r2=1767787&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java Wed Nov 2 21:29:16 2016
@@ -40,10 +40,6 @@ import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import org.apache.qpid.server.model.Binding;
-import org.apache.qpid.server.model.Exchange;
-import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.util.StateChangeListener;
import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -60,12 +56,16 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Binding;
+import org.apache.qpid.server.model.BrokerTestHelper;
+import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.QueueNotificationListener;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.AbstractQueue.QueueEntryFilter;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.model.BrokerTestHelper;
+import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.test.utils.QpidTestCase;
abstract class AbstractQueueTestBase extends QpidTestCase
@@ -182,7 +182,7 @@ abstract class AbstractQueueTestBase ext
// Check sending a message ends up with the subscriber
_queue.enqueue(messageA, null, null);
- Thread.sleep(_queueRunnerWaitTime);
+ while(_consumerTarget.processPending());
assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage());
assertNull(_consumer.getQueueContext().getReleasedEntry());
@@ -207,7 +207,7 @@ abstract class AbstractQueueTestBase ext
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
EnumSet.of(ConsumerImpl.Option.ACQUIRES,
ConsumerImpl.Option.SEES_REQUEUES), 0);
- Thread.sleep(_queueRunnerWaitTime);
+ while(_consumerTarget.processPending());
assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage());
assertNull("There should be no releasedEntry after an enqueue",
_consumer.getQueueContext().getReleasedEntry());
@@ -225,7 +225,7 @@ abstract class AbstractQueueTestBase ext
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
EnumSet.of(ConsumerImpl.Option.ACQUIRES,
ConsumerImpl.Option.SEES_REQUEUES), 0);
- Thread.sleep(_queueRunnerWaitTime);
+ while(_consumerTarget.processPending());
assertEquals(messageB, _consumer.getQueueContext().getLastSeenEntry().getMessage());
assertNull("There should be no releasedEntry after enqueues",
_consumer.getQueueContext().getReleasedEntry());
@@ -248,12 +248,12 @@ abstract class AbstractQueueTestBase ext
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
EnumSet.of(ConsumerImpl.Option.ACQUIRES,
ConsumerImpl.Option.SEES_REQUEUES), 0);
- Thread.sleep(_queueRunnerWaitTime);
+ while(_consumerTarget.processPending());
assertEquals("Message which was not yet valid was received", 0, _consumerTarget.getMessages().size());
when(messageHeader.getNotValidBefore()).thenReturn(System.currentTimeMillis()-100L);
_queue.checkMessageStatus();
- Thread.sleep(_queueRunnerWaitTime);
+ while(_consumerTarget.processPending());
assertEquals("Message which was valid was not received", 1, _consumerTarget.getMessages().size());
}
@@ -274,7 +274,7 @@ abstract class AbstractQueueTestBase ext
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
EnumSet.of(ConsumerImpl.Option.ACQUIRES,
ConsumerImpl.Option.SEES_REQUEUES), 0);
- Thread.sleep(_queueRunnerWaitTime);
+ while(_consumerTarget.processPending());
assertEquals("Message was held despite queue not having holding enabled", 1, _consumerTarget.getMessages().size());
@@ -300,14 +300,14 @@ abstract class AbstractQueueTestBase ext
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
EnumSet.of(ConsumerImpl.Option.ACQUIRES,
ConsumerImpl.Option.SEES_REQUEUES), 0);
- Thread.sleep(_queueRunnerWaitTime);
+ while(_consumerTarget.processPending());
assertEquals("Expect one message (message B)", 1, _consumerTarget.getMessages().size());
assertEquals("Wrong message received", messageB.getMessageHeader().getMessageId(), _consumerTarget.getMessages().get(0).getMessage().getMessageHeader().getMessageId());
when(messageHeader.getNotValidBefore()).thenReturn(System.currentTimeMillis()-100L);
_queue.checkMessageStatus();
- Thread.sleep(_queueRunnerWaitTime);
+ while(_consumerTarget.processPending());
assertEquals("Message which was valid was not received", 2, _consumerTarget.getMessages().size());
assertEquals("Wrong message received", messageA.getMessageHeader().getMessageId(), _consumerTarget.getMessages().get(1).getMessage().getMessageHeader().getMessageId());
@@ -338,7 +338,7 @@ abstract class AbstractQueueTestBase ext
_queue.enqueue(messageB, postEnqueueAction, null);
_queue.enqueue(messageC, postEnqueueAction, null);
- Thread.sleep(_queueRunnerWaitTime); // Work done by QueueRunner Thread
+ while(_consumerTarget.processPending());
assertEquals("Unexpected total number of messages sent to consumer",
3,
@@ -351,7 +351,7 @@ abstract class AbstractQueueTestBase ext
queueEntries.get(0).release();
- Thread.sleep(_queueRunnerWaitTime); // Work done by QueueRunner Thread
+ while(_consumerTarget.processPending());
assertEquals("Unexpected total number of messages sent to consumer",
4,
@@ -374,6 +374,13 @@ abstract class AbstractQueueTestBase ext
final CountDownLatch sendIndicator = new CountDownLatch(1);
_consumerTarget = new MockConsumer()
{
+
+ @Override
+ public void notifyWork()
+ {
+ while(processPending());
+ }
+
@Override
public long send(ConsumerImpl consumer, MessageInstance entry, boolean batch)
{
@@ -472,7 +479,7 @@ abstract class AbstractQueueTestBase ext
_queue.enqueue(messageB, postEnqueueAction, null);
_queue.enqueue(messageC, postEnqueueAction, null);
- Thread.sleep(_queueRunnerWaitTime); // Work done by QueueRunner Thread
+ while(_consumerTarget.processPending());
assertEquals("Unexpected total number of messages sent to consumer",
3,
@@ -486,7 +493,7 @@ abstract class AbstractQueueTestBase ext
queueEntries.get(2).release();
queueEntries.get(0).release();
- Thread.sleep(_queueRunnerWaitTime); // Work done by QueueRunner Thread
+ while(_consumerTarget.processPending());
assertEquals("Unexpected total number of messages sent to consumer",
5,
@@ -529,7 +536,8 @@ abstract class AbstractQueueTestBase ext
_queue.enqueue(messageA, postEnqueueAction, null);
_queue.enqueue(messageB, postEnqueueAction, null);
- Thread.sleep(_queueRunnerWaitTime); // Work done by QueueRunner Thread
+ while(target1.processPending());
+ while(target2.processPending());
assertEquals("Unexpected total number of messages sent to both after enqueue",
2,
@@ -538,7 +546,8 @@ abstract class AbstractQueueTestBase ext
/* Now release the first message only, causing it to be requeued */
queueEntries.get(0).release();
- Thread.sleep(_queueRunnerWaitTime); // Work done by QueueRunner Thread
+ while(target1.processPending());
+ while(target2.processPending());
assertEquals("Unexpected total number of messages sent to both consumers after release",
3,
@@ -569,12 +578,15 @@ abstract class AbstractQueueTestBase ext
final long timeout = System.currentTimeMillis() + _queueRunnerWaitTime;
QueueEntry lastSeen = null;
- while (timeout > System.currentTimeMillis() &&
+
+ while(_consumerTarget.processPending());
+
+ /*while (timeout > System.currentTimeMillis() &&
((lastSeen = _consumer.getQueueContext().getLastSeenEntry()) == null || lastSeen.getMessage() == null))
{
Thread.sleep(10);
}
-
+*/
assertEquals("Queue context did not see expected message within timeout",
messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage());
@@ -1062,14 +1074,7 @@ abstract class AbstractQueueTestBase ext
queue.enqueue(message,null, null);
}
- try
- {
- Thread.sleep(2000L);
- }
- catch (InterruptedException e)
- {
- _logger.error("Thread interrupted", e);
- }
+
}
/**
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java?rev=1767787&r1=1767786&r2=1767787&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java Wed Nov 2 21:29:16 2016
@@ -64,7 +64,8 @@ public class PriorityQueueTest extends A
// Register subscriber
queue.addConsumer(getConsumer(), null, null, "test", EnumSet.noneOf(ConsumerImpl.Option.class), 0);
- Thread.sleep(getQueueRunnerWaitTime());
+
+ while(getConsumer().processPending());
ArrayList<MessageInstance> msgs = getConsumer().getMessages();
try
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java?rev=1767787&r1=1767786&r2=1767787&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java Wed Nov 2 21:29:16 2016
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.queue;
-import java.security.AccessController;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
@@ -156,6 +155,7 @@ public class StandardQueueTest extends A
// put test messages into a queue
putGivenNumberOfMessages(queue, 4);
+ while(consumer.processPending());
// assert received messages
List<MessageInstance> messages = consumer.getMessages();
@@ -167,8 +167,7 @@ public class StandardQueueTest extends A
}
/**
- * Tests whether dequeued entry is sent to subscriber in result of
- * invocation of {@link AbstractQueue#processQueue(QueueRunner)}
+ * Tests whether dequeued entry is sent to subscriber
*/
public void testProcessQueueWithDequeuedEntry() throws Exception
{
@@ -195,6 +194,13 @@ public class StandardQueueTest extends A
// create a consumer
MockConsumer consumer = new MockConsumer()
{
+
+ @Override
+ public void notifyWork()
+ {
+ while(processPending());
+ }
+
/**
* Send a message and decrement latch
* @param consumer
@@ -217,14 +223,6 @@ public class StandardQueueTest extends A
EnumSet.of(ConsumerImpl.Option.ACQUIRES,
ConsumerImpl.Option.SEES_REQUEUES), 0);
- // process queue
- testQueue.processQueue(new QueueRunner(testQueue, AccessController.getContext())
- {
- public void run()
- {
- // do nothing
- }
- });
// wait up to 1 minute for message receipt
try
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java?rev=1767787&r1=1767786&r2=1767787&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java Wed Nov 2 21:29:16 2016
@@ -281,7 +281,6 @@ class WebSocketProvider implements Accep
try
{
_protocolEngine.setIOThread(Thread.currentThread());
- _protocolEngine.setMessageAssignmentSuspended(true, true);
Iterator<Runnable> iter = _protocolEngine.processPendingIterator();
while(iter.hasNext())
{
@@ -296,7 +295,6 @@ class WebSocketProvider implements Accep
_connectionWrapper.doWrite();
- _protocolEngine.setMessageAssignmentSuspended(false, true);
}
finally
{
@@ -469,7 +467,7 @@ class WebSocketProvider implements Accep
{
if (_usedOutboundMessageSpace.addAndGet(size) > _outboundMessageBufferLimit)
{
- _protocolEngine.setMessageAssignmentSuspended(true, false);
+ // RG - TODO
}
}
@@ -532,7 +530,6 @@ class WebSocketProvider implements Accep
try
{
_protocolEngine.setIOThread(Thread.currentThread());
- _protocolEngine.setMessageAssignmentSuspended(true, true);
Iterator<Runnable> iter = _protocolEngine.processPendingIterator();
while(iter.hasNext())
@@ -542,7 +539,6 @@ class WebSocketProvider implements Accep
doWrite();
- _protocolEngine.setMessageAssignmentSuspended(false, true);
}
finally
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org