You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/05/08 16:30:43 UTC

qpid-broker-j git commit: QPID-7770 : [Java Broker] Avoid rapid oscillation of flow to disk state

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 01601b173 -> ac08557c8


QPID-7770 : [Java Broker] Avoid rapid oscillation of flow to disk state


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/ac08557c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/ac08557c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/ac08557c

Branch: refs/heads/master
Commit: ac08557c89f29641f1b5a42332582379fb19eab6
Parents: 01601b1
Author: Alex Rudyy <or...@apache.org>
Authored: Mon May 8 15:42:35 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Mon May 8 15:42:35 2017 +0100

----------------------------------------------------------------------
 .../org/apache/qpid/server/model/Broker.java    |  10 ++
 .../apache/qpid/server/model/BrokerImpl.java    |   8 ++
 .../org/apache/qpid/server/model/Queue.java     |   9 +-
 .../apache/qpid/server/queue/AbstractQueue.java | 105 ++++++++-----------
 4 files changed, 72 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ac08557c/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java b/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
index 19e6de0..5545f28 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
@@ -85,6 +85,13 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
     @ManagedContextDefault(name = BROKER_FLOW_TO_DISK_THRESHOLD)
     long DEFAULT_FLOW_TO_DISK_THRESHOLD = (long)(0.75 * (double) BrokerImpl.getMaxDirectMemorySize());
 
+
+    String BROKER_FLOW_TO_DISK_CESSATION_FRACTION = "broker.flowToDiskCessationFraction";
+    @SuppressWarnings("unused")
+    @ManagedContextDefault( name = BROKER_FLOW_TO_DISK_CESSATION_FRACTION,
+            description = "Fraction of the flow to disk threshold where flow to disk will cease.")
+    double DEFAULT_BROKER_FLOW_TO_DISK_CESSATION_FRACTION = 0.8;
+
     String COMPACT_MEMORY_THRESHOLD = "qpid.compact_memory_threshold";
     @SuppressWarnings("unused")
     @ManagedContextDefault(name = COMPACT_MEMORY_THRESHOLD)
@@ -368,6 +375,9 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
     @DerivedAttribute(description = "Threshold direct memory size (in bytes) at which the Broker will start flowing incoming messages to disk.")
     long getFlowToDiskThreshold();
 
+    @DerivedAttribute(description = "Threshold direct memory size (in bytes) at which the Broker will cease flowing incoming messages to disk.")
+    long getFlowToDiskCessationThreshold();
+
     @DerivedAttribute(description = "Threshold direct memory size (in bytes) at which the Broker will start considering to compact sparse buffers. Set to -1 to disable.")
     long getCompactMemoryThreshold();
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ac08557c/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
index 0840b66..d7182bc 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
@@ -150,6 +150,7 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
     private long _compactMemoryThreshold;
     private long _compactMemoryInterval;
     private long _flowToDiskThreshold;
+    private long _flowToDiskCessationThreshold;
     private double _sparsityFraction;
     private long _lastDisposalCounter;
 
@@ -622,6 +623,7 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
         getEventLogger().message(BrokerMessages.MAX_MEMORY(heapMemory, directMemory));
 
         _flowToDiskThreshold = getContextValue(Long.class, BROKER_FLOW_TO_DISK_THRESHOLD);
+        _flowToDiskCessationThreshold = (long) (getContextValue(Double.class, BROKER_FLOW_TO_DISK_CESSATION_FRACTION) * _flowToDiskThreshold);
         _compactMemoryThreshold = getContextValue(Long.class, Broker.COMPACT_MEMORY_THRESHOLD);
         _compactMemoryInterval = getContextValue(Long.class, Broker.COMPACT_MEMORY_INTERVAL);
 
@@ -886,6 +888,12 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
     }
 
     @Override
+    public long getFlowToDiskCessationThreshold()
+    {
+        return _flowToDiskCessationThreshold;
+    }
+
+    @Override
     public long getNumberOfActivePooledBuffers()
     {
         return QpidByteBuffer.getNumberOfActivePooledBuffers();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ac08557c/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index f2cbc25..94d02ee 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -210,7 +210,11 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>,
                           + "none is explicitly set")
     String DEFAULT_MESSAGE_DURABILTY = "DEFAULT";
 
-
+    String QUEUE_FLOW_TO_DISK_QUEUE_CESSATION_FRACTION = "queue.flowToDiskQueueCessationFraction";
+    @SuppressWarnings("unused")
+    @ManagedContextDefault( name = QUEUE_FLOW_TO_DISK_QUEUE_CESSATION_FRACTION,
+            description = "The fraction of this queue's 'target size' where the queue will cease flowing messages to disk")
+    double DEFAULT_QUEUE_FLOW_TO_DISK_QUEUE_CESSATION_FRACTION = 0.80;
 
     @ManagedAttribute( defaultValue = "${queue.defaultMessageDurability}" )
     MessageDurability getMessageDurability();
@@ -276,6 +280,9 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>,
             mandatory = true)
     OverflowPolicy getOverflowPolicy();
 
+    @DerivedAttribute(description = "Returns 'true' when the broker is flowing transient messages to disk")
+    boolean isFlowingToDisk();
+
     @ManagedOperation(nonModifying = true, changesConfiguredObjectState = false)
     Collection<PublishingLink> getPublishingLinks();
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ac08557c/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 6deda67..9a5f9d7 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -262,11 +262,14 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
     private boolean _closing;
     private final ConcurrentMap<String, Callable<MessageFilter>> _defaultFiltersMap = new ConcurrentHashMap<>();
     private final List<HoldMethod> _holdMethods = new CopyOnWriteArrayList<>();
+    private final AtomicBoolean _flowingToDisk = new AtomicBoolean();
     private Map<String, String> _mimeTypeToFileExtension = Collections.emptyMap();
     private AdvanceConsumersTask _queueHouseKeepingTask;
     private volatile int _bindingCount;
     private volatile OverflowPolicyHandler _overflowPolicyHandler;
-    private long _flowToDiskThreshold;
+    private volatile long _brokerFlowToDiskThreshold;
+    private volatile long _brokerFlowToDiskLowerThreshold;
+    private volatile double _queueFlowToDiskCessationFraction;
 
     private interface HoldMethod
     {
@@ -471,7 +474,9 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
         }
 
         _mimeTypeToFileExtension = getContextValue(Map.class, MAP_OF_STRING_STRING, MIME_TYPE_TO_FILE_EXTENSION);
-        _flowToDiskThreshold = getAncestor(Broker.class).getFlowToDiskThreshold();
+        _queueFlowToDiskCessationFraction = getContextValue(Double.class, QUEUE_FLOW_TO_DISK_QUEUE_CESSATION_FRACTION);
+        _brokerFlowToDiskThreshold = getAncestor(Broker.class).getFlowToDiskThreshold();
+        _brokerFlowToDiskLowerThreshold = getAncestor(Broker.class).getFlowToDiskThreshold();
 
         if(_defaultFilters != null)
         {
@@ -1101,9 +1106,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
             doEnqueue(message, action, enqueueRecord);
         }
 
-        long estimatedQueueSize = _queueStatistics.getQueueSizeIncludingHeader();
-        _flowToDiskChecker.flowToDiskAndReportIfNecessary(message.getStoredMessage(), estimatedQueueSize,
-                                                          _targetQueueSize.get());
+        _flowToDiskChecker.flowToDiskIfNecessary(message.getStoredMessage(),
+                                                 _queueStatistics.getQueueSizeIncludingHeader());
     }
 
     public final void recover(ServerMessage message, final MessageEnqueueRecord enqueueRecord)
@@ -1770,6 +1774,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
     public void checkCapacity()
     {
         _overflowPolicyHandler.checkOverflow();
+        _flowToDiskChecker.ceaseFlowToDiskIfNecessary();
     }
 
     void notifyConsumers(QueueEntry entry)
@@ -2032,8 +2037,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
     {
         QueueEntryIterator queueListIterator = getEntries().iterator();
 
-        final long estimatedQueueSize = _queueStatistics.getQueueSizeIncludingHeader();
-        _flowToDiskChecker.reportFlowToDiskStatusIfNecessary(estimatedQueueSize, _targetQueueSize.get());
+        _flowToDiskChecker.ceaseFlowToDiskIfNecessary();
 
         final Set<NotificationCheck> perMessageChecks = new HashSet<>();
         final Set<NotificationCheck> queueLevelChecks = new HashSet<>();
@@ -2080,10 +2084,9 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
                         {
                             messageReference = msg.newReference();
                             cumulativeQueueSize += msg.getSizeIncludingHeader();
-                            _flowToDiskChecker.flowToDiskIfNecessary(msg.getStoredMessage(), cumulativeQueueSize,
-                                                                     _targetQueueSize.get());
+                            _flowToDiskChecker.flowToDiskIfNecessary(msg.getStoredMessage(), cumulativeQueueSize);
 
-                            for(NotificationCheck check : perMessageChecks)
+                            for (NotificationCheck check : perMessageChecks)
                             {
                                 checkForNotification(msg, listener, currentTime, thresholdTime, check);
                             }
@@ -2919,6 +2922,12 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
     }
 
     @Override
+    public boolean isFlowingToDisk()
+    {
+        return _flowingToDisk.get();
+    }
+
+    @Override
     public <C extends ConfiguredObject> Collection<C> getChildren(final Class<C> clazz)
     {
         if(clazz == org.apache.qpid.server.model.Consumer.class)
@@ -3331,65 +3340,43 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
 
     private class FlowToDiskChecker
     {
-        final AtomicBoolean _lastReportedFlowToDiskStatus = new AtomicBoolean(false);
-
-        void flowToDiskIfNecessary(StoredMessage<?> storedMessage, long estimatedQueueSize, final long targetQueueSize)
-        {
-            if ((estimatedQueueSize > targetQueueSize
-                 || QpidByteBuffer.getAllocatedDirectMemorySize() > _flowToDiskThreshold)
-                && storedMessage.isInMemory())
-            {
-                storedMessage.flowToDisk();
-            }
-        }
-
-        void flowToDiskAndReportIfNecessary(StoredMessage<?> storedMessage,
-                                            final long estimatedQueueSize,
-                                            final long targetQueueSize)
-        {
-            flowToDiskIfNecessary(storedMessage, estimatedQueueSize, targetQueueSize);
-            reportFlowToDiskStatusIfNecessary(estimatedQueueSize, targetQueueSize);
-        }
 
-        void reportFlowToDiskStatusIfNecessary(final long estimatedQueueSize, final long targetQueueSize)
+        void flowToDiskIfNecessary(StoredMessage<?> storedMessage, long currentQueueSize)
         {
+            final long targetQueueSize = _targetQueueSize.get();
             final int allocatedDirectMemorySize = QpidByteBuffer.getAllocatedDirectMemorySize();
-            if (estimatedQueueSize > targetQueueSize
-                || allocatedDirectMemorySize > _flowToDiskThreshold)
+            if ((currentQueueSize > targetQueueSize || allocatedDirectMemorySize > _brokerFlowToDiskThreshold) && storedMessage.isInMemory())
             {
-                reportFlowToDiskActiveIfNecessary(estimatedQueueSize, targetQueueSize, allocatedDirectMemorySize, _flowToDiskThreshold);
-            }
-            else
-            {
-                reportFlowToDiskInactiveIfNecessary(estimatedQueueSize, targetQueueSize, allocatedDirectMemorySize, _flowToDiskThreshold);
-            }
-        }
+                if (_flowingToDisk.compareAndSet(false, true))
+                {
 
-        private void reportFlowToDiskActiveIfNecessary(long estimatedQueueSize,
-                                                       long targetQueueSize,
-                                                       long allocatedDirectMemorySize,
-                                                       long flowToDiskThreshold)
-        {
-            if (!_lastReportedFlowToDiskStatus.getAndSet(true))
-            {
-                getEventLogger().message(_logSubject, QueueMessages.FLOW_TO_DISK_ACTIVE(estimatedQueueSize / 1024.0,
-                                                                                        targetQueueSize / 1024.0,
-                                                                                        allocatedDirectMemorySize / 1024.0 / 1024.0,
-                                                                                        flowToDiskThreshold / 1024.0 / 1024.0));
+                    getEventLogger().message(_logSubject, QueueMessages.FLOW_TO_DISK_ACTIVE(getQueueDepthBytesIncludingHeader() / 1024.0,
+                                                                                            targetQueueSize / 1024.0,
+                                                                                            allocatedDirectMemorySize / 1024.0 / 1024.0,
+                                                                                            _brokerFlowToDiskThreshold
+                                                                                            / 1024.0 / 1024.0));
+                }
+                storedMessage.flowToDisk();
             }
         }
 
-        private void reportFlowToDiskInactiveIfNecessary(long estimatedQueueSize,
-                                                         long targetQueueSize,
-                                                         long allocatedDirectMemorySize,
-                                                         long flowToDiskThreshold)
+        void ceaseFlowToDiskIfNecessary()
         {
-            if (_lastReportedFlowToDiskStatus.getAndSet(false))
+            final long currentQueueSize = _queueStatistics.getQueueSizeIncludingHeader();
+            final long targetQueueSize = _targetQueueSize.get();
+            final long allocatedDirectMemorySize = (long) QpidByteBuffer.getAllocatedDirectMemorySize();
+
+            if (currentQueueSize <= (targetQueueSize * _queueFlowToDiskCessationFraction)
+                && allocatedDirectMemorySize <= _brokerFlowToDiskLowerThreshold)
             {
-                getEventLogger().message(_logSubject, QueueMessages.FLOW_TO_DISK_INACTIVE(estimatedQueueSize / 1024.0,
-                                                                                          targetQueueSize / 1024.0,
-                                                                                          allocatedDirectMemorySize / 1024.0 / 1024.0,
-                                                                                          flowToDiskThreshold / 1024.0 / 1024.0));
+                if (_flowingToDisk.compareAndSet(true, false))
+                {
+                    getEventLogger().message(_logSubject, QueueMessages.FLOW_TO_DISK_INACTIVE(currentQueueSize / 1024.0,
+                                                                                              targetQueueSize / 1024.0,
+                                                                                              allocatedDirectMemorySize / 1024.0 / 1024.0,
+                                                                                              _brokerFlowToDiskThreshold
+                                                                                              / 1024.0 / 1024.0));
+                }
             }
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org