You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/12 14:45:53 UTC
svn commit: r1769383 - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/consumer/
broker-core/src/main/java/org/apache/qpid/server/model/
broker-core/src/main/java/org/apache/qpid/server/protocol/
broker-core/src/main/java/org/apac...
Author: rgodfrey
Date: Sat Nov 12 14:45:53 2016
New Revision: 1769383
URL: http://svn.apache.org/viewvc?rev=1769383&view=rev
Log:
QPID-7506 : Refactor queue statistics accounting
Added:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueueEntryList.java (with props)
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java (with props)
Removed:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListBase.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueImpl.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java?rev=1769383&r1=1769382&r2=1769383&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java Sat Nov 12 14:45:53 2016
@@ -62,7 +62,7 @@ public interface ConsumerTarget
long getUnacknowledgedMessages();
- AMQSessionModel getSessionModel();
+ AMQSessionModel<?> getSessionModel();
long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch);
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1769383&r1=1769382&r2=1769383&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java Sat Nov 12 14:45:53 2016
@@ -366,12 +366,6 @@ public interface Queue<X extends Queue<X
void requeue(QueueEntry entry);
- void dequeue(QueueEntry entry);
-
- void decrementUnackedMsgCount(QueueEntry queueEntry);
-
- void incrementUnackedMsgCount(QueueEntry entry);
-
boolean resend(QueueEntry entry, QueueConsumer<?> consumer);
List<? extends QueueEntry> getMessagesOnTheQueue();
@@ -413,4 +407,6 @@ public interface Queue<X extends Queue<X
long getPotentialMemoryFootprint();
boolean isHeld(QueueEntry queueEntry, final long evaluationTime);
+
+ void checkCapacity();
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java?rev=1769383&r1=1769382&r2=1769383&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java Sat Nov 12 14:45:53 2016
@@ -22,5 +22,5 @@ package org.apache.qpid.server.protocol;
public interface CapacityChecker
{
- void checkCapacity(AMQSessionModel channel);
+ void checkCapacity(AMQSessionModel<?> channel);
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1769383&r1=1769382&r2=1769383&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Sat Nov 12 14:45:53 2016
@@ -159,28 +159,11 @@ public abstract class AbstractQueue<X ex
- private final AtomicInteger _atomicQueueCount = new AtomicInteger(0);
-
- private final AtomicLong _atomicQueueSize = new AtomicLong(0L);
-
private final AtomicLong _targetQueueSize = new AtomicLong(INITIAL_TARGET_QUEUE_SIZE);
private final AtomicInteger _activeSubscriberCount = new AtomicInteger();
- private final AtomicLong _totalMessagesReceived = new AtomicLong();
-
- private final AtomicLong _dequeueCount = new AtomicLong();
- private final AtomicLong _dequeueSize = new AtomicLong();
- private final AtomicLong _enqueueCount = new AtomicLong();
- private final AtomicLong _enqueueSize = new AtomicLong();
- private final AtomicLong _persistentMessageEnqueueSize = new AtomicLong();
- private final AtomicLong _persistentMessageDequeueSize = new AtomicLong();
- private final AtomicLong _persistentMessageEnqueueCount = new AtomicLong();
- private final AtomicLong _persistentMessageDequeueCount = new AtomicLong();
- private final AtomicLong _unackedMsgCount = new AtomicLong(0);
- private final AtomicLong _unackedMsgBytes = new AtomicLong();
-
- private final AtomicInteger _bindingCountHigh = new AtomicInteger();
+ private final QueueStatistics _queueStatistics = new QueueStatistics();
/** max allowed size(KB) of a single message */
@ManagedAttributeField( afterSet = "updateAlertChecks" )
@@ -228,7 +211,6 @@ public abstract class AbstractQueue<X ex
private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE);
- private AtomicInteger _deliveredMessages = new AtomicInteger();
private AtomicBoolean _stopped = new AtomicBoolean(false);
private final Set<AMQSessionModel> _blockedChannels = new ConcurrentSkipListSet<AMQSessionModel>();
@@ -366,7 +348,7 @@ public abstract class AbstractQueue<X ex
Map<String,Object> attributes = getActualAttributes();
- final LinkedHashMap<String, Object> arguments = new LinkedHashMap<String, Object>(attributes);
+ final LinkedHashMap<String, Object> arguments = new LinkedHashMap<>(attributes);
arguments.put(Queue.EXCLUSIVE, _exclusive);
arguments.put(Queue.LIFETIME_POLICY, getLifetimePolicy());
@@ -1103,15 +1085,6 @@ public abstract class AbstractQueue<X ex
public void addBinding(final Binding<?> binding)
{
_bindings.add(binding);
- int bindingCount = _bindings.size();
- int bindingCountHigh;
- while(bindingCount > (bindingCountHigh = _bindingCountHigh.get()))
- {
- if(_bindingCountHigh.compareAndSet(bindingCountHigh, bindingCount))
- {
- break;
- }
- }
childAdded(binding);
}
@@ -1140,10 +1113,6 @@ public abstract class AbstractQueue<X ex
public final void enqueue(ServerMessage message, Action<? super MessageInstance> action, MessageEnqueueRecord enqueueRecord)
{
- incrementQueueCount();
- incrementQueueSize(message);
-
- _totalMessagesReceived.incrementAndGet();
if(_recovering.get() != RECOVERED)
{
@@ -1176,17 +1145,13 @@ public abstract class AbstractQueue<X ex
doEnqueue(message, action, enqueueRecord);
}
- long estimatedQueueSize = _atomicQueueSize.get() + _atomicQueueCount.get() * _estimatedAverageMessageHeaderSize;
+ long estimatedQueueSize = _queueStatistics.getQueueSize() + _queueStatistics.getQueueCount() * _estimatedAverageMessageHeaderSize;
_flowToDiskChecker.flowToDiskAndReportIfNecessary(message.getStoredMessage(), estimatedQueueSize,
_targetQueueSize.get());
}
public final void recover(ServerMessage message, final MessageEnqueueRecord enqueueRecord)
{
- incrementQueueCount();
- incrementQueueSize(message);
-
- _totalMessagesReceived.incrementAndGet();
doEnqueue(message, null, enqueueRecord);
}
@@ -1275,7 +1240,7 @@ public abstract class AbstractQueue<X ex
{
arrivalTime = System.currentTimeMillis();
}
- if(expiration != 0l)
+ if(expiration != 0L)
{
long calculatedExpiration = arrivalTime+_minimumMessageTtl;
if(calculatedExpiration > expiration)
@@ -1285,14 +1250,14 @@ public abstract class AbstractQueue<X ex
}
}
}
- if(_maximumMessageTtl != 0l)
+ if(_maximumMessageTtl != 0L)
{
if(arrivalTime == 0)
{
arrivalTime = System.currentTimeMillis();
}
long calculatedExpiration = arrivalTime+_maximumMessageTtl;
- if(expiration == 0l || expiration > calculatedExpiration)
+ if(expiration == 0L || expiration > calculatedExpiration)
{
entry.setExpiration(calculatedExpiration);
}
@@ -1443,19 +1408,6 @@ public abstract class AbstractQueue<X ex
// Simple Queues don't :-)
}
- private void incrementQueueSize(final ServerMessage message)
- {
- long size = message.getSize();
- getAtomicQueueSize().addAndGet(size);
- _enqueueCount.incrementAndGet();
- _enqueueSize.addAndGet(size);
- if(message.isPersistent() && isDurable())
- {
- _persistentMessageEnqueueSize.addAndGet(size);
- _persistentMessageEnqueueCount.incrementAndGet();
- }
- }
-
@Override
public void setTargetSize(final long targetSize)
{
@@ -1467,17 +1419,12 @@ public abstract class AbstractQueue<X ex
public long getTotalDequeuedMessages()
{
- return _dequeueCount.get();
+ return _queueStatistics.getDequeueCount();
}
public long getTotalEnqueuedMessages()
{
- return _enqueueCount.get();
- }
-
- private void incrementQueueCount()
- {
- getAtomicQueueCount().incrementAndGet();
+ return _queueStatistics.getEnqueueCount();
}
private void deliverMessage(final QueueConsumer<?> sub,
@@ -1490,8 +1437,6 @@ public abstract class AbstractQueue<X ex
setLastSeenEntry(sub, entry);
}
- _deliveredMessages.incrementAndGet();
-
sub.send(entry, batch);
}
@@ -1548,39 +1493,6 @@ public abstract class AbstractQueue<X ex
}
- @Override
- public void dequeue(QueueEntry entry)
- {
- decrementQueueCount();
- decrementQueueSize(entry);
- if (entry.acquiredByConsumer())
- {
- _deliveredMessages.decrementAndGet();
- }
-
- checkCapacity();
-
- }
-
- private void decrementQueueSize(final QueueEntry entry)
- {
- final ServerMessage message = entry.getMessage();
- long size = message.getSize();
- getAtomicQueueSize().addAndGet(-size);
- _dequeueSize.addAndGet(size);
- if(message.isPersistent() && isDurable())
- {
- _persistentMessageDequeueSize.addAndGet(size);
- _persistentMessageDequeueCount.incrementAndGet();
- }
- }
-
- void decrementQueueCount()
- {
- getAtomicQueueCount().decrementAndGet();
- _dequeueCount.incrementAndGet();
- }
-
public boolean resend(final QueueEntry entry, final QueueConsumer<?> consumer)
{
/* TODO : This is wrong as the consumer may be suspended, we should instead change the state of the message
@@ -1630,38 +1542,20 @@ public abstract class AbstractQueue<X ex
@Override
public int getQueueDepthMessages()
{
- return getAtomicQueueCount().get();
+ return _queueStatistics.getQueueCount();
}
public long getQueueDepthBytes()
{
- return getAtomicQueueSize().get();
- }
-
- public int getUndeliveredMessageCount()
- {
- int count = getQueueDepthMessages() - _deliveredMessages.get();
- if (count < 0)
- {
- return 0;
- }
- else
- {
- return count;
- }
- }
-
- public long getReceivedMessageCount()
- {
- return _totalMessagesReceived.get();
+ return _queueStatistics.getQueueSize();
}
@Override
public long getOldestMessageArrivalTime()
{
- long oldestMessageArrivalTime = -1l;
+ long oldestMessageArrivalTime = -1L;
- while(oldestMessageArrivalTime == -1l)
+ while(oldestMessageArrivalTime == -1L)
{
QueueEntry entry = getEntries().getOldestEntry();
if (entry != null)
@@ -1756,16 +1650,6 @@ public abstract class AbstractQueue<X ex
return getName().compareTo(o.getName());
}
- public AtomicInteger getAtomicQueueCount()
- {
- return _atomicQueueCount;
- }
-
- public AtomicLong getAtomicQueueSize()
- {
- return _atomicQueueSize;
- }
-
private boolean hasExclusiveConsumer()
{
return _exclusiveSubscriber != null;
@@ -1784,6 +1668,11 @@ public abstract class AbstractQueue<X ex
/** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */
abstract QueueEntryList getEntries();
+ final QueueStatistics getQueueStatistics()
+ {
+ return _queueStatistics;
+ }
+
protected QueueConsumerList getConsumerList()
{
return _consumerList;
@@ -1794,33 +1683,14 @@ public abstract class AbstractQueue<X ex
return _virtualHost.getEventLogger();
}
- public static interface QueueEntryFilter
+ public interface QueueEntryFilter
{
- public boolean accept(QueueEntry entry);
+ boolean accept(QueueEntry entry);
- public boolean filterComplete();
+ boolean filterComplete();
}
-
- public List<QueueEntry> getMessagesOnTheQueue(final long fromMessageId, final long toMessageId)
- {
- return getMessagesOnTheQueue(new QueueEntryFilter()
- {
-
- public boolean accept(QueueEntry entry)
- {
- final long messageId = entry.getMessage().getMessageNumber();
- return messageId >= fromMessageId && messageId <= toMessageId;
- }
-
- public boolean filterComplete()
- {
- return false;
- }
- });
- }
-
public QueueEntry getMessageOnTheQueue(final long messageId)
{
List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
@@ -2108,27 +1978,28 @@ public abstract class AbstractQueue<X ex
_closing = false;
}
- public void checkCapacity(AMQSessionModel channel)
+ @Override
+ public void checkCapacity(AMQSessionModel<?> channel)
{
- if(_queueFlowControlSizeBytes != 0l)
+ if(_queueFlowControlSizeBytes != 0L)
{
- if(_atomicQueueSize.get() > _queueFlowControlSizeBytes)
+ if(_queueStatistics.getQueueSize() > _queueFlowControlSizeBytes)
{
_overfull.set(true);
//Overfull log message
- getEventLogger().message(_logSubject, QueueMessages.OVERFULL(_atomicQueueSize.get(),
+ getEventLogger().message(_logSubject, QueueMessages.OVERFULL(_queueStatistics.getQueueSize(),
_queueFlowControlSizeBytes));
_blockedChannels.add(channel);
channel.block(this);
- if(_atomicQueueSize.get() <= _queueFlowResumeSizeBytes)
+ if(_queueStatistics.getQueueSize() <= _queueFlowResumeSizeBytes)
{
//Underfull log message
getEventLogger().message(_logSubject,
- QueueMessages.UNDERFULL(_atomicQueueSize.get(), _queueFlowResumeSizeBytes));
+ QueueMessages.UNDERFULL(_queueStatistics.getQueueSize(), _queueFlowResumeSizeBytes));
channel.unblock(this);
_blockedChannels.remove(channel);
@@ -2141,22 +2012,27 @@ public abstract class AbstractQueue<X ex
}
}
- private void checkCapacity()
+ public void checkCapacity()
{
- if(_queueFlowControlSizeBytes != 0L)
+ if(getEntries() != null)
{
- if(_overfull.get() && _atomicQueueSize.get() <= _queueFlowResumeSizeBytes)
+ if (_queueFlowControlSizeBytes != 0L)
{
- if(_overfull.compareAndSet(true,false))
- {//Underfull log message
- getEventLogger().message(_logSubject,
- QueueMessages.UNDERFULL(_atomicQueueSize.get(), _queueFlowResumeSizeBytes));
- }
-
- for(final AMQSessionModel blockedChannel : _blockedChannels)
+ if (_overfull.get() && _queueStatistics.getQueueSize() <= _queueFlowResumeSizeBytes)
{
- blockedChannel.unblock(this);
- _blockedChannels.remove(blockedChannel);
+ if (_overfull.compareAndSet(true, false))
+ {
+ //Underfull log message
+ getEventLogger().message(_logSubject,
+ QueueMessages.UNDERFULL(_queueStatistics.getQueueSize(),
+ _queueFlowResumeSizeBytes));
+ }
+
+ for (final AMQSessionModel<?> blockedChannel : _blockedChannels)
+ {
+ blockedChannel.unblock(this);
+ _blockedChannels.remove(blockedChannel);
+ }
}
}
}
@@ -2588,7 +2464,7 @@ public abstract class AbstractQueue<X ex
{
QueueEntryIterator queueListIterator = getEntries().iterator();
- final long estimatedQueueSize = _atomicQueueSize.get() + _atomicQueueCount.get() * _estimatedAverageMessageHeaderSize;
+ final long estimatedQueueSize = _queueStatistics.getQueueSize() + _queueStatistics.getQueueCount() * _estimatedAverageMessageHeaderSize;
_flowToDiskChecker.reportFlowToDiskStatusIfNecessary(estimatedQueueSize, _targetQueueSize.get());
final Set<NotificationCheck> perMessageChecks = new HashSet<>();
@@ -2904,8 +2780,7 @@ public abstract class AbstractQueue<X ex
try
{
- long foo = ByteStreams.copy(inputStream, outputStream);
- foo = foo +1 -1;
+ ByteStreams.copy(inputStream, outputStream);
}
finally
{
@@ -2952,32 +2827,32 @@ public abstract class AbstractQueue<X ex
public long getTotalEnqueuedBytes()
{
- return _enqueueSize.get();
+ return _queueStatistics.getEnqueueSize();
}
public long getTotalDequeuedBytes()
{
- return _dequeueSize.get();
+ return _queueStatistics.getDequeueSize();
}
public long getPersistentEnqueuedBytes()
{
- return _persistentMessageEnqueueSize.get();
+ return _queueStatistics.getPersistentEnqueueSize();
}
public long getPersistentDequeuedBytes()
{
- return _persistentMessageDequeueSize.get();
+ return _queueStatistics.getPersistentDequeueSize();
}
public long getPersistentEnqueuedMessages()
{
- return _persistentMessageEnqueueCount.get();
+ return _queueStatistics.getPersistentEnqueueCount();
}
public long getPersistentDequeuedMessages()
{
- return _persistentMessageDequeueCount.get();
+ return _queueStatistics.getPersistentDequeueCount();
}
@Override
@@ -3025,26 +2900,12 @@ public abstract class AbstractQueue<X ex
public long getUnacknowledgedMessages()
{
- return _unackedMsgCount.get();
+ return _queueStatistics.getUnackedCount();
}
public long getUnacknowledgedBytes()
{
- return _unackedMsgBytes.get();
- }
-
- @Override
- public void decrementUnackedMsgCount(QueueEntry queueEntry)
- {
- _unackedMsgCount.decrementAndGet();
- _unackedMsgBytes.addAndGet(-queueEntry.getSize());
- }
-
- @Override
- public void incrementUnackedMsgCount(QueueEntry entry)
- {
- _unackedMsgCount.incrementAndGet();
- _unackedMsgBytes.addAndGet(entry.getSize());
+ return _queueStatistics.getUnackedSize();
}
@Override
Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueueEntryList.java?rev=1769383&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueueEntryList.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueueEntryList.java Sat Nov 12 14:45:53 2016
@@ -0,0 +1,102 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.store.MessageDurability;
+
+abstract class AbstractQueueEntryList implements QueueEntryList
+{
+
+ private final boolean _forcePersistent;
+ private final boolean _respectPersistent;
+ private final Queue<?> _queue;
+ private final QueueStatistics _queueStatistics;
+
+ protected AbstractQueueEntryList(final Queue<?> queue, final QueueStatistics queueStatistics)
+ {
+
+ final MessageDurability messageDurability = queue.getMessageDurability();
+ _queue = queue;
+ _queueStatistics = queueStatistics;
+ _forcePersistent = messageDurability == MessageDurability.ALWAYS;
+ _respectPersistent = messageDurability == MessageDurability.DEFAULT;
+ }
+
+
+ void updateStatsOnEnqueue(QueueEntry entry)
+ {
+ final long size = entry.getSize();
+ final QueueStatistics queueStatistics = _queueStatistics;
+ queueStatistics.addToAvailable(size);
+ queueStatistics.addToQueue(size);
+ queueStatistics.addToEnqueued(size);
+ if(_forcePersistent || (_respectPersistent && entry.getMessage().isPersistent()))
+ {
+ queueStatistics.addToPersistentEnqueued(size);
+ }
+ }
+
+ public void updateStatsOnStateChange(QueueEntry entry, QueueEntry.EntryState fromState, QueueEntry.EntryState toState)
+ {
+ final QueueStatistics queueStatistics = _queueStatistics;
+ final long size = entry.getSize();
+
+ final boolean isConsumerAcquired = toState instanceof MessageInstance.ConsumerAcquiredState;
+ final boolean wasConsumerAcquired = fromState instanceof MessageInstance.ConsumerAcquiredState;
+
+ switch(fromState.getState())
+ {
+ case AVAILABLE:
+ queueStatistics.removeFromAvailable(size);
+ break;
+ case ACQUIRED:
+ if(wasConsumerAcquired && !isConsumerAcquired)
+ {
+ queueStatistics.removeFromUnacknowledged(size);
+ }
+ break;
+ }
+ switch(toState.getState())
+ {
+ case AVAILABLE:
+ queueStatistics.addToAvailable(size);
+ break;
+ case ACQUIRED:
+ if(isConsumerAcquired && !wasConsumerAcquired)
+ {
+ queueStatistics.addToUnacknowledged(size);
+ }
+ break;
+ case DELETED:
+ queueStatistics.removeFromQueue(size);
+ queueStatistics.addToDequeued(size);
+ if(_forcePersistent || (_respectPersistent && entry.getMessage().isPersistent()))
+ {
+ queueStatistics.addToPersistentDequeued(size);
+ }
+ _queue.checkCapacity();
+
+ }
+ }
+
+}
Propchange: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueueEntryList.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java?rev=1769383&r1=1769382&r2=1769383&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java Sat Nov 12 14:45:53 2016
@@ -44,7 +44,7 @@ public class LastValueQueueImpl extends
protected void onOpen()
{
super.onOpen();
- _entries = new LastValueQueueList(this);
+ _entries = new LastValueQueueList(this, getQueueStatistics());
}
@Override
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java?rev=1769383&r1=1769382&r2=1769383&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java Sat Nov 12 14:45:53 2016
@@ -57,9 +57,9 @@ public class LastValueQueueList extends
private final ConflationQueueEntry _deleteInProgress = new ConflationQueueEntry(this);
private final ConflationQueueEntry _newerEntryAlreadyBeenAndGone = new ConflationQueueEntry(this);
- public LastValueQueueList(LastValueQueueImpl queue)
+ public LastValueQueueList(LastValueQueue<?> queue, QueueStatistics queueStatistics)
{
- super(queue, HEAD_CREATOR);
+ super(queue, queueStatistics, HEAD_CREATOR);
_conflationKey = queue.getLvqKey();
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java?rev=1769383&r1=1769382&r2=1769383&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java Sat Nov 12 14:45:53 2016
@@ -28,7 +28,7 @@ import org.apache.qpid.server.message.Se
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.store.MessageEnqueueRecord;
-public abstract class OrderedQueueEntryList implements QueueEntryList
+public abstract class OrderedQueueEntryList extends AbstractQueueEntryList
{
private final OrderedQueueEntry _head;
@@ -51,8 +51,11 @@ public abstract class OrderedQueueEntryL
private final AtomicReference<QueueEntry> _unscavengedHWM = new AtomicReference<QueueEntry>();
- public OrderedQueueEntryList(Queue<?> queue, HeadCreator headCreator)
+ public OrderedQueueEntryList(Queue<?> queue,
+ final QueueStatistics queueStatistics,
+ HeadCreator headCreator)
{
+ super(queue, queueStatistics);
_queue = queue;
_head = headCreator.createHead(this);
_tail = _head;
@@ -81,7 +84,8 @@ public abstract class OrderedQueueEntryL
public QueueEntry add(ServerMessage message, final MessageEnqueueRecord enqueueRecord)
{
- OrderedQueueEntry node = createQueueEntry(message, enqueueRecord);
+ final OrderedQueueEntry node = createQueueEntry(message, enqueueRecord);
+ updateStatsOnEnqueue(node);
for (;;)
{
OrderedQueueEntry tail = _tail;
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java?rev=1769383&r1=1769382&r2=1769383&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java Sat Nov 12 14:45:53 2016
@@ -35,7 +35,7 @@ abstract public class PriorityQueueList
public PriorityQueueList(final PriorityQueueImpl queue,
final HeadCreator headCreator)
{
- super(queue, headCreator);
+ super(queue, queue.getQueueStatistics(), headCreator);
}
static class PriorityQueueMasterList extends PriorityQueueList
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1769383&r1=1769382&r2=1769383&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Sat Nov 12 14:45:53 2016
@@ -624,7 +624,6 @@ class QueueConsumerImpl
public void acquisitionRemoved(final QueueEntry node)
{
_target.acquisitionRemoved(node);
- _queue.decrementUnackedMsgCount(node);
}
@Override
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1769383&r1=1769382&r2=1769383&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Sat Nov 12 14:45:53 2016
@@ -296,7 +296,7 @@ public abstract class QueueEntryImpl imp
}
}
- if(acquired && _stateChangeListeners != null)
+ if(acquired)
{
notifyStateChange(AVAILABLE_STATE, state);
}
@@ -310,7 +310,6 @@ public abstract class QueueEntryImpl imp
if(acquired)
{
_deliveryCountUpdater.compareAndSet(this,-1,0);
- getQueue().incrementUnackedMsgCount(this);
}
return acquired;
}
@@ -404,7 +403,7 @@ public abstract class QueueEntryImpl imp
{
EntryState state = _state;
- if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, AVAILABLE_STATE))
+ if((state.getState() == State.ACQUIRED) && _stateUpdater.compareAndSet(this, state, AVAILABLE_STATE))
{
postRelease(state);
}
@@ -422,15 +421,11 @@ public abstract class QueueEntryImpl imp
private void postRelease(final EntryState previousState)
{
- if (previousState instanceof ConsumerAcquiredState)
- {
- getQueue().decrementUnackedMsgCount(this);
- }
if(!getQueue().isDeleted())
{
getQueue().requeue(this);
- if (_stateChangeListeners != null && previousState.getState() == State.ACQUIRED)
+ if (previousState.getState() == State.ACQUIRED)
{
notifyStateChange(previousState, AVAILABLE_STATE);
}
@@ -523,16 +518,7 @@ public abstract class QueueEntryImpl imp
if(state.getState() == State.ACQUIRED)
{
- if (state instanceof ConsumerAcquiredState)
- {
- getQueue().decrementUnackedMsgCount(this);
- }
-
- getQueue().dequeue(this);
- if(_stateChangeListeners != null)
- {
- notifyStateChange(state, DEQUEUED_STATE);
- }
+ notifyStateChange(state, DEQUEUED_STATE);
return true;
}
else
@@ -544,6 +530,7 @@ public abstract class QueueEntryImpl imp
private void notifyStateChange(final EntryState oldState, final EntryState newState)
{
+ _queueEntryList.updateStatsOnStateChange(this, oldState, newState);
StateChangeListenerEntry<? super QueueEntry, EntryState> entry = _listenersUpdater.get(this);
while(entry != null)
{
@@ -562,6 +549,7 @@ public abstract class QueueEntryImpl imp
if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE))
{
+ notifyStateChange(state, DELETED_STATE);
_queueEntryList.entryDeleted(this);
onDelete();
_message.release();
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java?rev=1769383&r1=1769382&r2=1769383&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java Sat Nov 12 14:45:53 2016
@@ -24,7 +24,7 @@ import org.apache.qpid.server.message.Se
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.store.MessageEnqueueRecord;
-public interface QueueEntryList
+interface QueueEntryList
{
Queue<?> getQueue();
@@ -44,4 +44,6 @@ public interface QueueEntryList
int getPriorities();
+ void updateStatsOnStateChange(QueueEntry entry, QueueEntry.EntryState fromState, QueueEntry.EntryState toState);
+
}
Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java?rev=1769383&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java Sat Nov 12 14:45:53 2016
@@ -0,0 +1,181 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+final class QueueStatistics // Package-private bundle of paired count/size counters, each held in its own atomic.
+{
+ private final AtomicInteger _queueCount = new AtomicInteger(); // raised by addToQueue, lowered by removeFromQueue
+ private final AtomicLong _queueSize = new AtomicLong(); // running sum of the sizes passed to addToQueue/removeFromQueue
+ // Unacknowledged tally: raised by addToUnacknowledged, lowered by removeFromUnacknowledged.
+ private final AtomicInteger _unackedCount = new AtomicInteger();
+ private final AtomicLong _unackedSize = new AtomicLong();
+ // Available tally: raised by addToAvailable, lowered by removeFromAvailable.
+ private final AtomicInteger _availableCount = new AtomicInteger();
+ private final AtomicLong _availableSize = new AtomicLong();
+ // Dequeue totals: only ever increased (addToDequeued); nothing in this class decrements them.
+ private final AtomicLong _dequeueCount = new AtomicLong();
+ private final AtomicLong _dequeueSize = new AtomicLong();
+ // Enqueue totals: only ever increased (addToEnqueued).
+ private final AtomicLong _enqueueCount = new AtomicLong();
+ private final AtomicLong _enqueueSize = new AtomicLong();
+ // Persistent enqueue totals: only ever increased (addToPersistentEnqueued); the caller decides what is persistent.
+ private final AtomicLong _persistentEnqueueCount = new AtomicLong();
+ private final AtomicLong _persistentEnqueueSize = new AtomicLong();
+ // Persistent dequeue totals: only ever increased (addToPersistentDequeued).
+ private final AtomicLong _persistentDequeueCount = new AtomicLong();
+ private final AtomicLong _persistentDequeueSize = new AtomicLong();
+ // Accessors: each is one atomic read; a count and its size are read separately, so a pair is not a consistent snapshot.
+ public final int getQueueCount()
+ {
+ return _queueCount.get();
+ }
+
+ public final long getQueueSize()
+ {
+ return _queueSize.get();
+ }
+
+ public final int getUnackedCount()
+ {
+ return _unackedCount.get();
+ }
+
+ public final long getUnackedSize()
+ {
+ return _unackedSize.get();
+ }
+
+ public final int getAvailableCount()
+ {
+ return _availableCount.get();
+ }
+
+ public final long getAvailableSize()
+ {
+ return _availableSize.get();
+ }
+ // The totals below are long-valued, unlike the int-valued queue/unacked/available counts above.
+ public final long getEnqueueCount()
+ {
+ return _enqueueCount.get();
+ }
+
+ public final long getEnqueueSize()
+ {
+ return _enqueueSize.get();
+ }
+
+ public final long getDequeueCount()
+ {
+ return _dequeueCount.get();
+ }
+
+ public final long getDequeueSize()
+ {
+ return _dequeueSize.get();
+ }
+
+ public final long getPersistentEnqueueCount()
+ {
+ return _persistentEnqueueCount.get();
+ }
+
+ public final long getPersistentEnqueueSize()
+ {
+ return _persistentEnqueueSize.get();
+ }
+
+ public final long getPersistentDequeueCount()
+ {
+ return _persistentDequeueCount.get();
+ }
+
+ public final long getPersistentDequeueSize()
+ {
+ return _persistentDequeueSize.get();
+ }
+ // Mutators (package-private). Each applies two independent atomic updates: the count by one, then the size by `size`.
+ // A concurrent reader may therefore see a count change before the matching size change.
+ // No bounds checks: unbalanced remove* calls can drive a tally negative. Unit of `size` is presumably bytes - TODO confirm.
+ void addToQueue(long size)
+ {
+ _queueCount.incrementAndGet();
+ _queueSize.addAndGet(size);
+ }
+ // Reverses one addToQueue call when given the same size.
+ void removeFromQueue(long size)
+ {
+ _queueCount.decrementAndGet();
+ _queueSize.addAndGet(-size);
+ }
+ // Adds one entry of the given size to the available tally.
+ void addToAvailable(long size)
+ {
+ _availableCount.incrementAndGet();
+ _availableSize.addAndGet(size);
+ }
+ // Reverses one addToAvailable call when given the same size.
+ void removeFromAvailable(long size)
+ {
+ _availableCount.decrementAndGet();
+ _availableSize.addAndGet(-size);
+ }
+ // Adds one entry of the given size to the unacknowledged tally.
+ void addToUnacknowledged(long size)
+ {
+ _unackedCount.incrementAndGet();
+ _unackedSize.addAndGet(size);
+ }
+ // Reverses one addToUnacknowledged call when given the same size.
+ void removeFromUnacknowledged(long size)
+ {
+ _unackedCount.decrementAndGet();
+ _unackedSize.addAndGet(-size);
+ }
+ // The remaining mutators feed totals that have no matching remove* method in this class.
+ void addToEnqueued(long size)
+ {
+ _enqueueCount.incrementAndGet();
+ _enqueueSize.addAndGet(size);
+ }
+
+ void addToDequeued(long size)
+ {
+ _dequeueCount.incrementAndGet();
+ _dequeueSize.addAndGet(size);
+ }
+
+ void addToPersistentEnqueued(long size)
+ {
+ _persistentEnqueueCount.incrementAndGet();
+ _persistentEnqueueSize.addAndGet(size);
+ }
+
+ void addToPersistentDequeued(long size)
+ {
+ _persistentDequeueCount.incrementAndGet();
+ _persistentDequeueSize.addAndGet(size);
+ }
+
+}
Propchange: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java?rev=1769383&r1=1769382&r2=1769383&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java Sat Nov 12 14:45:53 2016
@@ -31,7 +31,7 @@ import org.apache.qpid.server.store.Mess
* ISBN-13: 978-0262033848
* see http://en.wikipedia.org/wiki/Red-black_tree
*/
-public class SortedQueueEntryList implements QueueEntryList
+public class SortedQueueEntryList extends AbstractQueueEntryList
{
private final SortedQueueEntry _head;
private SortedQueueEntry _root;
@@ -40,8 +40,9 @@ public class SortedQueueEntryList implem
private final SortedQueueImpl _queue;
private final String _propertyName;
- public SortedQueueEntryList(final SortedQueueImpl queue)
+ public SortedQueueEntryList(final SortedQueueImpl queue, final QueueStatistics queueStatistics)
{
+ super(queue, queueStatistics);
_queue = queue;
_head = new SortedQueueEntry(this);
_propertyName = queue.getSortKey();
@@ -64,6 +65,8 @@ public class SortedQueueEntryList implem
}
final SortedQueueEntry entry = new SortedQueueEntry(this,message, ++_entryId, enqueueRecord);
+ updateStatsOnEnqueue(entry);
+
entry.setKey(key);
insert(entry);
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java?rev=1769383&r1=1769382&r2=1769383&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java Sat Nov 12 14:45:53 2016
@@ -50,7 +50,7 @@ public class SortedQueueImpl extends Out
protected void onOpen()
{
super.onOpen();
- _entries = new SortedQueueEntryList(this);
+ _entries = new SortedQueueEntryList(this, getQueueStatistics());
}
@Override
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java?rev=1769383&r1=1769382&r2=1769383&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java Sat Nov 12 14:45:53 2016
@@ -35,9 +35,9 @@ public class StandardQueueEntryList exte
}
};
- public StandardQueueEntryList(final StandardQueueImpl queue)
+ public StandardQueueEntryList(final StandardQueue<?> queue, QueueStatistics queueStatistics)
{
- super(queue, HEAD_CREATOR);
+ super(queue, queueStatistics, HEAD_CREATOR);
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueImpl.java?rev=1769383&r1=1769382&r2=1769383&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueImpl.java Sat Nov 12 14:45:53 2016
@@ -39,7 +39,7 @@ public class StandardQueueImpl extends A
protected void onOpen()
{
super.onOpen();
- _entries = new StandardQueueEntryList(this);
+ _entries = new StandardQueueEntryList(this, getQueueStatistics());
}
@Override
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java?rev=1769383&r1=1769382&r2=1769383&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java Sat Nov 12 14:45:53 2016
@@ -193,6 +193,7 @@ public class QueueMessageRecoveryTest ex
{
private final List<ServerMessage<?>> _messageList;
+ private final QueueEntryList _entries = mock(QueueEntryList.class);
protected TestQueue(final Map<String, Object> attributes,
final QueueManagingVirtualHost<?> virtualHost,
@@ -205,7 +206,7 @@ public class QueueMessageRecoveryTest ex
@Override
QueueEntryList getEntries()
{
- return null;
+ return _entries;
}
@Override
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java?rev=1769383&r1=1769382&r2=1769383&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java Sat Nov 12 14:45:53 2016
@@ -33,7 +33,7 @@ public class SelfValidatingSortedQueueEn
{
public SelfValidatingSortedQueueEntryList(SortedQueueImpl queue)
{
- super(queue);
+ super(queue, queue.getQueueStatistics());
}
@Override
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java?rev=1769383&r1=1769382&r2=1769383&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java Sat Nov 12 14:45:53 2016
@@ -148,7 +148,12 @@ public class StandardQueueEntryListTest
public ServerMessage getTestMessageToAdd()
{
ServerMessage msg = mock(ServerMessage.class);
+ MessageReference ref = mock(MessageReference.class);
+ when(ref.getMessage()).thenReturn(msg);
when(msg.getMessageNumber()).thenReturn(1l);
+ when(msg.newReference()).thenReturn(ref);
+ when(msg.newReference(any(TransactionLogResource.class))).thenReturn(ref);
+
return msg;
}
@@ -160,7 +165,7 @@ public class StandardQueueEntryListTest
public void testScavenge() throws Exception
{
- OrderedQueueEntryList sqel = new StandardQueueEntryList(mock(StandardQueueImpl.class));
+ OrderedQueueEntryList sqel = new StandardQueueEntryList(mock(StandardQueue.class), new QueueStatistics());
ConcurrentMap<Integer,QueueEntry> entriesMap = new ConcurrentHashMap<Integer,QueueEntry>();
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java?rev=1769383&r1=1769382&r2=1769383&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java Sat Nov 12 14:45:53 2016
@@ -259,7 +259,7 @@ public class StandardQueueTest extends A
private static class DequeuedQueue extends AbstractQueue
{
- private QueueEntryList _entries = new DequeuedQueueEntryList(this);
+ private QueueEntryList _entries = new DequeuedQueueEntryList(this, getQueueStatistics());
public DequeuedQueue(QueueManagingVirtualHost<?> virtualHost)
{
@@ -295,9 +295,9 @@ public class StandardQueueTest extends A
}
};
- public DequeuedQueueEntryList(final DequeuedQueue queue)
+ public DequeuedQueueEntryList(final DequeuedQueue queue, final QueueStatistics queueStatistics)
{
- super(queue, HEAD_CREATOR);
+ super(queue, queueStatistics, HEAD_CREATOR);
}
/**
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1769383&r1=1769382&r2=1769383&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Sat Nov 12 14:45:53 2016
@@ -359,7 +359,7 @@ public abstract class ConsumerTarget_0_8
return _targetAddress;
}
- public AMQSessionModel getSessionModel()
+ public AMQChannel getSessionModel()
{
return _channel;
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1769383&r1=1769382&r2=1769383&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Sat Nov 12 14:45:53 2016
@@ -27,6 +27,7 @@ import java.util.Collection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoderImpl;
@@ -504,7 +505,7 @@ class ConsumerTarget_1_0 extends Abstrac
}
@Override
- public AMQSessionModel getSessionModel()
+ public Session_1_0 getSessionModel()
{
return getSession();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org