You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/04/28 13:51:12 UTC

[5/5] qpid-broker-j git commit: QPID-7689: [Java Broker] Use queue statistics instead of estimate when querying queue size

QPID-7689: [Java Broker] Use queue statistics instead of estimate when querying queue size


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/e490aa08
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/e490aa08
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/e490aa08

Branch: refs/heads/master
Commit: e490aa08316221868b9868d31078084ca265f786
Parents: 36bab8f
Author: Lorenz Quack <lq...@apache.org>
Authored: Wed Apr 26 13:58:39 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Fri Apr 28 14:50:33 2017 +0100

----------------------------------------------------------------------
 .../org/apache/qpid/server/model/Queue.java     | 12 -----
 .../apache/qpid/server/queue/AbstractQueue.java | 48 ++++++++++----------
 .../server/virtualhost/AbstractVirtualHost.java | 15 ++++--
 3 files changed, 34 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e490aa08/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index be8a11f..89a687e 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -83,16 +83,6 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>,
     String MAXIMUM_QUEUE_DEPTH_MESSAGES = "maximumQueueDepthMessages";
     String MAXIMUM_QUEUE_DEPTH_BYTES = "maximumQueueDepthBytes";
 
-    String QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT = "queue.minimumEstimatedMemoryFootprint";
-    @SuppressWarnings("unused")
-    @ManagedContextDefault( name = QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT)
-    long DEFAULT_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT = 102400L;
-
-    String QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD = "queue.estimatedMessageMemoryOverhead";
-    @SuppressWarnings("unused")
-    @ManagedContextDefault( name = QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD)
-    long DEFAULT_ESTIMATED_MESSAGE_MEMORY_OVERHEAD = 1024L;
-
     String QUEUE_SCAVANGE_COUNT = "qpid.queue.scavenge_count";
     @SuppressWarnings("unused")
     @ManagedContextDefault( name = QUEUE_SCAVANGE_COUNT)
@@ -483,8 +473,6 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>,
 
     void setTargetSize(long targetSize);
 
-    long getPotentialMemoryFootprint();
-
     boolean isHeld(QueueEntry queueEntry, final long evaluationTime);
 
     void checkCapacity();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e490aa08/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 1fa8aec..92a994d 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -195,11 +195,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
     private final Set<NotificationCheck> _notificationChecks =
             Collections.synchronizedSet(EnumSet.noneOf(NotificationCheck.class));
 
-
-    private volatile long _estimatedAverageMessageHeaderSize;
-    private volatile long _estimatedMessageMemoryOverhead;
-    private volatile long _estimatedMinimumMemoryFootprint;
-
     private AtomicBoolean _stopped = new AtomicBoolean(false);
 
     private final AtomicBoolean _deleted = new AtomicBoolean(false);
@@ -350,9 +345,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
 
         _logSubject = new QueueLogSubject(this);
 
-        _estimatedMinimumMemoryFootprint = getContextValue(Long.class, QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT);
-        _estimatedMessageMemoryOverhead = getContextValue(Long.class, QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD);
-
         _overflowPolicyHandler = createOverflowPolicyHandler(getOverflowPolicy());
 
         _queueHouseKeepingTask = new AdvanceConsumersTask();
@@ -476,7 +468,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
             _messageGroupManager = null;
         }
 
-        _estimatedAverageMessageHeaderSize = getContextValue(Long.class, QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD);
         _mimeTypeToFileExtension = getContextValue(Map.class, MAP_OF_STRING_STRING, MIME_TYPE_TO_FILE_EXTENSION);
 
         if(_defaultFilters != null)
@@ -1107,7 +1098,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
             doEnqueue(message, action, enqueueRecord);
         }
 
-        long estimatedQueueSize = _queueStatistics.getQueueSize() + _queueStatistics.getQueueCount() * _estimatedAverageMessageHeaderSize;
+        long estimatedQueueSize = _queueStatistics.getQueueSizeIncludingHeader();
         _flowToDiskChecker.flowToDiskAndReportIfNecessary(message.getStoredMessage(), estimatedQueueSize,
                                                           _targetQueueSize.get());
     }
@@ -2038,7 +2029,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
     {
         QueueEntryIterator queueListIterator = getEntries().iterator();
 
-        final long estimatedQueueSize = _queueStatistics.getQueueSize() + _queueStatistics.getQueueCount() * _estimatedAverageMessageHeaderSize;
+        final long estimatedQueueSize = _queueStatistics.getQueueSizeIncludingHeader();
         _flowToDiskChecker.reportFlowToDiskStatusIfNecessary(estimatedQueueSize, _targetQueueSize.get());
 
         final Set<NotificationCheck> perMessageChecks = new HashSet<>();
@@ -2079,16 +2070,31 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
                     // the time the check actually occurs. So verify we
                     // can actually get the message to perform the check.
                     ServerMessage msg = node.getMessage();
-
                     if (msg != null)
                     {
-                        cumulativeQueueSize += msg.getSize() + _estimatedAverageMessageHeaderSize;
-                        _flowToDiskChecker.flowToDiskIfNecessary(msg.getStoredMessage(), cumulativeQueueSize,
-                                                                 _targetQueueSize.get());
+                        MessageReference messageReference = null;
+                        try
+                        {
+                            messageReference = msg.newReference();
+                            cumulativeQueueSize += msg.getSizeIncludingHeader();
+                            _flowToDiskChecker.flowToDiskIfNecessary(msg.getStoredMessage(), cumulativeQueueSize,
+                                                                     _targetQueueSize.get());
 
-                        for(NotificationCheck check : perMessageChecks)
+                            for(NotificationCheck check : perMessageChecks)
+                            {
+                                checkForNotification(msg, listener, currentTime, thresholdTime, check);
+                            }
+                        }
+                        catch(MessageDeletedException e)
                         {
-                            checkForNotification(msg, listener, currentTime, thresholdTime, check);
+                            // Ignore
+                        }
+                        finally
+                        {
+                            if (messageReference != null)
+                            {
+                                messageReference.release();
+                            }
                         }
                     }
                 }
@@ -2181,14 +2187,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
         }
     }
 
-    @Override
-    public long getPotentialMemoryFootprint()
-    {
-
-        return Math.max(_estimatedMinimumMemoryFootprint,
-                        getQueueDepthBytes() + _estimatedMessageMemoryOverhead * getQueueDepthMessages());
-    }
-
     public long getAlertRepeatGap()
     {
         return _alertRepeatGap;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e490aa08/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index e54e70c..394cd32 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -2399,12 +2399,19 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
         Collection<Queue> queues = getChildren(Queue.class);
         long totalSize = calculateTotalEnqueuedSize(queues);
         _logger.debug("Allocating target size to queues, total target: {} ; total enqueued size {}", targetSize, totalSize);
-        if(targetSize > 0l)
+        if (targetSize > 0l)
         {
             for (Queue<?> q : queues)
             {
-                long size = (long) ((((double) q.getPotentialMemoryFootprint() / (double) totalSize))
-                                             * (double) targetSize);
+                long size;
+                if (totalSize == 0)
+                {
+                    size = targetSize / queues.size();
+                }
+                else
+                {
+                    size = (long) ((q.getQueueDepthBytesIncludingHeader() / (double) totalSize) * targetSize);
+                }
 
                 q.setTargetSize(size);
             }
@@ -2525,7 +2532,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
         long total = 0;
         for(Queue<?> queue : queues)
         {
-            total += queue.getPotentialMemoryFootprint();
+            total += queue.getQueueDepthBytesIncludingHeader();
         }
         return total;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org