You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/15 10:51:41 UTC
svn commit: r1769773 - in /qpid/java/branches/remove-queue-runner: ./
broker-core/src/main/java/org/apache/qpid/server/consumer/
broker-core/src/main/java/org/apache/qpid/server/model/
broker-core/src/main/java/org/apache/qpid/server/protocol/ broker-c...
Author: rgodfrey
Date: Tue Nov 15 10:51:40 2016
New Revision: 1769773
URL: http://svn.apache.org/viewvc?rev=1769773&view=rev
Log:
Merged from trunk up to r1769383
Added:
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueueEntryList.java
- copied unchanged from r1769383, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueueEntryList.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java
- copied unchanged from r1769383, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java
Removed:
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListBase.java
Modified:
qpid/java/branches/remove-queue-runner/ (props changed)
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueImpl.java
qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java
qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java
qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
Propchange: qpid/java/branches/remove-queue-runner/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Nov 15 10:51:40 2016
@@ -9,5 +9,5 @@
/qpid/branches/java-broker-vhost-refactor/java:1493674-1494547
/qpid/branches/java-network-refactor/qpid/java:805429-821809
/qpid/branches/qpid-2935/qpid/java:1061302-1072333
-/qpid/java/trunk:1767741-1769382
+/qpid/java/trunk:1767741-1769383
/qpid/trunk/qpid:796646-796653
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java?rev=1769773&r1=1769772&r2=1769773&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java Tue Nov 15 10:51:40 2016
@@ -57,7 +57,7 @@ public interface ConsumerTarget
long getUnacknowledgedMessages();
- AMQSessionModel getSessionModel();
+ AMQSessionModel<?> getSessionModel();
long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch);
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1769773&r1=1769772&r2=1769773&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java Tue Nov 15 10:51:40 2016
@@ -367,12 +367,6 @@ public interface Queue<X extends Queue<X
void requeue(QueueEntry entry);
- void dequeue(QueueEntry entry);
-
- void decrementUnackedMsgCount(QueueEntry queueEntry);
-
- void incrementUnackedMsgCount(QueueEntry entry);
-
List<? extends QueueEntry> getMessagesOnTheQueue();
List<Long> getMessagesOnTheQueue(int num);
@@ -410,4 +404,6 @@ public interface Queue<X extends Queue<X
long getPotentialMemoryFootprint();
boolean isHeld(QueueEntry queueEntry, final long evaluationTime);
+
+ void checkCapacity();
}
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java?rev=1769773&r1=1769772&r2=1769773&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java Tue Nov 15 10:51:40 2016
@@ -22,5 +22,5 @@ package org.apache.qpid.server.protocol;
public interface CapacityChecker
{
- void checkCapacity(AMQSessionModel channel);
+ void checkCapacity(AMQSessionModel<?> channel);
}
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1769773&r1=1769772&r2=1769773&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Tue Nov 15 10:51:40 2016
@@ -154,26 +154,11 @@ public abstract class AbstractQueue<X ex
private volatile QueueConsumer<?> _exclusiveSubscriber;
- private final AtomicInteger _atomicQueueCount = new AtomicInteger(0);
-
- private final AtomicLong _atomicQueueSize = new AtomicLong(0L);
-
private final AtomicLong _targetQueueSize = new AtomicLong(INITIAL_TARGET_QUEUE_SIZE);
private final AtomicInteger _activeSubscriberCount = new AtomicInteger();
- private final AtomicLong _dequeueCount = new AtomicLong();
- private final AtomicLong _dequeueSize = new AtomicLong();
- private final AtomicLong _enqueueCount = new AtomicLong();
- private final AtomicLong _enqueueSize = new AtomicLong();
- private final AtomicLong _persistentMessageEnqueueSize = new AtomicLong();
- private final AtomicLong _persistentMessageDequeueSize = new AtomicLong();
- private final AtomicLong _persistentMessageEnqueueCount = new AtomicLong();
- private final AtomicLong _persistentMessageDequeueCount = new AtomicLong();
- private final AtomicLong _unackedMsgCount = new AtomicLong(0);
- private final AtomicLong _unackedMsgBytes = new AtomicLong();
-
- private final AtomicInteger _bindingCountHigh = new AtomicInteger();
+ private final QueueStatistics _queueStatistics = new QueueStatistics();
/** max allowed size(KB) of a single message */
@ManagedAttributeField( afterSet = "updateAlertChecks" )
@@ -390,7 +375,7 @@ public abstract class AbstractQueue<X ex
Map<String,Object> attributes = getActualAttributes();
- final LinkedHashMap<String, Object> arguments = new LinkedHashMap<String, Object>(attributes);
+ final LinkedHashMap<String, Object> arguments = new LinkedHashMap<>(attributes);
arguments.put(Queue.EXCLUSIVE, _exclusive);
arguments.put(Queue.LIFETIME_POLICY, getLifetimePolicy());
@@ -1077,15 +1062,6 @@ public abstract class AbstractQueue<X ex
public void addBinding(final Binding<?> binding)
{
_bindings.add(binding);
- int bindingCount = _bindings.size();
- int bindingCountHigh;
- while(bindingCount > (bindingCountHigh = _bindingCountHigh.get()))
- {
- if(_bindingCountHigh.compareAndSet(bindingCountHigh, bindingCount))
- {
- break;
- }
- }
childAdded(binding);
}
@@ -1114,8 +1090,6 @@ public abstract class AbstractQueue<X ex
public final void enqueue(ServerMessage message, Action<? super MessageInstance> action, MessageEnqueueRecord enqueueRecord)
{
- incrementQueueCount();
- incrementQueueSize(message);
if(_recovering.get() != RECOVERED)
{
@@ -1148,15 +1122,13 @@ public abstract class AbstractQueue<X ex
doEnqueue(message, action, enqueueRecord);
}
- long estimatedQueueSize = _atomicQueueSize.get() + _atomicQueueCount.get() * _estimatedAverageMessageHeaderSize;
+ long estimatedQueueSize = _queueStatistics.getQueueSize() + _queueStatistics.getQueueCount() * _estimatedAverageMessageHeaderSize;
_flowToDiskChecker.flowToDiskAndReportIfNecessary(message.getStoredMessage(), estimatedQueueSize,
_targetQueueSize.get());
}
public final void recover(ServerMessage message, final MessageEnqueueRecord enqueueRecord)
{
- incrementQueueCount();
- incrementQueueSize(message);
doEnqueue(message, null, enqueueRecord);
}
@@ -1229,7 +1201,7 @@ public abstract class AbstractQueue<X ex
{
arrivalTime = System.currentTimeMillis();
}
- if(expiration != 0l)
+ if(expiration != 0L)
{
long calculatedExpiration = arrivalTime+_minimumMessageTtl;
if(calculatedExpiration > expiration)
@@ -1239,14 +1211,14 @@ public abstract class AbstractQueue<X ex
}
}
}
- if(_maximumMessageTtl != 0l)
+ if(_maximumMessageTtl != 0L)
{
if(arrivalTime == 0)
{
arrivalTime = System.currentTimeMillis();
}
long calculatedExpiration = arrivalTime+_maximumMessageTtl;
- if(expiration == 0l || expiration > calculatedExpiration)
+ if(expiration == 0L || expiration > calculatedExpiration)
{
entry.setExpiration(calculatedExpiration);
}
@@ -1278,19 +1250,6 @@ public abstract class AbstractQueue<X ex
// Simple Queues don't :-)
}
- private void incrementQueueSize(final ServerMessage message)
- {
- long size = message.getSize();
- getAtomicQueueSize().addAndGet(size);
- _enqueueCount.incrementAndGet();
- _enqueueSize.addAndGet(size);
- if(message.isPersistent() && isDurable())
- {
- _persistentMessageEnqueueSize.addAndGet(size);
- _persistentMessageEnqueueCount.incrementAndGet();
- }
- }
-
@Override
public void setTargetSize(final long targetSize)
{
@@ -1302,17 +1261,12 @@ public abstract class AbstractQueue<X ex
public long getTotalDequeuedMessages()
{
- return _dequeueCount.get();
+ return _queueStatistics.getDequeueCount();
}
public long getTotalEnqueuedMessages()
{
- return _enqueueCount.get();
- }
-
- private void incrementQueueCount()
- {
- getAtomicQueueCount().incrementAndGet();
+ return _queueStatistics.getEnqueueCount();
}
private void setLastSeenEntry(final QueueConsumer<?> sub, final QueueEntry entry)
@@ -1377,35 +1331,6 @@ public abstract class AbstractQueue<X ex
}
}
-
- @Override
- public void dequeue(QueueEntry entry)
- {
- decrementQueueCount();
- decrementQueueSize(entry);
- checkCapacity();
- }
-
- private void decrementQueueSize(final QueueEntry entry)
- {
- final ServerMessage message = entry.getMessage();
- long size = message.getSize();
- getAtomicQueueSize().addAndGet(-size);
- _dequeueSize.addAndGet(size);
- if(message.isPersistent() && isDurable())
- {
- _persistentMessageDequeueSize.addAndGet(size);
- _persistentMessageDequeueCount.incrementAndGet();
- }
- }
-
- void decrementQueueCount()
- {
- getAtomicQueueCount().decrementAndGet();
- _dequeueCount.incrementAndGet();
- }
-
-
public int getConsumerCount()
{
return _queueConsumerManager.getAllSize();
@@ -1429,20 +1354,20 @@ public abstract class AbstractQueue<X ex
@Override
public int getQueueDepthMessages()
{
- return getAtomicQueueCount().get();
+ return _queueStatistics.getQueueCount();
}
public long getQueueDepthBytes()
{
- return getAtomicQueueSize().get();
+ return _queueStatistics.getQueueSize();
}
@Override
public long getOldestMessageArrivalTime()
{
- long oldestMessageArrivalTime = -1l;
+ long oldestMessageArrivalTime = -1L;
- while(oldestMessageArrivalTime == -1l)
+ while(oldestMessageArrivalTime == -1L)
{
QueueEntry entry = getEntries().getOldestEntry();
if (entry != null)
@@ -1512,16 +1437,6 @@ public abstract class AbstractQueue<X ex
return getName().compareTo(o.getName());
}
- public AtomicInteger getAtomicQueueCount()
- {
- return _atomicQueueCount;
- }
-
- public AtomicLong getAtomicQueueSize()
- {
- return _atomicQueueSize;
- }
-
private boolean hasExclusiveConsumer()
{
return _exclusiveSubscriber != null;
@@ -1535,6 +1450,11 @@ public abstract class AbstractQueue<X ex
/** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */
abstract QueueEntryList getEntries();
+ final QueueStatistics getQueueStatistics()
+ {
+ return _queueStatistics;
+ }
+
protected final QueueConsumerManagerImpl getQueueConsumerManager()
{
return _queueConsumerManager;
@@ -1545,33 +1465,14 @@ public abstract class AbstractQueue<X ex
return _virtualHost.getEventLogger();
}
- public static interface QueueEntryFilter
+ public interface QueueEntryFilter
{
- public boolean accept(QueueEntry entry);
+ boolean accept(QueueEntry entry);
- public boolean filterComplete();
+ boolean filterComplete();
}
-
- public List<QueueEntry> getMessagesOnTheQueue(final long fromMessageId, final long toMessageId)
- {
- return getMessagesOnTheQueue(new QueueEntryFilter()
- {
-
- public boolean accept(QueueEntry entry)
- {
- final long messageId = entry.getMessage().getMessageNumber();
- return messageId >= fromMessageId && messageId <= toMessageId;
- }
-
- public boolean filterComplete()
- {
- return false;
- }
- });
- }
-
public QueueEntry getMessageOnTheQueue(final long messageId)
{
List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
@@ -1862,27 +1763,28 @@ public abstract class AbstractQueue<X ex
return Futures.immediateFuture(null);
}
- public void checkCapacity(AMQSessionModel channel)
+ @Override
+ public void checkCapacity(AMQSessionModel<?> channel)
{
- if(_queueFlowControlSizeBytes != 0l)
+ if(_queueFlowControlSizeBytes != 0L)
{
- if(_atomicQueueSize.get() > _queueFlowControlSizeBytes)
+ if(_queueStatistics.getQueueSize() > _queueFlowControlSizeBytes)
{
_overfull.set(true);
//Overfull log message
- getEventLogger().message(_logSubject, QueueMessages.OVERFULL(_atomicQueueSize.get(),
+ getEventLogger().message(_logSubject, QueueMessages.OVERFULL(_queueStatistics.getQueueSize(),
_queueFlowControlSizeBytes));
_blockedChannels.add(channel);
channel.block(this);
- if(_atomicQueueSize.get() <= _queueFlowResumeSizeBytes)
+ if(_queueStatistics.getQueueSize() <= _queueFlowResumeSizeBytes)
{
//Underfull log message
getEventLogger().message(_logSubject,
- QueueMessages.UNDERFULL(_atomicQueueSize.get(), _queueFlowResumeSizeBytes));
+ QueueMessages.UNDERFULL(_queueStatistics.getQueueSize(), _queueFlowResumeSizeBytes));
channel.unblock(this);
_blockedChannels.remove(channel);
@@ -1895,22 +1797,27 @@ public abstract class AbstractQueue<X ex
}
}
- private void checkCapacity()
+ public void checkCapacity()
{
- if(_queueFlowControlSizeBytes != 0L)
+ if(getEntries() != null)
{
- if(_overfull.get() && _atomicQueueSize.get() <= _queueFlowResumeSizeBytes)
+ if (_queueFlowControlSizeBytes != 0L)
{
- if(_overfull.compareAndSet(true,false))
- {//Underfull log message
- getEventLogger().message(_logSubject,
- QueueMessages.UNDERFULL(_atomicQueueSize.get(), _queueFlowResumeSizeBytes));
- }
-
- for(final AMQSessionModel blockedChannel : _blockedChannels)
+ if (_overfull.get() && _queueStatistics.getQueueSize() <= _queueFlowResumeSizeBytes)
{
- blockedChannel.unblock(this);
- _blockedChannels.remove(blockedChannel);
+ if (_overfull.compareAndSet(true, false))
+ {
+ //Underfull log message
+ getEventLogger().message(_logSubject,
+ QueueMessages.UNDERFULL(_queueStatistics.getQueueSize(),
+ _queueFlowResumeSizeBytes));
+ }
+
+ for (final AMQSessionModel<?> blockedChannel : _blockedChannels)
+ {
+ blockedChannel.unblock(this);
+ _blockedChannels.remove(blockedChannel);
+ }
}
}
}
@@ -2174,7 +2081,7 @@ public abstract class AbstractQueue<X ex
{
QueueEntryIterator queueListIterator = getEntries().iterator();
- final long estimatedQueueSize = _atomicQueueSize.get() + _atomicQueueCount.get() * _estimatedAverageMessageHeaderSize;
+ final long estimatedQueueSize = _queueStatistics.getQueueSize() + _queueStatistics.getQueueCount() * _estimatedAverageMessageHeaderSize;
_flowToDiskChecker.reportFlowToDiskStatusIfNecessary(estimatedQueueSize, _targetQueueSize.get());
final Set<NotificationCheck> perMessageChecks = new HashSet<>();
@@ -2490,8 +2397,7 @@ public abstract class AbstractQueue<X ex
try
{
- long foo = ByteStreams.copy(inputStream, outputStream);
- foo = foo +1 -1;
+ ByteStreams.copy(inputStream, outputStream);
}
finally
{
@@ -2538,32 +2444,32 @@ public abstract class AbstractQueue<X ex
public long getTotalEnqueuedBytes()
{
- return _enqueueSize.get();
+ return _queueStatistics.getEnqueueSize();
}
public long getTotalDequeuedBytes()
{
- return _dequeueSize.get();
+ return _queueStatistics.getDequeueSize();
}
public long getPersistentEnqueuedBytes()
{
- return _persistentMessageEnqueueSize.get();
+ return _queueStatistics.getPersistentEnqueueSize();
}
public long getPersistentDequeuedBytes()
{
- return _persistentMessageDequeueSize.get();
+ return _queueStatistics.getPersistentDequeueSize();
}
public long getPersistentEnqueuedMessages()
{
- return _persistentMessageEnqueueCount.get();
+ return _queueStatistics.getPersistentEnqueueCount();
}
public long getPersistentDequeuedMessages()
{
- return _persistentMessageDequeueCount.get();
+ return _queueStatistics.getPersistentDequeueCount();
}
@Override
@@ -2611,26 +2517,12 @@ public abstract class AbstractQueue<X ex
public long getUnacknowledgedMessages()
{
- return _unackedMsgCount.get();
+ return _queueStatistics.getUnackedCount();
}
public long getUnacknowledgedBytes()
{
- return _unackedMsgBytes.get();
- }
-
- @Override
- public void decrementUnackedMsgCount(QueueEntry queueEntry)
- {
- _unackedMsgCount.decrementAndGet();
- _unackedMsgBytes.addAndGet(-queueEntry.getSize());
- }
-
- @Override
- public void incrementUnackedMsgCount(QueueEntry entry)
- {
- _unackedMsgCount.incrementAndGet();
- _unackedMsgBytes.addAndGet(entry.getSize());
+ return _queueStatistics.getUnackedSize();
}
@Override
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java?rev=1769773&r1=1769772&r2=1769773&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java Tue Nov 15 10:51:40 2016
@@ -44,7 +44,7 @@ public class LastValueQueueImpl extends
protected void onOpen()
{
super.onOpen();
- _entries = new LastValueQueueList(this);
+ _entries = new LastValueQueueList(this, getQueueStatistics());
}
@Override
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java?rev=1769773&r1=1769772&r2=1769773&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java Tue Nov 15 10:51:40 2016
@@ -57,9 +57,9 @@ public class LastValueQueueList extends
private final ConflationQueueEntry _deleteInProgress = new ConflationQueueEntry(this);
private final ConflationQueueEntry _newerEntryAlreadyBeenAndGone = new ConflationQueueEntry(this);
- public LastValueQueueList(LastValueQueueImpl queue)
+ public LastValueQueueList(LastValueQueue<?> queue, QueueStatistics queueStatistics)
{
- super(queue, HEAD_CREATOR);
+ super(queue, queueStatistics, HEAD_CREATOR);
_conflationKey = queue.getLvqKey();
}
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java?rev=1769773&r1=1769772&r2=1769773&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java Tue Nov 15 10:51:40 2016
@@ -30,7 +30,7 @@ import org.apache.qpid.server.message.Se
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.store.MessageEnqueueRecord;
-public abstract class OrderedQueueEntryList implements QueueEntryList
+public abstract class OrderedQueueEntryList extends AbstractQueueEntryList
{
private final OrderedQueueEntry _head;
@@ -53,8 +53,11 @@ public abstract class OrderedQueueEntryL
private final AtomicReference<QueueEntry> _unscavengedHWM = new AtomicReference<QueueEntry>();
- public OrderedQueueEntryList(Queue<?> queue, HeadCreator headCreator)
+ public OrderedQueueEntryList(Queue<?> queue,
+ final QueueStatistics queueStatistics,
+ HeadCreator headCreator)
{
+ super(queue, queueStatistics);
_queue = queue;
_scavengeCount = _queue.getContextValue(Integer.class, QUEUE_SCAVANGE_COUNT);
_head = headCreator.createHead(this);
@@ -84,7 +87,8 @@ public abstract class OrderedQueueEntryL
public QueueEntry add(ServerMessage message, final MessageEnqueueRecord enqueueRecord)
{
- OrderedQueueEntry node = createQueueEntry(message, enqueueRecord);
+ final OrderedQueueEntry node = createQueueEntry(message, enqueueRecord);
+ updateStatsOnEnqueue(node);
for (;;)
{
OrderedQueueEntry tail = _tail;
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java?rev=1769773&r1=1769772&r2=1769773&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java Tue Nov 15 10:51:40 2016
@@ -35,7 +35,7 @@ abstract public class PriorityQueueList
public PriorityQueueList(final PriorityQueueImpl queue,
final HeadCreator headCreator)
{
- super(queue, headCreator);
+ super(queue, queue.getQueueStatistics(), headCreator);
}
static class PriorityQueueMasterList extends PriorityQueueList
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1769773&r1=1769772&r2=1769773&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Tue Nov 15 10:51:40 2016
@@ -481,7 +481,6 @@ class QueueConsumerImpl
public void acquisitionRemoved(final QueueEntry node)
{
_target.acquisitionRemoved(node);
- _queue.decrementUnackedMsgCount(node);
}
@Override
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1769773&r1=1769772&r2=1769773&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Tue Nov 15 10:51:40 2016
@@ -296,7 +296,7 @@ public abstract class QueueEntryImpl imp
}
}
- if(acquired && _stateChangeListeners != null)
+ if(acquired)
{
notifyStateChange(AVAILABLE_STATE, state);
}
@@ -310,7 +310,6 @@ public abstract class QueueEntryImpl imp
if(acquired)
{
_deliveryCountUpdater.compareAndSet(this,-1,0);
- getQueue().incrementUnackedMsgCount(this);
}
return acquired;
}
@@ -404,7 +403,7 @@ public abstract class QueueEntryImpl imp
{
EntryState state = _state;
- if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, AVAILABLE_STATE))
+ if((state.getState() == State.ACQUIRED) && _stateUpdater.compareAndSet(this, state, AVAILABLE_STATE))
{
postRelease(state);
}
@@ -422,15 +421,11 @@ public abstract class QueueEntryImpl imp
private void postRelease(final EntryState previousState)
{
- if (previousState instanceof ConsumerAcquiredState)
- {
- getQueue().decrementUnackedMsgCount(this);
- }
if(!getQueue().isDeleted())
{
getQueue().requeue(this);
- if (_stateChangeListeners != null && previousState.getState() == State.ACQUIRED)
+ if (previousState.getState() == State.ACQUIRED)
{
notifyStateChange(previousState, AVAILABLE_STATE);
}
@@ -523,16 +518,7 @@ public abstract class QueueEntryImpl imp
if(state.getState() == State.ACQUIRED)
{
- if (state instanceof ConsumerAcquiredState)
- {
- getQueue().decrementUnackedMsgCount(this);
- }
-
- getQueue().dequeue(this);
- if(_stateChangeListeners != null)
- {
- notifyStateChange(state, DEQUEUED_STATE);
- }
+ notifyStateChange(state, DEQUEUED_STATE);
return true;
}
else
@@ -544,6 +530,7 @@ public abstract class QueueEntryImpl imp
private void notifyStateChange(final EntryState oldState, final EntryState newState)
{
+ _queueEntryList.updateStatsOnStateChange(this, oldState, newState);
StateChangeListenerEntry<? super QueueEntry, EntryState> entry = _listenersUpdater.get(this);
while(entry != null)
{
@@ -562,6 +549,7 @@ public abstract class QueueEntryImpl imp
if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE))
{
+ notifyStateChange(state, DELETED_STATE);
_queueEntryList.entryDeleted(this);
onDelete();
_message.release();
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java?rev=1769773&r1=1769772&r2=1769773&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java Tue Nov 15 10:51:40 2016
@@ -24,7 +24,7 @@ import org.apache.qpid.server.message.Se
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.store.MessageEnqueueRecord;
-public interface QueueEntryList
+interface QueueEntryList
{
Queue<?> getQueue();
@@ -44,4 +44,6 @@ public interface QueueEntryList
int getPriorities();
+ void updateStatsOnStateChange(QueueEntry entry, QueueEntry.EntryState fromState, QueueEntry.EntryState toState);
+
}
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java?rev=1769773&r1=1769772&r2=1769773&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java Tue Nov 15 10:51:40 2016
@@ -31,7 +31,7 @@ import org.apache.qpid.server.store.Mess
* ISBN-13: 978-0262033848
* see http://en.wikipedia.org/wiki/Red-black_tree
*/
-public class SortedQueueEntryList implements QueueEntryList
+public class SortedQueueEntryList extends AbstractQueueEntryList
{
private final SortedQueueEntry _head;
private SortedQueueEntry _root;
@@ -40,8 +40,9 @@ public class SortedQueueEntryList implem
private final SortedQueueImpl _queue;
private final String _propertyName;
- public SortedQueueEntryList(final SortedQueueImpl queue)
+ public SortedQueueEntryList(final SortedQueueImpl queue, final QueueStatistics queueStatistics)
{
+ super(queue, queueStatistics);
_queue = queue;
_head = new SortedQueueEntry(this);
_propertyName = queue.getSortKey();
@@ -64,6 +65,8 @@ public class SortedQueueEntryList implem
}
final SortedQueueEntry entry = new SortedQueueEntry(this,message, ++_entryId, enqueueRecord);
+ updateStatsOnEnqueue(entry);
+
entry.setKey(key);
insert(entry);
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java?rev=1769773&r1=1769772&r2=1769773&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java Tue Nov 15 10:51:40 2016
@@ -50,7 +50,7 @@ public class SortedQueueImpl extends Out
protected void onOpen()
{
super.onOpen();
- _entries = new SortedQueueEntryList(this);
+ _entries = new SortedQueueEntryList(this, getQueueStatistics());
}
@Override
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java?rev=1769773&r1=1769772&r2=1769773&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java Tue Nov 15 10:51:40 2016
@@ -35,9 +35,9 @@ public class StandardQueueEntryList exte
}
};
- public StandardQueueEntryList(final StandardQueueImpl queue)
+ public StandardQueueEntryList(final StandardQueue<?> queue, QueueStatistics queueStatistics)
{
- super(queue, HEAD_CREATOR);
+ super(queue, queueStatistics, HEAD_CREATOR);
}
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueImpl.java?rev=1769773&r1=1769772&r2=1769773&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueImpl.java Tue Nov 15 10:51:40 2016
@@ -39,7 +39,7 @@ public class StandardQueueImpl extends A
protected void onOpen()
{
super.onOpen();
- _entries = new StandardQueueEntryList(this);
+ _entries = new StandardQueueEntryList(this, getQueueStatistics());
}
@Override
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java?rev=1769773&r1=1769772&r2=1769773&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java Tue Nov 15 10:51:40 2016
@@ -193,6 +193,7 @@ public class QueueMessageRecoveryTest ex
{
private final List<ServerMessage<?>> _messageList;
+ private final QueueEntryList _entries = mock(QueueEntryList.class);
protected TestQueue(final Map<String, Object> attributes,
final QueueManagingVirtualHost<?> virtualHost,
@@ -205,7 +206,7 @@ public class QueueMessageRecoveryTest ex
@Override
QueueEntryList getEntries()
{
- return null;
+ return _entries;
}
@Override
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java?rev=1769773&r1=1769772&r2=1769773&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java Tue Nov 15 10:51:40 2016
@@ -33,7 +33,7 @@ public class SelfValidatingSortedQueueEn
{
public SelfValidatingSortedQueueEntryList(SortedQueueImpl queue)
{
- super(queue);
+ super(queue, queue.getQueueStatistics());
}
@Override
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java?rev=1769773&r1=1769772&r2=1769773&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java Tue Nov 15 10:51:40 2016
@@ -132,7 +132,12 @@ public class StandardQueueEntryListTest
public ServerMessage getTestMessageToAdd()
{
ServerMessage msg = mock(ServerMessage.class);
+ MessageReference ref = mock(MessageReference.class);
+ when(ref.getMessage()).thenReturn(msg);
when(msg.getMessageNumber()).thenReturn(1l);
+ when(msg.newReference()).thenReturn(ref);
+ when(msg.newReference(any(TransactionLogResource.class))).thenReturn(ref);
+
return msg;
}
@@ -146,7 +151,7 @@ public class StandardQueueEntryListTest
{
StandardQueueImpl mockQueue = mock(StandardQueueImpl.class);
when(mockQueue.getContextValue(Integer.class, QUEUE_SCAVANGE_COUNT)).thenReturn(9);
- OrderedQueueEntryList sqel = new StandardQueueEntryList(mockQueue);
+ OrderedQueueEntryList sqel = new StandardQueueEntryList(mockQueue, new QueueStatistics());
ConcurrentMap<Integer,QueueEntry> entriesMap = new ConcurrentHashMap<Integer,QueueEntry>();
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java?rev=1769773&r1=1769772&r2=1769773&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java Tue Nov 15 10:51:40 2016
@@ -189,7 +189,7 @@ public class StandardQueueTest extends A
private static class DequeuedQueue extends AbstractQueue
{
- private QueueEntryList _entries = new DequeuedQueueEntryList(this);
+ private QueueEntryList _entries = new DequeuedQueueEntryList(this, getQueueStatistics());
public DequeuedQueue(QueueManagingVirtualHost<?> virtualHost)
{
@@ -225,9 +225,9 @@ public class StandardQueueTest extends A
}
};
- public DequeuedQueueEntryList(final DequeuedQueue queue)
+ public DequeuedQueueEntryList(final DequeuedQueue queue, final QueueStatistics queueStatistics)
{
- super(queue, HEAD_CREATOR);
+ super(queue, queueStatistics, HEAD_CREATOR);
}
/**
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1769773&r1=1769772&r2=1769773&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Tue Nov 15 10:51:40 2016
@@ -342,7 +342,7 @@ public abstract class ConsumerTarget_0_8
return _targetAddress;
}
- public AMQSessionModel getSessionModel()
+ public AMQChannel getSessionModel()
{
return _channel;
}
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1769773&r1=1769772&r2=1769773&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Tue Nov 15 10:51:40 2016
@@ -32,7 +32,6 @@ import org.apache.qpid.server.consumer.C
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageConverter;
-import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder;
@@ -494,7 +493,7 @@ class ConsumerTarget_1_0 extends Abstrac
}
@Override
- public AMQSessionModel getSessionModel()
+ public Session_1_0 getSessionModel()
{
return getSession();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org