You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/02 21:29:17 UTC

svn commit: r1767787 - in /qpid/java/branches/remove-queue-runner: broker-core/src/main/java/org/apache/qpid/server/consumer/ broker-core/src/main/java/org/apache/qpid/server/model/ broker-core/src/main/java/org/apache/qpid/server/queue/ broker-core/sr...

Author: rgodfrey
Date: Wed Nov  2 21:29:16 2016
New Revision: 1767787

URL: http://svn.apache.org/viewvc?rev=1767787&view=rev
Log:
Remove queue runner, message assignment suspension

Removed:
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
Modified:
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
    qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
    qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
    qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java
    qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
    qpid/java/branches/remove-queue-runner/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1767787&r1=1767786&r2=1767787&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Wed Nov  2 21:29:16 2016
@@ -156,7 +156,7 @@ public abstract class AbstractConsumerTa
     @Override
     public final boolean isSuspended()
     {
-        return getSessionModel().getAMQPConnection().isMessageAssignmentSuspended() || isFlowSuspended();
+        return isFlowSuspended();
     }
 
     @Override

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1767787&r1=1767786&r2=1767787&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java Wed Nov  2 21:29:16 2016
@@ -401,8 +401,6 @@ public interface Queue<X extends Queue<X
 
     Set<NotificationCheck> getNotificationChecks();
 
-    void deliverAsync();
-
     Collection<String> getAvailableAttributes();
 
     void completeRecovery();

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1767787&r1=1767786&r2=1767787&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Wed Nov  2 21:29:16 2016
@@ -285,7 +285,6 @@ public abstract class AbstractQueue<X ex
 
     private final ConcurrentLinkedQueue<EnqueueRequest> _postRecoveryQueue = new ConcurrentLinkedQueue<>();
 
-    private final QueueRunner _queueRunner;
     private boolean _closing;
     private final ConcurrentMap<String, Callable<MessageFilter>> _defaultFiltersMap = new ConcurrentHashMap<>();
     private final List<HoldMethod> _holdMethods = new CopyOnWriteArrayList<>();
@@ -305,8 +304,6 @@ public abstract class AbstractQueue<X ex
         _virtualHost = virtualHost;
         _immediateDeliveryContext = getSystemTaskControllerContext("Immediate Delivery", virtualHost.getPrincipal());
 
-        _queueRunner = new QueueRunner(this, getSystemTaskControllerContext("Queue Delivery",
-                                                                            virtualHost.getPrincipal()));
     }
 
     @Override
@@ -972,10 +969,6 @@ public abstract class AbstractQueue<X ex
         {
             consumer.notifyWork();
         }
-        else
-        {
-            deliverAsync();
-        }
 
         return consumer;
     }
@@ -1099,7 +1092,6 @@ public abstract class AbstractQueue<X ex
             }
         }
         notifyPullOnlyConsumers();
-        deliverAsync();
     }
 
     public void addBinding(final Binding<?> binding)
@@ -1234,25 +1226,10 @@ public abstract class AbstractQueue<X ex
 
         try
         {
-            if (action != null || (exclusiveSub == null  && _queueRunner.isIdle()))
-            {
-                AccessController.doPrivileged(
-                        new PrivilegedAction<Void>()
-                        {
-                            @Override
-                            public Void run()
-                            {
-                                tryDeliverStraightThrough(entry);
-                                return null;
-                            }
-                        }, _immediateDeliveryContext);
-            }
-
             if (entry.isAvailable())
             {
                 checkConsumersNotAheadOfDelivery(entry);
                 notifyPullOnlyConsumers();
-                deliverAsync();
             }
 
             checkForNotificationOnNewMessage(entry.getMessage());
@@ -1546,7 +1523,6 @@ public abstract class AbstractQueue<X ex
             }
         }
         notifyPullOnlyConsumers();
-        deliverAsync();
 
     }
 
@@ -1740,20 +1716,9 @@ public abstract class AbstractQueue<X ex
             if (oldState != State.ACTIVE)
             {
                 _activeSubscriberCount.incrementAndGet();
-                if(sub.isPullOnly())
-                {
-                    sub.notifyWork();
-                }
-
-            }
-            if(!sub.isPullOnly())
-            {
-                deliverAsync();
-            }
-            else
-            {
-                sub.notifyWork();
             }
+            sub.notifyWork();
+
         }
     }
 
@@ -2168,14 +2133,6 @@ public abstract class AbstractQueue<X ex
         }
     }
 
-    public void deliverAsync()
-    {
-        _stateChangeCount.incrementAndGet();
-
-        _queueRunner.execute();
-
-    }
-
     void notifyPullOnlyConsumers()
     {
         if(_hasPullOnlyConsumers)
@@ -2184,7 +2141,7 @@ public abstract class AbstractQueue<X ex
             while (consumerNode != null)
             {
                 QueueConsumer<?> consumer = consumerNode.getConsumer();
-                if (consumer.isActive() && consumer.isPullOnly() && getNextAvailableEntry(consumer) != null)
+                if (consumer.isActive() && getNextAvailableEntry(consumer) != null)
                 {
                     consumer.notifyWork();
                 }
@@ -2445,151 +2402,6 @@ public abstract class AbstractQueue<X ex
         return getNextAvailableEntry(queueConsumer) != null;
     }
 
-    /**
-     * Used by queue Runners to asynchronously deliver messages to consumers.
-     *
-     * A queue Runner is started whenever a state change occurs, e.g when a new
-     * message arrives on the queue and cannot be immediately delivered to a
-     * consumer (i.e. asynchronous delivery is required).
-     *
-     * processQueue should be running while there are messages on the queue AND
-     * there are consumers that can deliver them. If there are no
-     * consumers capable of delivering the remaining messages on the queue
-     * then processQueue should stop to prevent spinning.
-     *
-     * Since processQueue is runs in a fixed size Executor, it should not run
-     * indefinitely to prevent starving other tasks of CPU (e.g jobs to process
-     * incoming messages may not be able to be scheduled in the thread pool
-     * because all threads are working on clearing down large queues). To solve
-     * this problem, after an arbitrary number of message deliveries the
-     * processQueue job stops iterating, resubmits itself to the executor, and
-     * ends the current instance
-     *
-     * @param runner the Runner to schedule
-     */
-    public long processQueue(QueueRunner runner)
-    {
-        long stateChangeCount;
-        long previousStateChangeCount = Long.MIN_VALUE;
-        long rVal = Long.MIN_VALUE;
-        boolean deliveryIncomplete = true;
-
-        boolean lastLoop = false;
-        int iterations = getMaxAsyncDeliveries();
-
-        final int numSubs = _consumerList.size();
-
-        final int perSub = Math.max(iterations / Math.max(numSubs,1), 1);
-
-        // For every message enqueue/requeue the we fire deliveryAsync() which
-        // increases _stateChangeCount. If _sCC changes whilst we are in our loop
-        // (detected by setting previousStateChangeCount to stateChangeCount in the loop body)
-        // then we will continue to run for a maximum of iterations.
-        // So whilst delivery/rejection is going on a processQueue thread will be running
-        while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete))
-        {
-            // we want to have one extra loop after every consumer has reached the point where it cannot move
-            // further, just in case the advance of one consumer in the last loop allows a different consumer to
-            // move forward in the next iteration
-
-            if (previousStateChangeCount != stateChangeCount)
-            {
-                //further asynchronous delivery is required since the
-                //previous loop. keep going if iteration slicing allows.
-                lastLoop = false;
-                rVal = stateChangeCount;
-            }
-
-            previousStateChangeCount = stateChangeCount;
-            boolean allConsumersDone = true;
-            boolean consumerDone;
-
-            ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
-            //iterate over the subscribers and try to advance their pointer
-            while (consumerNodeIterator.advance())
-            {
-
-                QueueConsumer<?> sub = consumerNodeIterator.getNode().getConsumer();
-
-                if(!sub.isPullOnly())
-                {
-                    sub.getSendLock();
-
-                    try
-                    {
-                        for (int i = 0; i < perSub; i++)
-                        {
-                            //attempt delivery. returns true if no further delivery currently possible to this sub
-                            consumerDone = attemptDelivery(sub, true);
-                            if (consumerDone)
-                            {
-                                sub.flushBatched();
-                                boolean noMore = getNextAvailableEntry(sub) == null;
-                                if (lastLoop && noMore)
-                                {
-                                    sub.queueEmpty();
-                                }
-                                break;
-                            }
-                            else
-                            {
-                                //this consumer can accept additional deliveries, so we must
-                                //keep going after this (if iteration slicing allows it)
-                                allConsumersDone = false;
-                                lastLoop = false;
-                                if (--iterations == 0)
-                                {
-                                    sub.flushBatched();
-                                    break;
-                                }
-                            }
-                        }
-
-                        sub.flushBatched();
-                    }
-                    finally
-                    {
-                        sub.releaseSendLock();
-                    }
-                }
-            }
-
-            if(allConsumersDone && lastLoop)
-            {
-                //We have done an extra loop already and there are again
-                //again no further delivery attempts possible, only
-                //keep going if state change demands it.
-                deliveryIncomplete = false;
-            }
-            else if(allConsumersDone)
-            {
-                //All consumers reported being done, but we have to do
-                //an extra loop if the iterations are not exhausted and
-                //there is still any work to be done
-                deliveryIncomplete = _consumerList.size() != 0;
-                lastLoop = true;
-            }
-            else
-            {
-                //some consumers can still accept more messages,
-                //keep going if iteration count allows.
-                lastLoop = false;
-                deliveryIncomplete = true;
-            }
-
-        }
-
-        // If iterations == 0 then the limiting factor was the time-slicing rather than available messages or credit
-        // therefore we should schedule this runner again (unless someone beats us to it :-) ).
-        if (iterations == 0)
-        {
-            _logger.debug("Rescheduling runner: {}", runner);
-            return 0L;
-        }
-        return rVal;
-
-    }
-
     public void checkMessageStatus()
     {
         QueueEntryIterator queueListIterator = getEntries().iterator();

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1767787&r1=1767786&r2=1767787&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Wed Nov  2 21:29:16 2016
@@ -262,14 +262,7 @@ class QueueConsumerImpl
     @Override
     public void externalStateChange()
     {
-        if(isPullOnly())
-        {
-            _target.notifyWork();
-        }
-        else
-        {
-            _queue.deliverAsync();
-        }
+        _target.notifyWork();
     }
 
     @Override
@@ -406,16 +399,8 @@ class QueueConsumerImpl
     public final void flush()
     {
         AMQPConnection<?> connection = _target.getSessionModel().getAMQPConnection();
-        try
-        {
-            connection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(true);
-            _queue.flushConsumer(this);
-            _target.processPending();
-        }
-        finally
-        {
-            connection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(false);
-        }
+        _queue.flushConsumer(this);
+        _target.processPending();
 
     }
 
@@ -423,15 +408,8 @@ class QueueConsumerImpl
     public void pullMessage()
     {
         AMQPConnection<?> connection = _target.getSessionModel().getAMQPConnection();
-        try
-        {
-            connection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(true);
-            _queue.flushConsumer(this, 1);
-        }
-        finally
-        {
-            connection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(false);
-        }
+        _queue.flushConsumer(this, 1);
+
 
     }
 
@@ -711,14 +689,7 @@ class QueueConsumerImpl
             entry.addStateChangeListener(this);
             if(!entry.isAvailable())
             {
-                if(isPullOnly())
-                {
-                    _target.notifyWork();
-                }
-                else
-                {
-                    _queue.deliverAsync();
-                }
+                _target.notifyWork();
                 remove();
             }
         }
@@ -738,14 +709,7 @@ class QueueConsumerImpl
         {
             entry.removeStateChangeListener(this);
             _entry.compareAndSet(entry, null);
-            if(isPullOnly())
-            {
-                _target.notifyWork();
-            }
-            else
-            {
-                _queue.deliverAsync();
-            }
+            _target.notifyWork();
         }
 
     }

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java?rev=1767787&r1=1767786&r2=1767787&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java Wed Nov  2 21:29:16 2016
@@ -35,9 +35,6 @@ import org.apache.qpid.server.util.Delet
 
 public interface AMQPConnection<C extends AMQPConnection<C>> extends Connection<C>, Deletable<C>, EventLoggerProvider
 {
-    boolean isMessageAssignmentSuspended();
-
-    void alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(boolean override);
 
     AccessControlContext getAccessControlContextFromSubject(Subject subject);
 

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1767787&r1=1767786&r2=1767787&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java Wed Nov  2 21:29:16 2016
@@ -35,7 +35,6 @@ import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 
 import javax.security.auth.Subject;
 import javax.security.auth.SubjectDomainCombiner;
@@ -96,8 +95,6 @@ public abstract class AbstractAMQPConnec
             new CopyOnWriteArrayList<>();
 
     private final LogSubject _logSubject;
-    private final AtomicReference<Thread> _messageAssignmentAllowedThread = new AtomicReference<>();
-    private final AtomicBoolean _messageAssignmentSuspended = new AtomicBoolean();
     private volatile ContextProvider _contextProvider;
     private volatile EventLoggerProvider _eventLoggerProvider;
     private String _clientProduct;
@@ -489,50 +486,6 @@ public abstract class AbstractAMQPConnec
     }
 
     @Override
-    public boolean isMessageAssignmentSuspended()
-    {
-        Thread currentThread = Thread.currentThread();
-        if (_messageAssignmentAllowedThread.get() == currentThread && currentThread == _ioThread)
-        {
-            return false;
-        }
-        return _messageAssignmentSuspended.get();
-    }
-
-    @Override
-    public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended, final boolean notifyConsumers)
-    {
-        _messageAssignmentSuspended.set(messageAssignmentSuspended);
-        if(notifyConsumers)
-        {
-            for (AMQSessionModel<?> session : getSessionModels())
-            {
-                if (messageAssignmentSuspended)
-                {
-                    session.ensureConsumersNoticedStateChange();
-                }
-                else
-                {
-                    session.notifyConsumerTargetCurrentStates();
-                }
-            }
-        }
-    }
-
-    @Override
-    public void alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(boolean allowed)
-    {
-        if (allowed)
-        {
-            _messageAssignmentAllowedThread.set(Thread.currentThread());
-        }
-        else
-        {
-            _messageAssignmentAllowedThread.set(null);
-        }
-    }
-
-    @Override
     public void setIOThread(final Thread ioThread)
     {
         _ioThread = ioThread;

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java?rev=1767787&r1=1767786&r2=1767787&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java Wed Nov  2 21:29:16 2016
@@ -91,18 +91,6 @@ public class MultiVersionProtocolEngine
         _onCloseTask = onCloseTask;
     }
 
-    @Override
-    public void setMessageAssignmentSuspended(final boolean value, final boolean notifyConsumers)
-    {
-        _delegate.setMessageAssignmentSuspended(value, notifyConsumers);
-    }
-
-    @Override
-    public boolean isMessageAssignmentSuspended()
-    {
-        return _delegate.isMessageAssignmentSuspended();
-    }
-
     public void closed()
     {
         _logger.debug("Closed");
@@ -244,18 +232,6 @@ public class MultiVersionProtocolEngine
     {
 
         @Override
-        public void setMessageAssignmentSuspended(final boolean value, final boolean notifyConsumers)
-        {
-
-        }
-
-        @Override
-        public boolean isMessageAssignmentSuspended()
-        {
-            return false;
-        }
-
-        @Override
         public Iterator<Runnable> processPendingIterator()
         {
             return Collections.emptyIterator();
@@ -366,17 +342,6 @@ public class MultiVersionProtocolEngine
         private final AtomicBoolean _hasWork = new AtomicBoolean();
 
         @Override
-        public void setMessageAssignmentSuspended(final boolean value, final boolean notifyConsumers)
-        {
-        }
-
-        @Override
-        public boolean isMessageAssignmentSuspended()
-        {
-            return false;
-        }
-
-        @Override
         public Iterator<Runnable> processPendingIterator()
         {
             return Collections.emptyIterator();

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1767787&r1=1767786&r2=1767787&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java Wed Nov  2 21:29:16 2016
@@ -218,7 +218,7 @@ public class NonBlockingConnection imple
     {
         if (_usedOutboundMessageSpace.addAndGet(size) > _outboundMessageBufferLimit)
         {
-            _protocolEngine.setMessageAssignmentSuspended(true, false);
+            // RG - TODO
         }
     }
 
@@ -274,7 +274,6 @@ public class NonBlockingConnection imple
                 }
 
                 _protocolEngine.setIOThread(Thread.currentThread());
-                _protocolEngine.setMessageAssignmentSuspended(true, true);
 
                 boolean processPendingComplete = processPending();
 
@@ -290,10 +289,6 @@ public class NonBlockingConnection imple
                         _protocolEngine.notifyWork();
                     }
 
-                    if (_fullyWritten)
-                    {
-                        _protocolEngine.setMessageAssignmentSuspended(false, true);
-                    }
                 }
                 else
                 {

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java?rev=1767787&r1=1767786&r2=1767787&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java Wed Nov  2 21:29:16 2016
@@ -53,10 +53,6 @@ public interface ProtocolEngine extends
 
     void setTransportBlockedForWriting(boolean blocked);
 
-    void setMessageAssignmentSuspended(boolean value, final boolean notifyConsumers);
-
-    boolean isMessageAssignmentSuspended();
-
     Iterator<Runnable> processPendingIterator();
 
     boolean hasWork();

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java?rev=1767787&r1=1767786&r2=1767787&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java Wed Nov  2 21:29:16 2016
@@ -150,26 +150,18 @@ public abstract class AbstractSystemMess
         {
             AMQPConnection<?> connection = _target.getSessionModel().getAMQPConnection();
             _target.getSendLock();
+
             try
             {
-                connection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(true);
-
-                try
+                if (!_queue.isEmpty())
                 {
-                    if (!_queue.isEmpty())
+                    final PropertiesMessageInstance propertiesMessageInstance = _queue.get(0);
+                    if (!_target.isSuspended() && _target.allocateCredit(propertiesMessageInstance.getMessage()))
                     {
-                        final PropertiesMessageInstance propertiesMessageInstance = _queue.get(0);
-                        if (!_target.isSuspended() && _target.allocateCredit(propertiesMessageInstance.getMessage()))
-                        {
-                            _queue.remove(0);
-                            _target.send(this, propertiesMessageInstance, false);
-                        }
+                        _queue.remove(0);
+                        _target.send(this, propertiesMessageInstance, false);
                     }
                 }
-                finally
-                {
-                    connection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(false);
-                }
             }
             finally
             {
@@ -285,16 +277,8 @@ public abstract class AbstractSystemMess
         public void flush()
         {
             AMQPConnection<?> connection = getSessionModel().getAMQPConnection();
-            try
-            {
-                connection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(true);
-                deliverMessages();
-                _target.processPending();
-            }
-            finally
-            {
-                connection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(false);
-            }
+            deliverMessages();
+            _target.processPending();
         }
 
 

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java?rev=1767787&r1=1767786&r2=1767787&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java Wed Nov  2 21:29:16 2016
@@ -63,6 +63,9 @@ public class MockConsumer implements Con
     private final Lock _stateChangeLock = new ReentrantLock();
 
     private boolean _isActive = true;
+    private ConsumerImpl _consumer;
+    private boolean _messageSent;
+    private MockSessionModel _sessionModel = new MockSessionModel();
 
     public MockConsumer()
     {
@@ -107,7 +110,7 @@ public class MockConsumer implements Con
 
     public AMQSessionModel getSessionModel()
     {
-        return new MockSessionModel();
+        return _sessionModel;
     }
 
     public boolean isActive()
@@ -138,6 +141,7 @@ public class MockConsumer implements Con
 
     public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
     {
+        _messageSent = true;
         long size = entry.getMessage().getSize();
         if (messages.contains(entry))
         {
@@ -190,6 +194,7 @@ public class MockConsumer implements Con
     @Override
     public void consumerAdded(final ConsumerImpl sub)
     {
+        _consumer = sub;
     }
 
     @Override
@@ -232,7 +237,16 @@ public class MockConsumer implements Con
     @Override
     public boolean processPending()
     {
-        return false;
+        _consumer.pullMessage();
+        if(_messageSent)
+        {
+            _messageSent = false;
+            return true;
+        }
+        else
+        {
+            return false;
+        }
     }
 
     @Override
@@ -281,7 +295,7 @@ public class MockConsumer implements Con
     @Override
     public boolean isPullOnly()
     {
-        return false;
+        return true;
     }
 
     @Override
@@ -300,6 +314,7 @@ public class MockConsumer implements Con
     {
         private final UUID _id = UUID.randomUUID();
         private Session _modelObject;
+        private AMQPConnection<?> _connection = mock(AMQPConnection.class);
 
         private MockSessionModel()
         {
@@ -322,7 +337,7 @@ public class MockConsumer implements Con
         @Override
         public AMQPConnection<?> getAMQPConnection()
         {
-            return null;
+            return _connection;
         }
 
         @Override

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java?rev=1767787&r1=1767786&r2=1767787&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java Wed Nov  2 21:29:16 2016
@@ -40,10 +40,6 @@ import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.qpid.server.model.Binding;
-import org.apache.qpid.server.model.Exchange;
-import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.util.StateChangeListener;
 import org.mockito.ArgumentCaptor;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -60,12 +56,16 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Binding;
+import org.apache.qpid.server.model.BrokerTestHelper;
+import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.QueueNotificationListener;
+import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.queue.AbstractQueue.QueueEntryFilter;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.model.BrokerTestHelper;
+import org.apache.qpid.server.util.StateChangeListener;
 import org.apache.qpid.test.utils.QpidTestCase;
 
 abstract class AbstractQueueTestBase extends QpidTestCase
@@ -182,7 +182,7 @@ abstract class AbstractQueueTestBase ext
 
         // Check sending a message ends up with the subscriber
         _queue.enqueue(messageA, null, null);
-        Thread.sleep(_queueRunnerWaitTime);
+        while(_consumerTarget.processPending());
 
         assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage());
         assertNull(_consumer.getQueueContext().getReleasedEntry());
@@ -207,7 +207,7 @@ abstract class AbstractQueueTestBase ext
         _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
                                                           EnumSet.of(ConsumerImpl.Option.ACQUIRES,
                                                   ConsumerImpl.Option.SEES_REQUEUES), 0);
-        Thread.sleep(_queueRunnerWaitTime);
+        while(_consumerTarget.processPending());
         assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage());
         assertNull("There should be no releasedEntry after an enqueue",
                    _consumer.getQueueContext().getReleasedEntry());
@@ -225,7 +225,7 @@ abstract class AbstractQueueTestBase ext
         _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
                                                           EnumSet.of(ConsumerImpl.Option.ACQUIRES,
                                                   ConsumerImpl.Option.SEES_REQUEUES), 0);
-        Thread.sleep(_queueRunnerWaitTime);
+        while(_consumerTarget.processPending());
         assertEquals(messageB, _consumer.getQueueContext().getLastSeenEntry().getMessage());
         assertNull("There should be no releasedEntry after enqueues",
                    _consumer.getQueueContext().getReleasedEntry());
@@ -248,12 +248,12 @@ abstract class AbstractQueueTestBase ext
         _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
                                                           EnumSet.of(ConsumerImpl.Option.ACQUIRES,
                                                                      ConsumerImpl.Option.SEES_REQUEUES), 0);
-        Thread.sleep(_queueRunnerWaitTime);
+        while(_consumerTarget.processPending());
 
         assertEquals("Message which was not yet valid was received", 0, _consumerTarget.getMessages().size());
         when(messageHeader.getNotValidBefore()).thenReturn(System.currentTimeMillis()-100L);
         _queue.checkMessageStatus();
-        Thread.sleep(_queueRunnerWaitTime);
+        while(_consumerTarget.processPending());
         assertEquals("Message which was valid was not received", 1, _consumerTarget.getMessages().size());
     }
 
@@ -274,7 +274,7 @@ abstract class AbstractQueueTestBase ext
         _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
                                                           EnumSet.of(ConsumerImpl.Option.ACQUIRES,
                                                                      ConsumerImpl.Option.SEES_REQUEUES), 0);
-        Thread.sleep(_queueRunnerWaitTime);
+        while(_consumerTarget.processPending());
 
         assertEquals("Message was held despite queue not having holding enabled", 1, _consumerTarget.getMessages().size());
 
@@ -300,14 +300,14 @@ abstract class AbstractQueueTestBase ext
         _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
                                                           EnumSet.of(ConsumerImpl.Option.ACQUIRES,
                                                                      ConsumerImpl.Option.SEES_REQUEUES), 0);
-        Thread.sleep(_queueRunnerWaitTime);
+        while(_consumerTarget.processPending());
 
         assertEquals("Expect one message (message B)", 1, _consumerTarget.getMessages().size());
         assertEquals("Wrong message received", messageB.getMessageHeader().getMessageId(), _consumerTarget.getMessages().get(0).getMessage().getMessageHeader().getMessageId());
 
         when(messageHeader.getNotValidBefore()).thenReturn(System.currentTimeMillis()-100L);
         _queue.checkMessageStatus();
-        Thread.sleep(_queueRunnerWaitTime);
+        while(_consumerTarget.processPending());
         assertEquals("Message which was valid was not received", 2, _consumerTarget.getMessages().size());
         assertEquals("Wrong message received", messageA.getMessageHeader().getMessageId(), _consumerTarget.getMessages().get(1).getMessage().getMessageHeader().getMessageId());
 
@@ -338,7 +338,7 @@ abstract class AbstractQueueTestBase ext
         _queue.enqueue(messageB, postEnqueueAction, null);
         _queue.enqueue(messageC, postEnqueueAction, null);
 
-        Thread.sleep(_queueRunnerWaitTime);  // Work done by QueueRunner Thread
+        while(_consumerTarget.processPending());
 
         assertEquals("Unexpected total number of messages sent to consumer",
                      3,
@@ -351,7 +351,7 @@ abstract class AbstractQueueTestBase ext
 
         queueEntries.get(0).release();
 
-        Thread.sleep(_queueRunnerWaitTime); // Work done by QueueRunner Thread
+        while(_consumerTarget.processPending());
 
         assertEquals("Unexpected total number of messages sent to consumer",
                      4,
@@ -374,6 +374,13 @@ abstract class AbstractQueueTestBase ext
         final CountDownLatch sendIndicator = new CountDownLatch(1);
         _consumerTarget = new MockConsumer()
         {
+
+            @Override
+            public void notifyWork()
+            {
+                while(processPending());
+            }
+
             @Override
             public long send(ConsumerImpl consumer, MessageInstance entry, boolean batch)
             {
@@ -472,7 +479,7 @@ abstract class AbstractQueueTestBase ext
         _queue.enqueue(messageB, postEnqueueAction, null);
         _queue.enqueue(messageC, postEnqueueAction, null);
 
-        Thread.sleep(_queueRunnerWaitTime);  // Work done by QueueRunner Thread
+        while(_consumerTarget.processPending());
 
         assertEquals("Unexpected total number of messages sent to consumer",
                      3,
@@ -486,7 +493,7 @@ abstract class AbstractQueueTestBase ext
         queueEntries.get(2).release();
         queueEntries.get(0).release();
 
-        Thread.sleep(_queueRunnerWaitTime); // Work done by QueueRunner Thread
+        while(_consumerTarget.processPending());
 
         assertEquals("Unexpected total number of messages sent to consumer",
                      5,
@@ -529,7 +536,8 @@ abstract class AbstractQueueTestBase ext
         _queue.enqueue(messageA, postEnqueueAction, null);
         _queue.enqueue(messageB, postEnqueueAction, null);
 
-        Thread.sleep(_queueRunnerWaitTime);  // Work done by QueueRunner Thread
+        while(target1.processPending());
+        while(target2.processPending());
 
         assertEquals("Unexpected total number of messages sent to both after enqueue",
                      2,
@@ -538,7 +546,8 @@ abstract class AbstractQueueTestBase ext
         /* Now release the first message only, causing it to be requeued */
         queueEntries.get(0).release();
 
-        Thread.sleep(_queueRunnerWaitTime); // Work done by QueueRunner Thread
+        while(target1.processPending());
+        while(target2.processPending());
 
         assertEquals("Unexpected total number of messages sent to both consumers after release",
                      3,
@@ -569,12 +578,15 @@ abstract class AbstractQueueTestBase ext
         final long timeout = System.currentTimeMillis() + _queueRunnerWaitTime;
 
         QueueEntry lastSeen = null;
-        while (timeout > System.currentTimeMillis() &&
+
+        while(_consumerTarget.processPending());
+
+        /*while (timeout > System.currentTimeMillis() &&
                ((lastSeen = _consumer.getQueueContext().getLastSeenEntry()) == null || lastSeen.getMessage() == null))
         {
             Thread.sleep(10);
         }
-
+*/
         assertEquals("Queue context did not see expected message within timeout",
                      messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage());
 
@@ -1062,14 +1074,7 @@ abstract class AbstractQueueTestBase ext
             queue.enqueue(message,null, null);
 
         }
-        try
-        {
-            Thread.sleep(2000L);
-        }
-        catch (InterruptedException e)
-        {
-            _logger.error("Thread interrupted", e);
-        }
+
     }
 
     /**

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java?rev=1767787&r1=1767786&r2=1767787&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java Wed Nov  2 21:29:16 2016
@@ -64,7 +64,8 @@ public class PriorityQueueTest extends A
 
         // Register subscriber
         queue.addConsumer(getConsumer(), null, null, "test", EnumSet.noneOf(ConsumerImpl.Option.class), 0);
-        Thread.sleep(getQueueRunnerWaitTime());
+
+        while(getConsumer().processPending());
 
         ArrayList<MessageInstance> msgs = getConsumer().getMessages();
         try

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java?rev=1767787&r1=1767786&r2=1767787&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java Wed Nov  2 21:29:16 2016
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.server.queue;
 
-import java.security.AccessController;
 import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -156,6 +155,7 @@ public class StandardQueueTest extends A
 
         // put test messages into a queue
         putGivenNumberOfMessages(queue, 4);
+        while(consumer.processPending());
 
         // assert received messages
         List<MessageInstance> messages = consumer.getMessages();
@@ -167,8 +167,7 @@ public class StandardQueueTest extends A
     }
 
     /**
-     * Tests whether dequeued entry is sent to subscriber in result of
-     * invocation of {@link AbstractQueue#processQueue(QueueRunner)}
+     * Tests whether dequeued entry is sent to subscriber
      */
     public void testProcessQueueWithDequeuedEntry() throws Exception
     {
@@ -195,6 +194,13 @@ public class StandardQueueTest extends A
         // create a consumer
         MockConsumer consumer = new MockConsumer()
         {
+
+            @Override
+            public void notifyWork()
+            {
+                while(processPending());
+            }
+
             /**
              * Send a message and decrement latch
              * @param consumer
@@ -217,14 +223,6 @@ public class StandardQueueTest extends A
                               EnumSet.of(ConsumerImpl.Option.ACQUIRES,
                                          ConsumerImpl.Option.SEES_REQUEUES), 0);
 
-        // process queue
-        testQueue.processQueue(new QueueRunner(testQueue, AccessController.getContext())
-        {
-            public void run()
-            {
-                // do nothing
-            }
-        });
 
         // wait up to 1 minute for message receipt
         try

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java?rev=1767787&r1=1767786&r2=1767787&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java Wed Nov  2 21:29:16 2016
@@ -281,7 +281,6 @@ class WebSocketProvider implements Accep
                 try
                 {
                     _protocolEngine.setIOThread(Thread.currentThread());
-                    _protocolEngine.setMessageAssignmentSuspended(true, true);
                     Iterator<Runnable> iter = _protocolEngine.processPendingIterator();
                     while(iter.hasNext())
                     {
@@ -296,7 +295,6 @@ class WebSocketProvider implements Accep
 
                     _connectionWrapper.doWrite();
 
-                    _protocolEngine.setMessageAssignmentSuspended(false, true);
                 }
                 finally
                 {
@@ -469,7 +467,7 @@ class WebSocketProvider implements Accep
         {
             if (_usedOutboundMessageSpace.addAndGet(size) > _outboundMessageBufferLimit)
             {
-                _protocolEngine.setMessageAssignmentSuspended(true, false);
+                // RG - TODO
             }
         }
 
@@ -532,7 +530,6 @@ class WebSocketProvider implements Accep
             try
             {
                 _protocolEngine.setIOThread(Thread.currentThread());
-                _protocolEngine.setMessageAssignmentSuspended(true, true);
 
                 Iterator<Runnable> iter = _protocolEngine.processPendingIterator();
                 while(iter.hasNext())
@@ -542,7 +539,6 @@ class WebSocketProvider implements Accep
 
                 doWrite();
 
-                _protocolEngine.setMessageAssignmentSuspended(false, true);
             }
             finally
             {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org