You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2016/11/03 16:03:13 UTC
svn commit: r1767917 - in /qpid/java/branches/remove-queue-runner:
broker-core/src/main/java/org/apache/qpid/server/consumer/
broker-core/src/main/java/org/apache/qpid/server/message/
broker-core/src/main/java/org/apache/qpid/server/model/ broker-core/...
Author: lquack
Date: Thu Nov 3 16:03:13 2016
New Revision: 1767917
URL: http://svn.apache.org/viewvc?rev=1767917&view=rev
Log:
move ConsumerTarget to proper pull model
Added:
qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
- copied, changed from r1767916, qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
Removed:
qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
Modified:
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java
qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java
qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
qpid/java/branches/remove-queue-runner/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1767917&r1=1767916&r2=1767917&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Thu Nov 3 16:03:13 2016
@@ -24,7 +24,6 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -40,6 +39,7 @@ import org.apache.qpid.server.logging.Lo
import org.apache.qpid.server.logging.messages.SubscriptionMessages;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.model.Consumer;
+import org.apache.qpid.server.queue.AbstractQueue;
import org.apache.qpid.server.queue.SuspendedConsumerLoggingTicker;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.util.StateChangeListener;
@@ -56,7 +56,6 @@ public abstract class AbstractConsumerTa
private final AtomicInteger _stateActivates = new AtomicInteger();
private final boolean _isMultiQueue;
private final SuspendedConsumerLoggingTicker _suspendedConsumerLoggingTicker;
- private ConcurrentLinkedQueue<ConsumerMessageInstancePair> _queue = new ConcurrentLinkedQueue();
private final List<ConsumerImpl> _consumers = new CopyOnWriteArrayList<>();
private Iterator<ConsumerImpl> _pullIterator;
@@ -269,7 +268,12 @@ public abstract class AbstractConsumerTa
@Override
public final long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
{
- _queue.add(new ConsumerMessageInstancePair(consumer, entry, batch));
+ doSend(consumer, entry, batch);
+
+ if (consumer.acquires())
+ {
+ entry.makeAcquisitionStealable();
+ }
return entry.getMessage().getSize();
}
@@ -278,12 +282,7 @@ public abstract class AbstractConsumerTa
@Override
public boolean hasMessagesToSend()
{
- return !_queue.isEmpty() || messagesAvailable();
- }
-
- private boolean messagesAvailable()
- {
- if(!_waitingOnStateChange.get() && hasCredit())
+ if (!_waitingOnStateChange.get() && hasCredit())
{
for (ConsumerImpl consumer : _consumers)
{
@@ -299,8 +298,12 @@ public abstract class AbstractConsumerTa
@Override
public boolean sendNextMessage()
{
+ _waitingOnStateChange.set(true);
+
+ AbstractQueue.MessageContainer messageContainer = null;
+ ConsumerImpl consumer = null;
boolean iteratedCompleteList = false;
- while (_queue.isEmpty())
+ while (messageContainer == null)
{
if (_pullIterator == null || !_pullIterator.hasNext())
{
@@ -314,34 +317,25 @@ public abstract class AbstractConsumerTa
}
if (_pullIterator.hasNext())
{
- ConsumerImpl consumer = _pullIterator.next();
-
- _waitingOnStateChange.set(true);
-
- consumer.pullMessage();
+ consumer = _pullIterator.next();
+ messageContainer = consumer.pullMessage();
}
}
- ConsumerMessageInstancePair consumerMessage = _queue.poll();
- if (consumerMessage != null)
+ if (messageContainer != null)
{
_waitingOnStateChange.set(false);
+ MessageInstance entry = messageContainer._messageInstance;
try
{
-
- ConsumerImpl consumer = consumerMessage.getConsumer();
- MessageInstance entry = consumerMessage.getEntry();
- boolean batch = consumerMessage.isBatch();
- doSend(consumer, entry, batch);
-
- if (consumer.acquires())
- {
- entry.makeAcquisitionStealable();
- }
+ send(consumer, entry, false);
}
finally
{
- consumerMessage.release();
+ if (messageContainer._messageReference != null)
+ {
+ messageContainer._messageReference.release();
+ }
}
return true;
}
@@ -370,12 +364,6 @@ public abstract class AbstractConsumerTa
}
}
ConsumerMessageInstancePair instance;
- while((instance = _queue.poll()) != null)
- {
- MessageInstance entry = instance.getEntry();
- entry.release(instance.getConsumer());
- instance.release();
- }
doCloseInternal();
}
finally
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java?rev=1767917&r1=1767916&r2=1767917&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java Thu Nov 3 16:03:13 2016
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.Atomi
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.queue.AbstractQueue;
public interface ConsumerImpl
{
@@ -35,7 +36,7 @@ public interface ConsumerImpl
boolean hasAvailableMessages();
- void pullMessage();
+ AbstractQueue.MessageContainer pullMessage();
enum Option
{
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java?rev=1767917&r1=1767916&r2=1767917&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java Thu Nov 3 16:03:13 2016
@@ -248,8 +248,6 @@ public interface MessageInstance
void release(ConsumerImpl release);
- boolean resend();
-
void delete();
boolean isDeleted();
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1767917&r1=1767916&r2=1767917&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java Thu Nov 3 16:03:13 2016
@@ -369,8 +369,6 @@ public interface Queue<X extends Queue<X
void incrementUnackedMsgCount(QueueEntry entry);
- boolean resend(QueueEntry entry, QueueConsumer<?> consumer);
-
List<? extends QueueEntry> getMessagesOnTheQueue();
List<Long> getMessagesOnTheQueue(int num);
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1767917&r1=1767916&r2=1767917&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Thu Nov 3 16:03:13 2016
@@ -115,7 +115,6 @@ import org.apache.qpid.server.util.MapVa
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
-import org.apache.qpid.transport.TransportException;
public abstract class AbstractQueue<X extends AbstractQueue<X>>
extends AbstractConfiguredObject<X>
@@ -166,8 +165,6 @@ public abstract class AbstractQueue<X ex
private final AtomicInteger _activeSubscriberCount = new AtomicInteger();
- private final AtomicLong _totalMessagesReceived = new AtomicLong();
-
private final AtomicLong _dequeueCount = new AtomicLong();
private final AtomicLong _dequeueSize = new AtomicLong();
private final AtomicLong _enqueueCount = new AtomicLong();
@@ -224,7 +221,6 @@ public abstract class AbstractQueue<X ex
private volatile long _estimatedAverageMessageHeaderSize;
- private AtomicInteger _deliveredMessages = new AtomicInteger();
private AtomicBoolean _stopped = new AtomicBoolean(false);
private final Set<AMQSessionModel> _blockedChannels = new ConcurrentSkipListSet<AMQSessionModel>();
@@ -1113,8 +1109,6 @@ public abstract class AbstractQueue<X ex
incrementQueueCount();
incrementQueueSize(message);
- _totalMessagesReceived.incrementAndGet();
-
if(_recovering.get() != RECOVERED)
{
_enqueuingWhileRecovering.incrementAndGet();
@@ -1155,9 +1149,6 @@ public abstract class AbstractQueue<X ex
{
incrementQueueCount();
incrementQueueSize(message);
-
- _totalMessagesReceived.incrementAndGet();
-
doEnqueue(message, null, enqueueRecord);
}
@@ -1254,125 +1245,6 @@ public abstract class AbstractQueue<X ex
}
}
- /**
- * iterate over consumers and if any is at the end of the queue and can deliver this message,
- * then deliver the message
- */
- private void tryDeliverStraightThrough(final QueueEntry entry)
- {
- try
- {
- ConsumerNode node = _consumerList.getMarkedNode();
- ConsumerNode nextNode = node.findNext();
- if (nextNode == null)
- {
- nextNode = _consumerList.getHead().findNext();
- }
- while (nextNode != null)
- {
- if (_consumerList.updateMarkedNode(node, nextNode))
- {
- break;
- }
- else
- {
- node = _consumerList.getMarkedNode();
- nextNode = node.findNext();
- if (nextNode == null)
- {
- nextNode = _consumerList.getHead().findNext();
- }
- }
- }
- // always do one extra loop after we believe we've finished
- // this catches the case where we *just* miss an update
- int loops = 2;
-
- while (entry.isAvailable() && loops != 0)
- {
- if (nextNode == null)
- {
- loops--;
- nextNode = _consumerList.getHead();
- }
- else
- {
- // if consumer at end, and active, offer
- final QueueConsumer<?> sub = nextNode.getConsumer();
-
- if(sub.getPriority() == Integer.MAX_VALUE)
- {
- deliverToConsumer(sub, entry);
- }
-
- }
- nextNode = nextNode.findNext();
-
- }
- }
- catch (ConnectionScopedRuntimeException | TransportException e)
- {
- String errorMessage = "Suppressing " + e.getClass().getSimpleName() +
- " during straight through delivery, as this" +
- " can only indicate an issue with a consumer.";
- if(_logger.isDebugEnabled())
- {
- _logger.debug(errorMessage, e);
- }
- else
- {
- _logger.info(errorMessage + ' ' + e.getMessage());
- }
- }
- }
-
- private void deliverToConsumer(final QueueConsumer<?> sub, final QueueEntry entry)
- {
-
- if(sub.trySendLock())
- {
- try
- {
- // get available queue entry first in order to avoid referring old deleted queue entry in sub._queueContext._lastSeen
- if ((getNextAvailableEntry(sub) == entry)
- && !sub.isSuspended()
- && sub.hasInterest(entry)
- && mightAssign(sub, entry)
- && !sub.wouldSuspend(entry))
- {
-
- MessageReference messageReference = null;
- try
- {
-
- if ((sub.acquires() && !assign(sub, entry))
- || (!sub.acquires() && (messageReference = entry.newMessageReference()) == null))
- {
- // restore credit here that would have been taken away by wouldSuspend since we didn't manage
- // to acquire the entry for this consumer
- sub.restoreCredit(entry);
- }
- else
- {
- deliverMessage(sub, entry, false, true);
- }
- }
- finally
- {
- if (messageReference != null)
- {
- messageReference.release();
- }
- }
- }
- }
- finally
- {
- sub.releaseSendLock();
- }
- }
- }
-
private boolean assign(final QueueConsumer<?> sub, final QueueEntry entry)
{
if(_messageGroupManager == null)
@@ -1435,22 +1307,6 @@ public abstract class AbstractQueue<X ex
getAtomicQueueCount().incrementAndGet();
}
- private void deliverMessage(final QueueConsumer<?> sub,
- final QueueEntry entry,
- boolean batch,
- final boolean updateLastSeen)
- {
- if(updateLastSeen)
- {
- setLastSeenEntry(sub, entry);
- }
-
- _deliveredMessages.incrementAndGet();
-
- sub.send(entry, batch);
- }
-
-
private void setLastSeenEntry(final QueueConsumer<?> sub, final QueueEntry entry)
{
QueueContext subContext = sub.getQueueContext();
@@ -1507,13 +1363,7 @@ public abstract class AbstractQueue<X ex
{
decrementQueueCount();
decrementQueueSize(entry);
- if (entry.acquiredByConsumer())
- {
- _deliveredMessages.decrementAndGet();
- }
-
checkCapacity();
-
}
private void decrementQueueSize(final QueueEntry entry)
@@ -1535,31 +1385,6 @@ public abstract class AbstractQueue<X ex
_dequeueCount.incrementAndGet();
}
- public boolean resend(final QueueEntry entry, final QueueConsumer<?> consumer)
- {
- /* TODO : This is wrong as the consumer may be suspended, we should instead change the state of the message
- entry to resend and move back the consumer pointer. */
-
- consumer.getSendLock();
- try
- {
- if (!consumer.isClosed())
- {
- deliverMessage(consumer, entry, false, false);
- return true;
- }
- else
- {
- return false;
- }
- }
- finally
- {
- consumer.releaseSendLock();
- }
- }
-
-
public int getConsumerCount()
{
@@ -1592,24 +1417,6 @@ public abstract class AbstractQueue<X ex
return getAtomicQueueSize().get();
}
- public int getUndeliveredMessageCount()
- {
- int count = getQueueDepthMessages() - _deliveredMessages.get();
- if (count < 0)
- {
- return 0;
- }
- else
- {
- return count;
- }
- }
-
- public long getReceivedMessageCount()
- {
- return _totalMessagesReceived.get();
- }
-
@Override
public long getOldestMessageArrivalTime()
{
@@ -2118,26 +1925,23 @@ public abstract class AbstractQueue<X ex
}
}
- boolean deliverSingleMessage(QueueConsumer<?> sub)
+ MessageContainer deliverSingleMessage(QueueConsumer<?> sub)
{
- boolean atTail = false;
boolean queueEmpty = false;
- boolean deliveryAttempted = false;
+ MessageContainer messageContainer = null;
sub.getSendLock();
try
{
if (!sub.isSuspended())
{
- atTail = attemptDelivery(sub, true);
- deliveryAttempted = true;
- if (atTail && getNextAvailableEntry(sub) == null)
+ messageContainer = attemptDelivery(sub);
+ if (messageContainer == null && getNextAvailableEntry(sub) == null)
{
queueEmpty = true;
}
}
-
- if (!deliveryAttempted )
+ else
{
// avoid referring old deleted queue entry in sub._queueContext._lastSeen
getNextAvailableEntry(sub);
@@ -2163,7 +1967,20 @@ public abstract class AbstractQueue<X ex
{
advanceAllConsumers();
}
- return atTail;
+ return messageContainer;
+ }
+
+ public static class MessageContainer
+ {
+ public final MessageInstance _messageInstance;
+ public final MessageReference<?> _messageReference;
+
+ public MessageContainer(final MessageInstance messageInstance,
+ final MessageReference<?> messageReference)
+ {
+ _messageInstance = messageInstance;
+ _messageReference = messageReference;
+ }
}
/**
@@ -2173,13 +1990,11 @@ public abstract class AbstractQueue<X ex
*
*
* @param sub the consumer
- * @param batch true if processing can be batched
* @return true if we have completed all possible deliveries for this sub.
*/
- private boolean attemptDelivery(QueueConsumer<?> sub, boolean batch)
+ private MessageContainer attemptDelivery(QueueConsumer<?> sub)
{
- boolean atTail = false;
-
+ MessageContainer messageContainer = null;
// avoid referring old deleted queue entry in sub._queueContext._lastSeen
QueueEntry node = getNextAvailableEntry(sub);
boolean subActive = sub.isActive() && !sub.isSuspended();
@@ -2212,7 +2027,8 @@ public abstract class AbstractQueue<X ex
}
else
{
- deliverMessage(sub, node, batch, true);
+ setLastSeenEntry(sub, node);
+ messageContainer = new MessageContainer(node, messageReference);
}
}
finally
@@ -2232,11 +2048,9 @@ public abstract class AbstractQueue<X ex
}
}
-
}
- atTail = (node == null) || (getNextAvailableEntry(sub) == null);
}
- return atTail || !subActive;
+ return messageContainer;
}
private boolean noHigherPriorityWithCredit(final QueueConsumer<?> sub)
@@ -2334,7 +2148,12 @@ public abstract class AbstractQueue<X ex
boolean hasAvailableMessages(final QueueConsumer queueConsumer)
{
- return getNextAvailableEntry(queueConsumer) != null;
+ boolean hasAvailableMessages = getNextAvailableEntry(queueConsumer) != null;
+ if (!hasAvailableMessages)
+ {
+ queueConsumer.queueEmpty();
+ }
+ return hasAvailableMessages;
}
public void checkMessageStatus()
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java?rev=1767917&r1=1767916&r2=1767917&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java Thu Nov 3 16:03:13 2016
@@ -36,16 +36,12 @@ public interface QueueConsumer<X extends
void restoreCredit(QueueEntry entry);
- void send(QueueEntry entry, boolean batch);
-
void acquisitionRemoved(QueueEntry node);
void queueDeleted();
Queue<?> getQueue();
- boolean resend(QueueEntry e);
-
MessageInstance.StealableConsumerAcquiredState<X> getOwningState();
QueueContext getQueueContext();
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1767917&r1=1767916&r2=1767917&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Thu Nov 3 16:03:13 2016
@@ -389,19 +389,15 @@ class QueueConsumerImpl
}
@Override
- public void pullMessage()
+ public AbstractQueue.MessageContainer pullMessage()
{
- _queue.deliverSingleMessage(this);
- }
-
- public boolean resend(final QueueEntry entry)
- {
- boolean messageWasResent = getQueue().resend(entry, this);
- if (messageWasResent)
+ AbstractQueue.MessageContainer messageContainer = _queue.deliverSingleMessage(this);
+ if (messageContainer != null)
{
- _target.processPending();
+ _deliveredCount.incrementAndGet();
+ _deliveredBytes.addAndGet(messageContainer._messageInstance.getMessage().getSize());
}
- return messageWasResent;
+ return messageContainer;
}
public final long getConsumerNumber()
@@ -578,13 +574,6 @@ class QueueConsumerImpl
return _deliveredCount.longValue();
}
- public final void send(final QueueEntry entry, final boolean batch)
- {
- _deliveredCount.incrementAndGet();
- long size = _target.send(this, entry, batch);
- _deliveredBytes.addAndGet(size);
- }
-
@Override
public void acquisitionRemoved(final QueueEntry node)
{
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1767917&r1=1767916&r2=1767917&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Thu Nov 3 16:03:13 2016
@@ -721,17 +721,6 @@ public abstract class QueueEntryImpl imp
}
@Override
- public boolean resend()
- {
- QueueConsumer sub = getDeliveredConsumer();
- if(sub != null)
- {
- return sub.resend(this);
- }
- return false;
- }
-
- @Override
public TransactionLogResource getOwningResource()
{
return getQueue();
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java?rev=1767917&r1=1767916&r2=1767917&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java Thu Nov 3 16:03:13 2016
@@ -40,6 +40,7 @@ import org.apache.qpid.server.message.Se
import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.queue.AbstractQueue;
import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.TransactionLogResource;
@@ -116,15 +117,12 @@ public abstract class AbstractSystemMess
Collections.synchronizedList(new ArrayList<PropertiesMessageInstance>());
private final ConsumerTarget _target;
private final String _name;
- private final StateChangeListener<ConsumerTarget, ConsumerTarget.State> _targetChangeListener =
- new Consumer.TargetChangeListener();
public Consumer(final String consumerName, ConsumerTarget target)
{
_name = consumerName;
_target = target;
- target.addStateListener(_targetChangeListener);
}
@Override
@@ -146,11 +144,9 @@ public abstract class AbstractSystemMess
}
@Override
- public void pullMessage()
+ public AbstractQueue.MessageContainer pullMessage()
{
- AMQPConnection<?> connection = _target.getSessionModel().getAMQPConnection();
_target.getSendLock();
-
try
{
if (!_queue.isEmpty())
@@ -159,7 +155,7 @@ public abstract class AbstractSystemMess
if (!_target.isSuspended() && _target.allocateCredit(propertiesMessageInstance.getMessage()))
{
_queue.remove(0);
- _target.send(this, propertiesMessageInstance, false);
+ return new AbstractQueue.MessageContainer(propertiesMessageInstance, null);
}
}
}
@@ -167,8 +163,7 @@ public abstract class AbstractSystemMess
{
_target.releaseSendLock();
}
-
-
+ return null;
}
@Override
@@ -276,66 +271,9 @@ public abstract class AbstractSystemMess
public void send(final InternalMessage response)
{
- _target.getSendLock();
- try
- {
- final PropertiesMessageInstance
- responseEntry = new PropertiesMessageInstance(this, response);
- if (_queue.isEmpty() && _target.allocateCredit(response))
- {
- _target.send(this, responseEntry, false);
- }
- else
- {
- _queue.add(responseEntry);
- }
- }
- finally
- {
- _target.releaseSendLock();
- }
- }
-
- private class TargetChangeListener implements StateChangeListener<ConsumerTarget, ConsumerTarget.State>
- {
- @Override
- public void stateChanged(final ConsumerTarget object,
- final ConsumerTarget.State oldState,
- final ConsumerTarget.State newState)
- {
- if (newState == ConsumerTarget.State.ACTIVE)
- {
- deliverMessages();
- }
- }
- }
-
- private void deliverMessages()
- {
- _target.getSendLock();
- try
- {
- while (!_queue.isEmpty())
- {
-
- final PropertiesMessageInstance propertiesMessageInstance = _queue.get(0);
- if (!_target.isSuspended() && _target.allocateCredit(propertiesMessageInstance.getMessage()))
- {
- _queue.remove(0);
- _target.send(this, propertiesMessageInstance, false);
- }
- else
- {
- break;
- }
- }
- }
- finally
- {
- _target.releaseSendLock();
- }
+ _queue.add(new PropertiesMessageInstance(this, response));
+ _target.notifyWork();
}
-
}
class PropertiesMessageInstance implements MessageInstance
@@ -525,12 +463,6 @@ public abstract class AbstractSystemMess
}
@Override
- public boolean resend()
- {
- return false;
- }
-
- @Override
public void delete()
{
_isDeleted = true;
Copied: qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java (from r1767916, qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java)
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java?p2=qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java&p1=qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java&r1=1767916&r2=1767917&rev=1767917&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java Thu Nov 3 16:03:13 2016
@@ -45,12 +45,13 @@ import org.apache.qpid.server.model.Queu
import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.ConsumerListener;
+import org.apache.qpid.server.queue.AbstractQueue;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.transport.network.Ticker;
-public class MockConsumer implements ConsumerTarget
+public class TestConsumerTarget implements ConsumerTarget
{
private final List<String> _messageIds;
@@ -59,7 +60,7 @@ public class MockConsumer implements Con
private Queue<?> queue = null;
private StateChangeListener<ConsumerTarget, State> _listener = null;
private State _state = State.ACTIVE;
- private ArrayList<MessageInstance> messages = new ArrayList<MessageInstance>();
+ private ArrayList<MessageInstance> _messages = new ArrayList<MessageInstance>();
private final Lock _stateChangeLock = new ReentrantLock();
private boolean _isActive = true;
@@ -67,12 +68,12 @@ public class MockConsumer implements Con
private boolean _messageSent;
private MockSessionModel _sessionModel = new MockSessionModel();
- public MockConsumer()
+ public TestConsumerTarget()
{
_messageIds = null;
}
- public MockConsumer(List<String> messageIds)
+ public TestConsumerTarget(List<String> messageIds)
{
_messageIds = messageIds;
}
@@ -143,11 +144,11 @@ public class MockConsumer implements Con
{
_messageSent = true;
long size = entry.getMessage().getSize();
- if (messages.contains(entry))
+ if (_messages.contains(entry))
{
entry.setRedelivered();
}
- messages.add(entry);
+ _messages.add(entry);
return size;
}
@@ -237,16 +238,14 @@ public class MockConsumer implements Con
@Override
public boolean processPending()
{
- _consumer.pullMessage();
- if(_messageSent)
- {
- _messageSent = false;
- return true;
- }
- else
+ AbstractQueue.MessageContainer messageContainter = _consumer.pullMessage();
+ if (messageContainter == null)
{
return false;
}
+
+ send(_consumer, messageContainter._messageInstance, false);
+ return true;
}
@Override
@@ -257,7 +256,7 @@ public class MockConsumer implements Con
public ArrayList<MessageInstance> getMessages()
{
- return messages;
+ return _messages;
}
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java?rev=1767917&r1=1767916&r2=1767917&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java Thu Nov 3 16:03:13 2016
@@ -48,7 +48,7 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.consumer.ConsumerImpl;
-import org.apache.qpid.server.consumer.MockConsumer;
+import org.apache.qpid.server.consumer.TestConsumerTarget;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.InstanceProperties;
@@ -77,7 +77,7 @@ abstract class AbstractQueueTestBase ext
private String _owner = "owner";
private String _routingKey = "routing key";
private DirectExchange _exchange;
- private MockConsumer _consumerTarget = new MockConsumer();
+ private TestConsumerTarget _consumerTarget = new TestConsumerTarget();
private QueueConsumer<?> _consumer;
private Map<String,Object> _arguments = Collections.emptyMap();
@@ -369,7 +369,7 @@ abstract class AbstractQueueTestBase ext
{
ServerMessage messageA = createMessage(new Long(24));
final CountDownLatch sendIndicator = new CountDownLatch(1);
- _consumerTarget = new MockConsumer()
+ _consumerTarget = new TestConsumerTarget()
{
@Override
@@ -512,8 +512,8 @@ abstract class AbstractQueueTestBase ext
ServerMessage messageA = createMessage(new Long(24));
ServerMessage messageB = createMessage(new Long(25));
- MockConsumer target1 = new MockConsumer();
- MockConsumer target2 = new MockConsumer();
+ TestConsumerTarget target1 = new TestConsumerTarget();
+ TestConsumerTarget target2 = new TestConsumerTarget();
QueueConsumer consumer1 = (QueueConsumer) _queue.addConsumer(target1, null, messageA.getClass(), "test",
@@ -578,7 +578,7 @@ abstract class AbstractQueueTestBase ext
messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage());
// Check we cannot add a second subscriber to the queue
- MockConsumer subB = new MockConsumer();
+ TestConsumerTarget subB = new TestConsumerTarget();
Exception ex = null;
try
{
@@ -615,32 +615,6 @@ abstract class AbstractQueueTestBase ext
assertNotNull(ex);
}
-
- public void testResend() throws Exception
- {
- Long id = new Long(26);
- ServerMessage message = createMessage(id);
-
- _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, message.getClass(), "test",
- EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES),
- 0);
-
- _queue.enqueue(message, new Action<MessageInstance>()
- {
- @Override
- public void performAction(final MessageInstance object)
- {
- QueueEntryImpl entry = (QueueEntryImpl) object;
- entry.setRedelivered();
- _consumer.resend(entry);
-
- }
- }, null);
-
-
-
- }
-
public void testGetFirstMessageId() throws Exception
{
// Create message
@@ -1105,7 +1079,7 @@ abstract class AbstractQueueTestBase ext
_queue = queue;
}
- public MockConsumer getConsumer()
+ public TestConsumerTarget getConsumer()
{
return _consumerTarget;
}
@@ -1222,7 +1196,7 @@ abstract class AbstractQueueTestBase ext
return _exchange;
}
- public MockConsumer getConsumerTarget()
+ public TestConsumerTarget getConsumerTarget()
{
return _consumerTarget;
}
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java?rev=1767917&r1=1767916&r2=1767917&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java Thu Nov 3 16:03:13 2016
@@ -162,12 +162,6 @@ public class MockMessageInstance impleme
{
}
- @Override
- public boolean resend()
- {
- return false;
- }
-
public void setRedelivered()
{
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java?rev=1767917&r1=1767916&r2=1767917&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java Thu Nov 3 16:03:13 2016
@@ -30,7 +30,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
-import org.apache.qpid.server.consumer.MockConsumer;
+import org.apache.qpid.server.consumer.TestConsumerTarget;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.LifetimePolicy;
@@ -74,7 +74,7 @@ public class StandardQueueTest extends A
final StandardQueueImpl queue = new StandardQueueImpl(queueAttributes, getVirtualHost());
queue.open();
//verify adding an active consumer increases the count
- final MockConsumer consumer1 = new MockConsumer();
+ final TestConsumerTarget consumer1 = new TestConsumerTarget();
consumer1.setActive(true);
consumer1.setState(ConsumerTarget.State.ACTIVE);
assertEquals("Unexpected active consumer count", 0, queue.getConsumerCountWithCredit());
@@ -87,7 +87,7 @@ public class StandardQueueTest extends A
assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit());
//verify adding an inactive consumer doesn't increase the count
- final MockConsumer consumer2 = new MockConsumer();
+ final TestConsumerTarget consumer2 = new TestConsumerTarget();
consumer2.setActive(false);
consumer2.setState(ConsumerTarget.State.SUSPENDED);
assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit());
@@ -143,7 +143,7 @@ public class StandardQueueTest extends A
AbstractQueue queue = new DequeuedQueue(getVirtualHost());
queue.create();
// create a consumer
- MockConsumer consumer = new MockConsumer();
+ TestConsumerTarget consumer = new TestConsumerTarget();
// register consumer
queue.addConsumer(consumer,
@@ -192,7 +192,7 @@ public class StandardQueueTest extends A
final CountDownLatch latch = new CountDownLatch(messageNumber -1);
// create a consumer
- MockConsumer consumer = new MockConsumer()
+ TestConsumerTarget consumer = new TestConsumerTarget()
{
@Override
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java?rev=1767917&r1=1767916&r2=1767917&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java Thu Nov 3 16:03:13 2016
@@ -19,6 +19,7 @@
package org.apache.qpid.server.security;
import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.mock;
@@ -42,11 +43,14 @@ import org.apache.qpid.server.consumer.C
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.TrustStore;
import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.queue.AbstractQueue;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestMemoryMessageStore;
+import org.apache.qpid.server.virtualhost.AbstractSystemMessageSource;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -78,11 +82,10 @@ public class TrustStoreMessageSourceTest
final ConsumerTarget target = mock(ConsumerTarget.class);
when(target.allocateCredit(any(ServerMessage.class))).thenReturn(true);
- _trustStoreMessageSource.addConsumer(target, null, ServerMessage.class, getTestName(), options, 0);
-
- ArgumentCaptor<MessageInstance> argumentCaptor = ArgumentCaptor.forClass(MessageInstance.class);
- verify(target).send(any(ConsumerImpl.class), argumentCaptor.capture(), anyBoolean());
- final ServerMessage message = argumentCaptor.getValue().getMessage();
+ ConsumerImpl consumer = _trustStoreMessageSource.addConsumer(target, null, ServerMessage.class, getTestName(), options, 0);
+ final AbstractQueue.MessageContainer messageContainer = consumer.pullMessage();
+ assertNotNull("Could not pull message of TrustStore", messageContainer);
+ final ServerMessage message = messageContainer._messageInstance.getMessage();
assertCertificates(getCertificatesFromMessage(message));
}
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java?rev=1767917&r1=1767916&r2=1767917&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java Thu Nov 3 16:03:13 2016
@@ -31,6 +31,7 @@ import org.apache.qpid.server.consumer.C
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.queue.AbstractQueue;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestMemoryMessageStore;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -56,7 +57,8 @@ public class VirtualHostPropertiesNodeTe
final ConsumerTarget target = mock(ConsumerTarget.class);
when(target.allocateCredit(any(ServerMessage.class))).thenReturn(true);
- _virtualHostPropertiesNode.addConsumer(target, null, ServerMessage.class, getTestName(), options, 0);
- verify(target).send(any(ConsumerImpl.class), any(MessageInstance.class), anyBoolean());
+ ConsumerImpl consumer = _virtualHostPropertiesNode.addConsumer(target, null, ServerMessage.class, getTestName(), options, 0);
+ final AbstractQueue.MessageContainer messageContainer = consumer.pullMessage();
+ assertNotNull("Could not pull message from VirtualHostPropertyNode", messageContainer);
}
}
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1767917&r1=1767916&r2=1767917&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Thu Nov 3 16:03:13 2016
@@ -98,6 +98,8 @@ import org.apache.qpid.server.protocol.C
import org.apache.qpid.server.protocol.ConsumerListener;
import org.apache.qpid.server.protocol.PublishAuthorisationCache;
import org.apache.qpid.server.queue.QueueArgumentsConverter;
+import org.apache.qpid.server.queue.QueueConsumer;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.SecurityToken;
import org.apache.qpid.server.store.MessageHandle;
import org.apache.qpid.server.store.MessageStore;
@@ -1066,6 +1068,29 @@ public class AMQChannel
return false;
}
+ private boolean resendInternal(MessageInstance messageInstance)
+ {
+ ConsumerImpl subscriber = messageInstance.getDeliveredConsumer();
+ if (subscriber != null && subscriber.getSessionModel() == this)
+ {
+ ConsumerTarget target = subscriber.getTarget();
+ target.getSendLock();
+ try
+ {
+ if (target.getState() != ConsumerTarget.State.CLOSED)
+ {
+ target.send(subscriber, messageInstance, false);
+ return true;
+ }
+ }
+ finally
+ {
+ target.releaseSendLock();
+ }
+ }
+ return false;
+ }
+
/**
* Called to resend all outstanding unacknowledged messages to this same channel.
*
@@ -1116,7 +1141,7 @@ public class AMQChannel
// all messages in the unacked map as redelivered.
message.setRedelivered();
- if (!message.resend())
+ if (!resendInternal(message))
{
msgToRequeue.put(deliveryTag, message);
}
@@ -1326,7 +1351,7 @@ public class AMQChannel
}
else
{
- entry.resend();
+ resendInternal(entry);
}
}
_resendList.clear();
@@ -2417,7 +2442,14 @@ public class AMQChannel
_logger.debug("RECV[" + _channelId + "] BasicRecover[" + " requeue: " + requeue + " sync: " + sync + " ]");
}
- resend();
+ if (requeue)
+ {
+ requeue();
+ }
+ else
+ {
+ resend();
+ }
if (sync)
{
@@ -3688,11 +3720,6 @@ public class AMQChannel
};
rollback(task);
-
- //Now resend all the unacknowledged messages back to the original subscribers.
- //(Must be done after the TxnRollback-ok response).
- // Why, are we not allowed to send messages back to client before the ok method?
- resend();
}
@Override
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java?rev=1767917&r1=1767916&r2=1767917&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java Thu Nov 3 16:03:13 2016
@@ -1233,12 +1233,6 @@ class ManagementNode implements MessageS
}
@Override
- public boolean resend()
- {
- return false;
- }
-
- @Override
public void delete()
{
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java?rev=1767917&r1=1767916&r2=1767917&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java Thu Nov 3 16:03:13 2016
@@ -36,11 +36,11 @@ import org.apache.qpid.server.message.Se
import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.queue.AbstractQueue;
import org.apache.qpid.server.security.SecurityToken;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.util.StateChangeListener;
class ManagementNodeConsumer implements ConsumerImpl, MessageDestination
{
@@ -49,7 +49,6 @@ class ManagementNodeConsumer implements
private final List<ManagementResponse> _queue = Collections.synchronizedList(new ArrayList<ManagementResponse>());
private final ConsumerTarget _target;
private final String _name;
- private final StateChangeListener<ConsumerTarget, ConsumerTarget.State> _targetChangeListener = new TargetChangeListener();
public ManagementNodeConsumer(final String consumerName, final ManagementNode managementNode, ConsumerTarget target)
@@ -57,7 +56,6 @@ class ManagementNodeConsumer implements
_name = consumerName;
_managementNode = managementNode;
_target = target;
- target.addStateListener(_targetChangeListener);
}
@Override
@@ -73,9 +71,27 @@ class ManagementNodeConsumer implements
}
@Override
- public void pullMessage()
+ public AbstractQueue.MessageContainer pullMessage()
{
+ _target.getSendLock();
+ try
+ {
+ if (!_queue.isEmpty())
+ {
+ final ManagementResponse managementResponse = _queue.get(0);
+ if (!_target.isSuspended() && _target.allocateCredit(managementResponse.getMessage()))
+ {
+ _queue.remove(0);
+ return new AbstractQueue.MessageContainer(managementResponse, null);
+ }
+ }
+ }
+ finally
+ {
+ _target.releaseSendLock();
+ }
+ return null;
}
@Override
@@ -217,62 +233,8 @@ class ManagementNodeConsumer implements
void send(final InternalMessage response)
{
- _target.getSendLock();
- try
- {
- final ManagementResponse responseEntry = new ManagementResponse(this, response);
- if(_queue.isEmpty() && _target.allocateCredit(response))
- {
- _target.send(this, responseEntry, false);
- }
- else
- {
- _queue.add(responseEntry);
- }
- }
- finally
- {
- _target.releaseSendLock();
- }
- }
-
- private class TargetChangeListener implements StateChangeListener<ConsumerTarget, ConsumerTarget.State>
- {
- @Override
- public void stateChanged(final ConsumerTarget object,
- final ConsumerTarget.State oldState,
- final ConsumerTarget.State newState)
- {
- if(newState == ConsumerTarget.State.ACTIVE)
- {
- deliverMessages();
- }
- }
- }
-
- private void deliverMessages()
- {
- _target.getSendLock();
- try
- {
- while(!_queue.isEmpty())
- {
-
- final ManagementResponse managementResponse = _queue.get(0);
- if(!_target.isSuspended() && _target.allocateCredit(managementResponse.getMessage()))
- {
- _queue.remove(0);
- _target.send(this, managementResponse, false);
- }
- else
- {
- break;
- }
- }
- }
- finally
- {
- _target.releaseSendLock();
- }
+ final ManagementResponse responseEntry = new ManagementResponse(this, response);
+ _queue.add(responseEntry);
+ _target.notifyWork();
}
}
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java?rev=1767917&r1=1767916&r2=1767917&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java Thu Nov 3 16:03:13 2016
@@ -219,12 +219,6 @@ class ManagementResponse implements Mess
}
@Override
- public boolean resend()
- {
- return false;
- }
-
- @Override
public void delete()
{
_isDeleted = true;
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java?rev=1767917&r1=1767916&r2=1767917&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java Thu Nov 3 16:03:13 2016
@@ -81,7 +81,6 @@ class WebSocketProvider implements Accep
private final Protocol _defaultSupportedProtocolReply;
private final MultiVersionProtocolEngineFactory _factory;
private Server _server;
- private final long _outboundMessageBufferLimit;
WebSocketProvider(final Transport transport,
final SSLContext sslContext,
@@ -95,8 +94,6 @@ class WebSocketProvider implements Accep
_supported = supported;
_defaultSupportedProtocolReply = defaultSupportedProtocolReply;
- _outboundMessageBufferLimit = (long) _port.getContextValue(Long.class,
- AmqpPort.PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE);
_factory = new MultiVersionProtocolEngineFactory(
_port.getParent(Broker.class),
_supported,
@@ -463,15 +460,6 @@ class WebSocketProvider implements Accep
}
@Override
- public void reserveOutboundMessageSpace(final long size)
- {
- if (_usedOutboundMessageSpace.addAndGet(size) > _outboundMessageBufferLimit)
- {
- // RG - TODO
- }
- }
-
- @Override
public String getTransportInfo()
{
return _connection.getProtocol();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org