You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/04/28 13:51:12 UTC
[5/5] qpid-broker-j git commit: QPID-7689: [Java Broker] Use queue statistics instead of estimate when querying queue size
QPID-7689: [Java Broker] Use queue statistics instead of estimate when querying queue size
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/e490aa08
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/e490aa08
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/e490aa08
Branch: refs/heads/master
Commit: e490aa08316221868b9868d31078084ca265f786
Parents: 36bab8f
Author: Lorenz Quack <lq...@apache.org>
Authored: Wed Apr 26 13:58:39 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Fri Apr 28 14:50:33 2017 +0100
----------------------------------------------------------------------
.../org/apache/qpid/server/model/Queue.java | 12 -----
.../apache/qpid/server/queue/AbstractQueue.java | 48 ++++++++++----------
.../server/virtualhost/AbstractVirtualHost.java | 15 ++++--
3 files changed, 34 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e490aa08/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index be8a11f..89a687e 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -83,16 +83,6 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>,
String MAXIMUM_QUEUE_DEPTH_MESSAGES = "maximumQueueDepthMessages";
String MAXIMUM_QUEUE_DEPTH_BYTES = "maximumQueueDepthBytes";
- String QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT = "queue.minimumEstimatedMemoryFootprint";
- @SuppressWarnings("unused")
- @ManagedContextDefault( name = QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT)
- long DEFAULT_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT = 102400L;
-
- String QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD = "queue.estimatedMessageMemoryOverhead";
- @SuppressWarnings("unused")
- @ManagedContextDefault( name = QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD)
- long DEFAULT_ESTIMATED_MESSAGE_MEMORY_OVERHEAD = 1024L;
-
String QUEUE_SCAVANGE_COUNT = "qpid.queue.scavenge_count";
@SuppressWarnings("unused")
@ManagedContextDefault( name = QUEUE_SCAVANGE_COUNT)
@@ -483,8 +473,6 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>,
void setTargetSize(long targetSize);
- long getPotentialMemoryFootprint();
-
boolean isHeld(QueueEntry queueEntry, final long evaluationTime);
void checkCapacity();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e490aa08/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 1fa8aec..92a994d 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -195,11 +195,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
private final Set<NotificationCheck> _notificationChecks =
Collections.synchronizedSet(EnumSet.noneOf(NotificationCheck.class));
-
- private volatile long _estimatedAverageMessageHeaderSize;
- private volatile long _estimatedMessageMemoryOverhead;
- private volatile long _estimatedMinimumMemoryFootprint;
-
private AtomicBoolean _stopped = new AtomicBoolean(false);
private final AtomicBoolean _deleted = new AtomicBoolean(false);
@@ -350,9 +345,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
_logSubject = new QueueLogSubject(this);
- _estimatedMinimumMemoryFootprint = getContextValue(Long.class, QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT);
- _estimatedMessageMemoryOverhead = getContextValue(Long.class, QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD);
-
_overflowPolicyHandler = createOverflowPolicyHandler(getOverflowPolicy());
_queueHouseKeepingTask = new AdvanceConsumersTask();
@@ -476,7 +468,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
_messageGroupManager = null;
}
- _estimatedAverageMessageHeaderSize = getContextValue(Long.class, QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD);
_mimeTypeToFileExtension = getContextValue(Map.class, MAP_OF_STRING_STRING, MIME_TYPE_TO_FILE_EXTENSION);
if(_defaultFilters != null)
@@ -1107,7 +1098,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
doEnqueue(message, action, enqueueRecord);
}
- long estimatedQueueSize = _queueStatistics.getQueueSize() + _queueStatistics.getQueueCount() * _estimatedAverageMessageHeaderSize;
+ long estimatedQueueSize = _queueStatistics.getQueueSizeIncludingHeader();
_flowToDiskChecker.flowToDiskAndReportIfNecessary(message.getStoredMessage(), estimatedQueueSize,
_targetQueueSize.get());
}
@@ -2038,7 +2029,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
{
QueueEntryIterator queueListIterator = getEntries().iterator();
- final long estimatedQueueSize = _queueStatistics.getQueueSize() + _queueStatistics.getQueueCount() * _estimatedAverageMessageHeaderSize;
+ final long estimatedQueueSize = _queueStatistics.getQueueSizeIncludingHeader();
_flowToDiskChecker.reportFlowToDiskStatusIfNecessary(estimatedQueueSize, _targetQueueSize.get());
final Set<NotificationCheck> perMessageChecks = new HashSet<>();
@@ -2079,16 +2070,31 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
// the time the check actually occurs. So verify we
// can actually get the message to perform the check.
ServerMessage msg = node.getMessage();
-
if (msg != null)
{
- cumulativeQueueSize += msg.getSize() + _estimatedAverageMessageHeaderSize;
- _flowToDiskChecker.flowToDiskIfNecessary(msg.getStoredMessage(), cumulativeQueueSize,
- _targetQueueSize.get());
+ MessageReference messageReference = null;
+ try
+ {
+ messageReference = msg.newReference();
+ cumulativeQueueSize += msg.getSizeIncludingHeader();
+ _flowToDiskChecker.flowToDiskIfNecessary(msg.getStoredMessage(), cumulativeQueueSize,
+ _targetQueueSize.get());
- for(NotificationCheck check : perMessageChecks)
+ for(NotificationCheck check : perMessageChecks)
+ {
+ checkForNotification(msg, listener, currentTime, thresholdTime, check);
+ }
+ }
+ catch(MessageDeletedException e)
{
- checkForNotification(msg, listener, currentTime, thresholdTime, check);
+ // Ignore
+ }
+ finally
+ {
+ if (messageReference != null)
+ {
+ messageReference.release();
+ }
}
}
}
@@ -2181,14 +2187,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
}
- @Override
- public long getPotentialMemoryFootprint()
- {
-
- return Math.max(_estimatedMinimumMemoryFootprint,
- getQueueDepthBytes() + _estimatedMessageMemoryOverhead * getQueueDepthMessages());
- }
-
public long getAlertRepeatGap()
{
return _alertRepeatGap;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e490aa08/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index e54e70c..394cd32 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -2399,12 +2399,19 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
Collection<Queue> queues = getChildren(Queue.class);
long totalSize = calculateTotalEnqueuedSize(queues);
_logger.debug("Allocating target size to queues, total target: {} ; total enqueued size {}", targetSize, totalSize);
- if(targetSize > 0l)
+ if (targetSize > 0l)
{
for (Queue<?> q : queues)
{
- long size = (long) ((((double) q.getPotentialMemoryFootprint() / (double) totalSize))
- * (double) targetSize);
+ long size;
+ if (totalSize == 0)
+ {
+ size = targetSize / queues.size();
+ }
+ else
+ {
+ size = (long) ((q.getQueueDepthBytesIncludingHeader() / (double) totalSize) * targetSize);
+ }
q.setTargetSize(size);
}
@@ -2525,7 +2532,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
long total = 0;
for(Queue<?> queue : queues)
{
- total += queue.getPotentialMemoryFootprint();
+ total += queue.getQueueDepthBytesIncludingHeader();
}
return total;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org