You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2016/11/03 16:03:13 UTC

svn commit: r1767917 - in /qpid/java/branches/remove-queue-runner: broker-core/src/main/java/org/apache/qpid/server/consumer/ broker-core/src/main/java/org/apache/qpid/server/message/ broker-core/src/main/java/org/apache/qpid/server/model/ broker-core/...

Author: lquack
Date: Thu Nov  3 16:03:13 2016
New Revision: 1767917

URL: http://svn.apache.org/viewvc?rev=1767917&view=rev
Log:
move ConsumerTarget to proper pull model

Added:
    qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
      - copied, changed from r1767916, qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
Removed:
    qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
Modified:
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
    qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
    qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
    qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
    qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java
    qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java
    qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
    qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
    qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
    qpid/java/branches/remove-queue-runner/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1767917&r1=1767916&r2=1767917&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Thu Nov  3 16:03:13 2016
@@ -24,7 +24,6 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -40,6 +39,7 @@ import org.apache.qpid.server.logging.Lo
 import org.apache.qpid.server.logging.messages.SubscriptionMessages;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.model.Consumer;
+import org.apache.qpid.server.queue.AbstractQueue;
 import org.apache.qpid.server.queue.SuspendedConsumerLoggingTicker;
 import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.util.StateChangeListener;
@@ -56,7 +56,6 @@ public abstract class AbstractConsumerTa
     private final AtomicInteger _stateActivates = new AtomicInteger();
     private final boolean _isMultiQueue;
     private final SuspendedConsumerLoggingTicker _suspendedConsumerLoggingTicker;
-    private ConcurrentLinkedQueue<ConsumerMessageInstancePair> _queue = new ConcurrentLinkedQueue();
     private final List<ConsumerImpl> _consumers = new CopyOnWriteArrayList<>();
 
     private Iterator<ConsumerImpl> _pullIterator;
@@ -269,7 +268,12 @@ public abstract class AbstractConsumerTa
     @Override
     public final long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
     {
-        _queue.add(new ConsumerMessageInstancePair(consumer, entry, batch));
+        doSend(consumer, entry, batch);
+
+        if (consumer.acquires())
+        {
+            entry.makeAcquisitionStealable();
+        }
         return entry.getMessage().getSize();
     }
 
@@ -278,12 +282,7 @@ public abstract class AbstractConsumerTa
     @Override
     public boolean hasMessagesToSend()
     {
-        return !_queue.isEmpty() || messagesAvailable();
-    }
-
-    private boolean messagesAvailable()
-    {
-        if(!_waitingOnStateChange.get() && hasCredit())
+        if (!_waitingOnStateChange.get() && hasCredit())
         {
             for (ConsumerImpl consumer : _consumers)
             {
@@ -299,8 +298,12 @@ public abstract class AbstractConsumerTa
     @Override
     public boolean sendNextMessage()
     {
+        _waitingOnStateChange.set(true);
+
+        AbstractQueue.MessageContainer messageContainer = null;
+        ConsumerImpl consumer = null;
         boolean iteratedCompleteList = false;
-        while (_queue.isEmpty())
+        while (messageContainer == null)
         {
             if (_pullIterator == null || !_pullIterator.hasNext())
             {
@@ -314,34 +317,25 @@ public abstract class AbstractConsumerTa
             }
             if (_pullIterator.hasNext())
             {
-                ConsumerImpl consumer = _pullIterator.next();
-
-                _waitingOnStateChange.set(true);
-
-                consumer.pullMessage();
+                consumer = _pullIterator.next();
+                messageContainer = consumer.pullMessage();
             }
         }
 
-        ConsumerMessageInstancePair consumerMessage = _queue.poll();
-        if (consumerMessage != null)
+        if (messageContainer != null)
         {
             _waitingOnStateChange.set(false);
+            MessageInstance entry = messageContainer._messageInstance;
             try
             {
-
-                ConsumerImpl consumer = consumerMessage.getConsumer();
-                MessageInstance entry = consumerMessage.getEntry();
-                boolean batch = consumerMessage.isBatch();
-                doSend(consumer, entry, batch);
-
-                if (consumer.acquires())
-                {
-                    entry.makeAcquisitionStealable();
-                }
+                send(consumer, entry, false);
             }
             finally
             {
-                consumerMessage.release();
+                if (messageContainer._messageReference != null)
+                {
+                    messageContainer._messageReference.release();
+                }
             }
             return true;
         }
@@ -370,12 +364,6 @@ public abstract class AbstractConsumerTa
                 }
             }
             ConsumerMessageInstancePair instance;
-            while((instance = _queue.poll()) != null)
-            {
-                MessageInstance entry = instance.getEntry();
-                entry.release(instance.getConsumer());
-                instance.release();
-            }
             doCloseInternal();
         }
         finally

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java?rev=1767917&r1=1767916&r2=1767917&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java Thu Nov  3 16:03:13 2016
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.Atomi
 
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.queue.AbstractQueue;
 
 public interface ConsumerImpl
 {
@@ -35,7 +36,7 @@ public interface ConsumerImpl
 
     boolean hasAvailableMessages();
 
-    void pullMessage();
+    AbstractQueue.MessageContainer pullMessage();
 
     enum Option
     {

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java?rev=1767917&r1=1767916&r2=1767917&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java Thu Nov  3 16:03:13 2016
@@ -248,8 +248,6 @@ public interface MessageInstance
 
     void release(ConsumerImpl release);
 
-    boolean resend();
-
     void delete();
 
     boolean isDeleted();

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1767917&r1=1767916&r2=1767917&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java Thu Nov  3 16:03:13 2016
@@ -369,8 +369,6 @@ public interface Queue<X extends Queue<X
 
     void incrementUnackedMsgCount(QueueEntry entry);
 
-    boolean resend(QueueEntry entry, QueueConsumer<?> consumer);
-
     List<? extends QueueEntry> getMessagesOnTheQueue();
 
     List<Long> getMessagesOnTheQueue(int num);

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1767917&r1=1767916&r2=1767917&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Thu Nov  3 16:03:13 2016
@@ -115,7 +115,6 @@ import org.apache.qpid.server.util.MapVa
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.server.util.StateChangeListener;
 import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
-import org.apache.qpid.transport.TransportException;
 
 public abstract class AbstractQueue<X extends AbstractQueue<X>>
         extends AbstractConfiguredObject<X>
@@ -166,8 +165,6 @@ public abstract class AbstractQueue<X ex
 
     private final AtomicInteger _activeSubscriberCount = new AtomicInteger();
 
-    private final AtomicLong _totalMessagesReceived = new AtomicLong();
-
     private final AtomicLong _dequeueCount = new AtomicLong();
     private final AtomicLong _dequeueSize = new AtomicLong();
     private final AtomicLong _enqueueCount = new AtomicLong();
@@ -224,7 +221,6 @@ public abstract class AbstractQueue<X ex
 
     private volatile long _estimatedAverageMessageHeaderSize;
 
-    private AtomicInteger _deliveredMessages = new AtomicInteger();
     private AtomicBoolean _stopped = new AtomicBoolean(false);
 
     private final Set<AMQSessionModel> _blockedChannels = new ConcurrentSkipListSet<AMQSessionModel>();
@@ -1113,8 +1109,6 @@ public abstract class AbstractQueue<X ex
         incrementQueueCount();
         incrementQueueSize(message);
 
-        _totalMessagesReceived.incrementAndGet();
-
         if(_recovering.get() != RECOVERED)
         {
             _enqueuingWhileRecovering.incrementAndGet();
@@ -1155,9 +1149,6 @@ public abstract class AbstractQueue<X ex
     {
         incrementQueueCount();
         incrementQueueSize(message);
-
-        _totalMessagesReceived.incrementAndGet();
-
         doEnqueue(message, null, enqueueRecord);
     }
 
@@ -1254,125 +1245,6 @@ public abstract class AbstractQueue<X ex
         }
     }
 
-    /**
-     * iterate over consumers and if any is at the end of the queue and can deliver this message,
-     * then deliver the message
-     */
-    private void tryDeliverStraightThrough(final QueueEntry entry)
-    {
-        try
-        {
-            ConsumerNode node = _consumerList.getMarkedNode();
-            ConsumerNode nextNode = node.findNext();
-            if (nextNode == null)
-            {
-                nextNode = _consumerList.getHead().findNext();
-            }
-            while (nextNode != null)
-            {
-                if (_consumerList.updateMarkedNode(node, nextNode))
-                {
-                    break;
-                }
-                else
-                {
-                    node = _consumerList.getMarkedNode();
-                    nextNode = node.findNext();
-                    if (nextNode == null)
-                    {
-                        nextNode = _consumerList.getHead().findNext();
-                    }
-                }
-            }
-            // always do one extra loop after we believe we've finished
-            // this catches the case where we *just* miss an update
-            int loops = 2;
-
-            while (entry.isAvailable() && loops != 0)
-            {
-                if (nextNode == null)
-                {
-                    loops--;
-                    nextNode = _consumerList.getHead();
-                }
-                else
-                {
-                    // if consumer at end, and active, offer
-                    final QueueConsumer<?> sub = nextNode.getConsumer();
-
-                    if(sub.getPriority() == Integer.MAX_VALUE)
-                    {
-                        deliverToConsumer(sub, entry);
-                    }
-
-                }
-                nextNode = nextNode.findNext();
-
-            }
-        }
-        catch (ConnectionScopedRuntimeException | TransportException e)
-        {
-            String errorMessage = "Suppressing " + e.getClass().getSimpleName() +
-                              " during straight through delivery, as this" +
-                              " can only indicate an issue with a consumer.";
-            if(_logger.isDebugEnabled())
-            {
-                _logger.debug(errorMessage, e);
-            }
-            else
-            {
-                _logger.info(errorMessage + ' ' + e.getMessage());
-            }
-        }
-    }
-
-    private void deliverToConsumer(final QueueConsumer<?> sub, final QueueEntry entry)
-    {
-
-        if(sub.trySendLock())
-        {
-            try
-            {
-                // get available queue entry first in order to avoid referring old deleted queue entry in sub._queueContext._lastSeen
-                if ((getNextAvailableEntry(sub) == entry)
-                    && !sub.isSuspended()
-                    && sub.hasInterest(entry)
-                    && mightAssign(sub, entry)
-                    && !sub.wouldSuspend(entry))
-                {
-
-                    MessageReference messageReference = null;
-                    try
-                    {
-
-                        if ((sub.acquires() && !assign(sub, entry))
-                            || (!sub.acquires() && (messageReference = entry.newMessageReference()) == null))
-                        {
-                            // restore credit here that would have been taken away by wouldSuspend since we didn't manage
-                            // to acquire the entry for this consumer
-                            sub.restoreCredit(entry);
-                        }
-                        else
-                        {
-                            deliverMessage(sub, entry, false, true);
-                        }
-                    }
-                    finally
-                    {
-                        if (messageReference != null)
-                        {
-                            messageReference.release();
-                        }
-                    }
-                }
-            }
-            finally
-            {
-                sub.releaseSendLock();
-            }
-        }
-    }
-
     private boolean assign(final QueueConsumer<?> sub, final QueueEntry entry)
     {
         if(_messageGroupManager == null)
@@ -1435,22 +1307,6 @@ public abstract class AbstractQueue<X ex
         getAtomicQueueCount().incrementAndGet();
     }
 
-    private void deliverMessage(final QueueConsumer<?> sub,
-                                final QueueEntry entry,
-                                boolean batch,
-                                final boolean updateLastSeen)
-    {
-        if(updateLastSeen)
-        {
-            setLastSeenEntry(sub, entry);
-        }
-
-        _deliveredMessages.incrementAndGet();
-
-        sub.send(entry, batch);
-    }
-
-
     private void setLastSeenEntry(final QueueConsumer<?> sub, final QueueEntry entry)
     {
         QueueContext subContext = sub.getQueueContext();
@@ -1507,13 +1363,7 @@ public abstract class AbstractQueue<X ex
     {
         decrementQueueCount();
         decrementQueueSize(entry);
-        if (entry.acquiredByConsumer())
-        {
-            _deliveredMessages.decrementAndGet();
-        }
-
         checkCapacity();
-
     }
 
     private void decrementQueueSize(final QueueEntry entry)
@@ -1535,31 +1385,6 @@ public abstract class AbstractQueue<X ex
         _dequeueCount.incrementAndGet();
     }
 
-    public boolean resend(final QueueEntry entry, final QueueConsumer<?> consumer)
-    {
-        /* TODO : This is wrong as the consumer may be suspended, we should instead change the state of the message
-                  entry to resend and move back the consumer pointer. */
-
-        consumer.getSendLock();
-        try
-        {
-            if (!consumer.isClosed())
-            {
-                deliverMessage(consumer, entry, false, false);
-                return true;
-            }
-            else
-            {
-                return false;
-            }
-        }
-        finally
-        {
-            consumer.releaseSendLock();
-        }
-    }
-
-
 
     public int getConsumerCount()
     {
@@ -1592,24 +1417,6 @@ public abstract class AbstractQueue<X ex
         return getAtomicQueueSize().get();
     }
 
-    public int getUndeliveredMessageCount()
-    {
-        int count = getQueueDepthMessages() - _deliveredMessages.get();
-        if (count < 0)
-        {
-            return 0;
-        }
-        else
-        {
-            return count;
-        }
-    }
-
-    public long getReceivedMessageCount()
-    {
-        return _totalMessagesReceived.get();
-    }
-
     @Override
     public long getOldestMessageArrivalTime()
     {
@@ -2118,26 +1925,23 @@ public abstract class AbstractQueue<X ex
         }
     }
 
-    boolean deliverSingleMessage(QueueConsumer<?> sub)
+    MessageContainer deliverSingleMessage(QueueConsumer<?> sub)
     {
-        boolean atTail = false;
         boolean queueEmpty = false;
-        boolean deliveryAttempted = false;
+        MessageContainer messageContainer = null;
 
         sub.getSendLock();
         try
         {
             if (!sub.isSuspended())
             {
-                atTail = attemptDelivery(sub, true);
-                deliveryAttempted = true;
-                if (atTail && getNextAvailableEntry(sub) == null)
+                messageContainer = attemptDelivery(sub);
+                if (messageContainer == null && getNextAvailableEntry(sub) == null)
                 {
                     queueEmpty = true;
                 }
             }
-
-            if (!deliveryAttempted )
+            else
             {
                 // avoid referring old deleted queue entry in sub._queueContext._lastSeen
                 getNextAvailableEntry(sub);
@@ -2163,7 +1967,20 @@ public abstract class AbstractQueue<X ex
         {
             advanceAllConsumers();
         }
-        return atTail;
+        return messageContainer;
+    }
+
+    public static class MessageContainer
+    {
+        public final MessageInstance _messageInstance;
+        public final MessageReference<?> _messageReference;
+
+        public MessageContainer(final MessageInstance messageInstance,
+                                final MessageReference<?> messageReference)
+        {
+            _messageInstance = messageInstance;
+            _messageReference = messageReference;
+        }
     }
 
     /**
@@ -2173,13 +1990,11 @@ public abstract class AbstractQueue<X ex
      *
      *
      * @param sub the consumer
-     * @param batch true if processing can be batched
      * @return true if we have completed all possible deliveries for this sub.
      */
-    private boolean attemptDelivery(QueueConsumer<?> sub, boolean batch)
+    private MessageContainer attemptDelivery(QueueConsumer<?> sub)
     {
-        boolean atTail = false;
-
+        MessageContainer messageContainer = null;
         // avoid referring old deleted queue entry in sub._queueContext._lastSeen
         QueueEntry node  = getNextAvailableEntry(sub);
         boolean subActive = sub.isActive() && !sub.isSuspended();
@@ -2212,7 +2027,8 @@ public abstract class AbstractQueue<X ex
                             }
                             else
                             {
-                                deliverMessage(sub, node, batch, true);
+                                setLastSeenEntry(sub, node);
+                                messageContainer = new MessageContainer(node, messageReference);
                             }
                         }
                         finally
@@ -2232,11 +2048,9 @@ public abstract class AbstractQueue<X ex
 
                     }
                 }
-
             }
-            atTail = (node == null) || (getNextAvailableEntry(sub) == null);
         }
-        return atTail || !subActive;
+        return messageContainer;
     }
 
     private boolean noHigherPriorityWithCredit(final QueueConsumer<?> sub)
@@ -2334,7 +2148,12 @@ public abstract class AbstractQueue<X ex
 
     boolean hasAvailableMessages(final QueueConsumer queueConsumer)
     {
-        return getNextAvailableEntry(queueConsumer) != null;
+        boolean hasAvailableMessages = getNextAvailableEntry(queueConsumer) != null;
+        if (!hasAvailableMessages)
+        {
+            queueConsumer.queueEmpty();
+        }
+        return hasAvailableMessages;
     }
 
     public void checkMessageStatus()

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java?rev=1767917&r1=1767916&r2=1767917&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java Thu Nov  3 16:03:13 2016
@@ -36,16 +36,12 @@ public interface QueueConsumer<X extends
 
     void restoreCredit(QueueEntry entry);
 
-    void send(QueueEntry entry, boolean batch);
-
     void acquisitionRemoved(QueueEntry node);
 
     void queueDeleted();
 
     Queue<?> getQueue();
 
-    boolean resend(QueueEntry e);
-
     MessageInstance.StealableConsumerAcquiredState<X> getOwningState();
 
     QueueContext getQueueContext();

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1767917&r1=1767916&r2=1767917&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Thu Nov  3 16:03:13 2016
@@ -389,19 +389,15 @@ class QueueConsumerImpl
     }
 
     @Override
-    public void pullMessage()
+    public AbstractQueue.MessageContainer pullMessage()
     {
-        _queue.deliverSingleMessage(this);
-    }
-
-    public boolean resend(final QueueEntry entry)
-    {
-        boolean messageWasResent = getQueue().resend(entry, this);
-        if (messageWasResent)
+        AbstractQueue.MessageContainer messageContainer = _queue.deliverSingleMessage(this);
+        if (messageContainer != null)
         {
-            _target.processPending();
+            _deliveredCount.incrementAndGet();
+            _deliveredBytes.addAndGet(messageContainer._messageInstance.getMessage().getSize());
         }
-        return messageWasResent;
+        return messageContainer;
     }
 
     public final long getConsumerNumber()
@@ -578,13 +574,6 @@ class QueueConsumerImpl
         return _deliveredCount.longValue();
     }
 
-    public final void send(final QueueEntry entry, final boolean batch)
-    {
-        _deliveredCount.incrementAndGet();
-        long size = _target.send(this, entry, batch);
-        _deliveredBytes.addAndGet(size);
-    }
-
     @Override
     public void acquisitionRemoved(final QueueEntry node)
     {

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1767917&r1=1767916&r2=1767917&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Thu Nov  3 16:03:13 2016
@@ -721,17 +721,6 @@ public abstract class QueueEntryImpl imp
     }
 
     @Override
-    public boolean resend()
-    {
-        QueueConsumer sub = getDeliveredConsumer();
-        if(sub != null)
-        {
-            return sub.resend(this);
-        }
-        return false;
-    }
-
-    @Override
     public TransactionLogResource getOwningResource()
     {
         return getQueue();

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java?rev=1767917&r1=1767916&r2=1767917&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java Thu Nov  3 16:03:13 2016
@@ -40,6 +40,7 @@ import org.apache.qpid.server.message.Se
 import org.apache.qpid.server.message.internal.InternalMessage;
 import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.queue.AbstractQueue;
 import org.apache.qpid.server.store.MessageDurability;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.TransactionLogResource;
@@ -116,15 +117,12 @@ public abstract class AbstractSystemMess
                 Collections.synchronizedList(new ArrayList<PropertiesMessageInstance>());
         private final ConsumerTarget _target;
         private final String _name;
-        private final StateChangeListener<ConsumerTarget, ConsumerTarget.State> _targetChangeListener =
-                new Consumer.TargetChangeListener();
 
 
         public Consumer(final String consumerName, ConsumerTarget target)
         {
             _name = consumerName;
             _target = target;
-            target.addStateListener(_targetChangeListener);
         }
 
         @Override
@@ -146,11 +144,9 @@ public abstract class AbstractSystemMess
         }
 
         @Override
-        public void pullMessage()
+        public AbstractQueue.MessageContainer pullMessage()
         {
-            AMQPConnection<?> connection = _target.getSessionModel().getAMQPConnection();
             _target.getSendLock();
-
             try
             {
                 if (!_queue.isEmpty())
@@ -159,7 +155,7 @@ public abstract class AbstractSystemMess
                     if (!_target.isSuspended() && _target.allocateCredit(propertiesMessageInstance.getMessage()))
                     {
                         _queue.remove(0);
-                        _target.send(this, propertiesMessageInstance, false);
+                        return new AbstractQueue.MessageContainer(propertiesMessageInstance, null);
                     }
                 }
             }
@@ -167,8 +163,7 @@ public abstract class AbstractSystemMess
             {
                 _target.releaseSendLock();
             }
-
-
+            return null;
         }
 
         @Override
@@ -276,66 +271,9 @@ public abstract class AbstractSystemMess
 
         public void send(final InternalMessage response)
         {
-            _target.getSendLock();
-            try
-            {
-                final PropertiesMessageInstance
-                        responseEntry = new PropertiesMessageInstance(this, response);
-                if (_queue.isEmpty() && _target.allocateCredit(response))
-                {
-                    _target.send(this, responseEntry, false);
-                }
-                else
-                {
-                    _queue.add(responseEntry);
-                }
-            }
-            finally
-            {
-                _target.releaseSendLock();
-            }
-        }
-
-        private class TargetChangeListener implements StateChangeListener<ConsumerTarget, ConsumerTarget.State>
-        {
-            @Override
-            public void stateChanged(final ConsumerTarget object,
-                                     final ConsumerTarget.State oldState,
-                                     final ConsumerTarget.State newState)
-            {
-                if (newState == ConsumerTarget.State.ACTIVE)
-                {
-                    deliverMessages();
-                }
-            }
-        }
-
-        private void deliverMessages()
-        {
-            _target.getSendLock();
-            try
-            {
-                while (!_queue.isEmpty())
-                {
-
-                    final PropertiesMessageInstance propertiesMessageInstance = _queue.get(0);
-                    if (!_target.isSuspended() && _target.allocateCredit(propertiesMessageInstance.getMessage()))
-                    {
-                        _queue.remove(0);
-                        _target.send(this, propertiesMessageInstance, false);
-                    }
-                    else
-                    {
-                        break;
-                    }
-                }
-            }
-            finally
-            {
-                _target.releaseSendLock();
-            }
+            _queue.add(new PropertiesMessageInstance(this, response));
+            _target.notifyWork();
         }
-
     }
 
     class PropertiesMessageInstance implements MessageInstance
@@ -525,12 +463,6 @@ public abstract class AbstractSystemMess
         }
 
         @Override
-        public boolean resend()
-        {
-            return false;
-        }
-
-        @Override
         public void delete()
         {
             _isDeleted = true;

Copied: qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java (from r1767916, qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java)
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java?p2=qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java&p1=qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java&r1=1767916&r2=1767917&rev=1767917&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java Thu Nov  3 16:03:13 2016
@@ -45,12 +45,13 @@ import org.apache.qpid.server.model.Queu
 import org.apache.qpid.server.model.Session;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.ConsumerListener;
+import org.apache.qpid.server.queue.AbstractQueue;
 import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.StateChangeListener;
 import org.apache.qpid.transport.network.Ticker;
 
-public class MockConsumer implements ConsumerTarget
+public class TestConsumerTarget implements ConsumerTarget
 {
 
     private final List<String> _messageIds;
@@ -59,7 +60,7 @@ public class MockConsumer implements Con
     private Queue<?> queue = null;
     private StateChangeListener<ConsumerTarget, State> _listener = null;
     private State _state = State.ACTIVE;
-    private ArrayList<MessageInstance> messages = new ArrayList<MessageInstance>();
+    private ArrayList<MessageInstance> _messages = new ArrayList<MessageInstance>();
     private final Lock _stateChangeLock = new ReentrantLock();
 
     private boolean _isActive = true;
@@ -67,12 +68,12 @@ public class MockConsumer implements Con
     private boolean _messageSent;
     private MockSessionModel _sessionModel = new MockSessionModel();
 
-    public MockConsumer()
+    public TestConsumerTarget()
     {
         _messageIds = null;
     }
 
-    public MockConsumer(List<String> messageIds)
+    public TestConsumerTarget(List<String> messageIds)
     {
         _messageIds = messageIds;
     }
@@ -143,11 +144,11 @@ public class MockConsumer implements Con
     {
         _messageSent = true;
         long size = entry.getMessage().getSize();
-        if (messages.contains(entry))
+        if (_messages.contains(entry))
         {
             entry.setRedelivered();
         }
-        messages.add(entry);
+        _messages.add(entry);
         return size;
     }
 
@@ -237,16 +238,14 @@ public class MockConsumer implements Con
     @Override
     public boolean processPending()
     {
-        _consumer.pullMessage();
-        if(_messageSent)
-        {
-            _messageSent = false;
-            return true;
-        }
-        else
+        AbstractQueue.MessageContainer messageContainter = _consumer.pullMessage();
+        if (messageContainter == null)
         {
             return false;
         }
+
+        send(_consumer, messageContainter._messageInstance, false);
+        return true;
     }
 
     @Override
@@ -257,7 +256,7 @@ public class MockConsumer implements Con
 
     public ArrayList<MessageInstance> getMessages()
     {
-        return messages;
+        return _messages;
     }
 
 

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java?rev=1767917&r1=1767916&r2=1767917&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java Thu Nov  3 16:03:13 2016
@@ -48,7 +48,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.server.consumer.ConsumerImpl;
-import org.apache.qpid.server.consumer.MockConsumer;
+import org.apache.qpid.server.consumer.TestConsumerTarget;
 import org.apache.qpid.server.exchange.DirectExchange;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.InstanceProperties;
@@ -77,7 +77,7 @@ abstract class AbstractQueueTestBase ext
     private String _owner = "owner";
     private String _routingKey = "routing key";
     private DirectExchange _exchange;
-    private MockConsumer _consumerTarget = new MockConsumer();
+    private TestConsumerTarget _consumerTarget = new TestConsumerTarget();
     private QueueConsumer<?> _consumer;
     private Map<String,Object> _arguments = Collections.emptyMap();
 
@@ -369,7 +369,7 @@ abstract class AbstractQueueTestBase ext
     {
         ServerMessage messageA = createMessage(new Long(24));
         final CountDownLatch sendIndicator = new CountDownLatch(1);
-        _consumerTarget = new MockConsumer()
+        _consumerTarget = new TestConsumerTarget()
         {
 
             @Override
@@ -512,8 +512,8 @@ abstract class AbstractQueueTestBase ext
         ServerMessage messageA = createMessage(new Long(24));
         ServerMessage messageB = createMessage(new Long(25));
 
-        MockConsumer target1 = new MockConsumer();
-        MockConsumer target2 = new MockConsumer();
+        TestConsumerTarget target1 = new TestConsumerTarget();
+        TestConsumerTarget target2 = new TestConsumerTarget();
 
 
         QueueConsumer consumer1 = (QueueConsumer) _queue.addConsumer(target1, null, messageA.getClass(), "test",
@@ -578,7 +578,7 @@ abstract class AbstractQueueTestBase ext
                      messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage());
 
         // Check we cannot add a second subscriber to the queue
-        MockConsumer subB = new MockConsumer();
+        TestConsumerTarget subB = new TestConsumerTarget();
         Exception ex = null;
         try
         {
@@ -615,32 +615,6 @@ abstract class AbstractQueueTestBase ext
         assertNotNull(ex);
     }
 
-
-    public void testResend() throws Exception
-    {
-        Long id = new Long(26);
-        ServerMessage message = createMessage(id);
-
-        _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, message.getClass(), "test",
-                                                          EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES),
-                                                          0);
-
-        _queue.enqueue(message, new Action<MessageInstance>()
-        {
-            @Override
-            public void performAction(final MessageInstance object)
-            {
-                QueueEntryImpl entry = (QueueEntryImpl) object;
-                entry.setRedelivered();
-                _consumer.resend(entry);
-
-            }
-        }, null);
-
-
-
-    }
-
     public void testGetFirstMessageId() throws Exception
     {
         // Create message
@@ -1105,7 +1079,7 @@ abstract class AbstractQueueTestBase ext
         _queue = queue;
     }
 
-    public MockConsumer getConsumer()
+    public TestConsumerTarget getConsumer()
     {
         return _consumerTarget;
     }
@@ -1222,7 +1196,7 @@ abstract class AbstractQueueTestBase ext
         return _exchange;
     }
 
-    public MockConsumer getConsumerTarget()
+    public TestConsumerTarget getConsumerTarget()
     {
         return _consumerTarget;
     }

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java?rev=1767917&r1=1767916&r2=1767917&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java Thu Nov  3 16:03:13 2016
@@ -162,12 +162,6 @@ public class MockMessageInstance impleme
     {
     }
 
-    @Override
-    public boolean resend()
-    {
-        return false;
-    }
-
 
     public void setRedelivered()
     {

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java?rev=1767917&r1=1767916&r2=1767917&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java Thu Nov  3 16:03:13 2016
@@ -30,7 +30,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.consumer.ConsumerTarget;
-import org.apache.qpid.server.consumer.MockConsumer;
+import org.apache.qpid.server.consumer.TestConsumerTarget;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.LifetimePolicy;
@@ -74,7 +74,7 @@ public class StandardQueueTest extends A
         final StandardQueueImpl queue = new StandardQueueImpl(queueAttributes, getVirtualHost());
         queue.open();
         //verify adding an active consumer increases the count
-        final MockConsumer consumer1 = new MockConsumer();
+        final TestConsumerTarget consumer1 = new TestConsumerTarget();
         consumer1.setActive(true);
         consumer1.setState(ConsumerTarget.State.ACTIVE);
         assertEquals("Unexpected active consumer count", 0, queue.getConsumerCountWithCredit());
@@ -87,7 +87,7 @@ public class StandardQueueTest extends A
         assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit());
 
         //verify adding an inactive consumer doesn't increase the count
-        final MockConsumer consumer2 = new MockConsumer();
+        final TestConsumerTarget consumer2 = new TestConsumerTarget();
         consumer2.setActive(false);
         consumer2.setState(ConsumerTarget.State.SUSPENDED);
         assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit());
@@ -143,7 +143,7 @@ public class StandardQueueTest extends A
         AbstractQueue queue = new DequeuedQueue(getVirtualHost());
         queue.create();
         // create a consumer
-        MockConsumer consumer = new MockConsumer();
+        TestConsumerTarget consumer = new TestConsumerTarget();
 
         // register consumer
         queue.addConsumer(consumer,
@@ -192,7 +192,7 @@ public class StandardQueueTest extends A
         final CountDownLatch latch = new CountDownLatch(messageNumber -1);
 
         // create a consumer
-        MockConsumer consumer = new MockConsumer()
+        TestConsumerTarget consumer = new TestConsumerTarget()
         {
 
             @Override

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java?rev=1767917&r1=1767916&r2=1767917&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java Thu Nov  3 16:03:13 2016
@@ -19,6 +19,7 @@
 package org.apache.qpid.server.security;
 
 import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Mockito.mock;
@@ -42,11 +43,14 @@ import org.apache.qpid.server.consumer.C
 import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Consumer;
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.TrustStore;
 import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.queue.AbstractQueue;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.TestMemoryMessageStore;
+import org.apache.qpid.server.virtualhost.AbstractSystemMessageSource;
 import org.apache.qpid.test.utils.QpidTestCase;
 
 
@@ -78,11 +82,10 @@ public class TrustStoreMessageSourceTest
         final ConsumerTarget target = mock(ConsumerTarget.class);
         when(target.allocateCredit(any(ServerMessage.class))).thenReturn(true);
 
-        _trustStoreMessageSource.addConsumer(target, null, ServerMessage.class, getTestName(), options, 0);
-
-        ArgumentCaptor<MessageInstance> argumentCaptor = ArgumentCaptor.forClass(MessageInstance.class);
-        verify(target).send(any(ConsumerImpl.class), argumentCaptor.capture(), anyBoolean());
-        final ServerMessage message = argumentCaptor.getValue().getMessage();
+        ConsumerImpl consumer = _trustStoreMessageSource.addConsumer(target, null, ServerMessage.class, getTestName(), options, 0);
+        final AbstractQueue.MessageContainer messageContainer = consumer.pullMessage();
+        assertNotNull("Could not pull message of TrustStore", messageContainer);
+        final ServerMessage message = messageContainer._messageInstance.getMessage();
         assertCertificates(getCertificatesFromMessage(message));
     }
 

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java?rev=1767917&r1=1767916&r2=1767917&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java Thu Nov  3 16:03:13 2016
@@ -31,6 +31,7 @@ import org.apache.qpid.server.consumer.C
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.queue.AbstractQueue;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.TestMemoryMessageStore;
 import org.apache.qpid.test.utils.QpidTestCase;
@@ -56,7 +57,8 @@ public class VirtualHostPropertiesNodeTe
         final ConsumerTarget target = mock(ConsumerTarget.class);
         when(target.allocateCredit(any(ServerMessage.class))).thenReturn(true);
 
-        _virtualHostPropertiesNode.addConsumer(target, null, ServerMessage.class, getTestName(), options, 0);
-        verify(target).send(any(ConsumerImpl.class), any(MessageInstance.class), anyBoolean());
+        ConsumerImpl consumer = _virtualHostPropertiesNode.addConsumer(target, null, ServerMessage.class, getTestName(), options, 0);
+        final AbstractQueue.MessageContainer messageContainer = consumer.pullMessage();
+        assertNotNull("Could not pull message from VirtualHostPropertyNode", messageContainer);
     }
 }

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1767917&r1=1767916&r2=1767917&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Thu Nov  3 16:03:13 2016
@@ -98,6 +98,8 @@ import org.apache.qpid.server.protocol.C
 import org.apache.qpid.server.protocol.ConsumerListener;
 import org.apache.qpid.server.protocol.PublishAuthorisationCache;
 import org.apache.qpid.server.queue.QueueArgumentsConverter;
+import org.apache.qpid.server.queue.QueueConsumer;
+import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.security.SecurityToken;
 import org.apache.qpid.server.store.MessageHandle;
 import org.apache.qpid.server.store.MessageStore;
@@ -1066,6 +1068,29 @@ public class AMQChannel
         return false;
     }
 
+    private boolean resendInternal(MessageInstance messageInstance)
+    {
+        ConsumerImpl subscriber = messageInstance.getDeliveredConsumer();
+        if (subscriber != null && subscriber.getSessionModel() == this)
+        {
+            ConsumerTarget target = subscriber.getTarget();
+            target.getSendLock();
+            try
+            {
+                if (target.getState() != ConsumerTarget.State.CLOSED)
+                {
+                    target.send(subscriber, messageInstance, false);
+                    return true;
+                }
+            }
+            finally
+            {
+                target.releaseSendLock();
+            }
+        }
+        return false;
+    }
+
     /**
      * Called to resend all outstanding unacknowledged messages to this same channel.
      *
@@ -1116,7 +1141,7 @@ public class AMQChannel
             // all messages in the unacked map as redelivered.
             message.setRedelivered();
 
-            if (!message.resend())
+            if (!resendInternal(message))
             {
                 msgToRequeue.put(deliveryTag, message);
             }
@@ -1326,7 +1351,7 @@ public class AMQChannel
             }
             else
             {
-                entry.resend();
+                resendInternal(entry);
             }
         }
         _resendList.clear();
@@ -2417,7 +2442,14 @@ public class AMQChannel
             _logger.debug("RECV[" + _channelId + "] BasicRecover[" + " requeue: " + requeue + " sync: " + sync + " ]");
         }
 
-        resend();
+        if (requeue)
+        {
+            requeue();
+        }
+        else
+        {
+            resend();
+        }
 
         if (sync)
         {
@@ -3688,11 +3720,6 @@ public class AMQChannel
         };
 
         rollback(task);
-
-        //Now resend all the unacknowledged messages back to the original subscribers.
-        //(Must be done after the TxnRollback-ok response).
-        // Why, are we not allowed to send messages back to client before the ok method?
-        resend();
     }
 
     @Override

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java?rev=1767917&r1=1767916&r2=1767917&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java Thu Nov  3 16:03:13 2016
@@ -1233,12 +1233,6 @@ class ManagementNode implements MessageS
         }
 
         @Override
-        public boolean resend()
-        {
-            return false;
-        }
-
-        @Override
         public void delete()
         {
 

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java?rev=1767917&r1=1767916&r2=1767917&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java Thu Nov  3 16:03:13 2016
@@ -36,11 +36,11 @@ import org.apache.qpid.server.message.Se
 import org.apache.qpid.server.message.internal.InternalMessage;
 import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.queue.AbstractQueue;
 import org.apache.qpid.server.security.SecurityToken;
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.util.StateChangeListener;
 
 class ManagementNodeConsumer implements ConsumerImpl, MessageDestination
 {
@@ -49,7 +49,6 @@ class ManagementNodeConsumer implements
     private final List<ManagementResponse> _queue = Collections.synchronizedList(new ArrayList<ManagementResponse>());
     private final ConsumerTarget _target;
     private final String _name;
-    private final StateChangeListener<ConsumerTarget, ConsumerTarget.State> _targetChangeListener = new TargetChangeListener();
 
 
     public ManagementNodeConsumer(final String consumerName, final ManagementNode managementNode, ConsumerTarget target)
@@ -57,7 +56,6 @@ class ManagementNodeConsumer implements
         _name = consumerName;
         _managementNode = managementNode;
         _target = target;
-        target.addStateListener(_targetChangeListener);
     }
 
     @Override
@@ -73,9 +71,27 @@ class ManagementNodeConsumer implements
     }
 
     @Override
-    public void pullMessage()
+    public AbstractQueue.MessageContainer pullMessage()
     {
+        _target.getSendLock();
+        try
+        {
+            if (!_queue.isEmpty())
+            {
 
+                final ManagementResponse managementResponse = _queue.get(0);
+                if (!_target.isSuspended() && _target.allocateCredit(managementResponse.getMessage()))
+                {
+                    _queue.remove(0);
+                    return new AbstractQueue.MessageContainer(managementResponse, null);
+                }
+            }
+        }
+        finally
+        {
+            _target.releaseSendLock();
+        }
+        return null;
     }
 
     @Override
@@ -217,62 +233,8 @@ class ManagementNodeConsumer implements
 
     void send(final InternalMessage response)
     {
-        _target.getSendLock();
-        try
-        {
-            final ManagementResponse responseEntry = new ManagementResponse(this, response);
-            if(_queue.isEmpty() && _target.allocateCredit(response))
-            {
-                _target.send(this, responseEntry, false);
-            }
-            else
-            {
-                _queue.add(responseEntry);
-            }
-        }
-        finally
-        {
-            _target.releaseSendLock();
-        }
-    }
-
-    private class TargetChangeListener implements StateChangeListener<ConsumerTarget, ConsumerTarget.State>
-    {
-        @Override
-        public void stateChanged(final ConsumerTarget object,
-                                 final ConsumerTarget.State oldState,
-                                 final ConsumerTarget.State newState)
-        {
-            if(newState == ConsumerTarget.State.ACTIVE)
-            {
-                deliverMessages();
-            }
-        }
-    }
-
-    private void deliverMessages()
-    {
-        _target.getSendLock();
-        try
-        {
-            while(!_queue.isEmpty())
-            {
-
-                final ManagementResponse managementResponse = _queue.get(0);
-                if(!_target.isSuspended() && _target.allocateCredit(managementResponse.getMessage()))
-                {
-                    _queue.remove(0);
-                    _target.send(this, managementResponse, false);
-                }
-                else
-                {
-                    break;
-                }
-            }
-        }
-        finally
-        {
-            _target.releaseSendLock();
-        }
+        final ManagementResponse responseEntry = new ManagementResponse(this, response);
+        _queue.add(responseEntry);
+        _target.notifyWork();
     }
 }

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java?rev=1767917&r1=1767916&r2=1767917&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java Thu Nov  3 16:03:13 2016
@@ -219,12 +219,6 @@ class ManagementResponse implements Mess
     }
 
     @Override
-    public boolean resend()
-    {
-        return false;
-    }
-
-    @Override
     public void delete()
     {
         _isDeleted = true;

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java?rev=1767917&r1=1767916&r2=1767917&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java Thu Nov  3 16:03:13 2016
@@ -81,7 +81,6 @@ class WebSocketProvider implements Accep
     private final Protocol _defaultSupportedProtocolReply;
     private final MultiVersionProtocolEngineFactory _factory;
     private Server _server;
-    private final long _outboundMessageBufferLimit;
 
     WebSocketProvider(final Transport transport,
                       final SSLContext sslContext,
@@ -95,8 +94,6 @@ class WebSocketProvider implements Accep
         _supported = supported;
         _defaultSupportedProtocolReply = defaultSupportedProtocolReply;
 
-        _outboundMessageBufferLimit = (long) _port.getContextValue(Long.class,
-                                                                   AmqpPort.PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE);
         _factory = new MultiVersionProtocolEngineFactory(
                         _port.getParent(Broker.class),
                         _supported,
@@ -463,15 +460,6 @@ class WebSocketProvider implements Accep
         }
 
         @Override
-        public void reserveOutboundMessageSpace(final long size)
-        {
-            if (_usedOutboundMessageSpace.addAndGet(size) > _outboundMessageBufferLimit)
-            {
-                // RG - TODO
-            }
-        }
-
-        @Override
         public String getTransportInfo()
         {
             return _connection.getProtocol();



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org