You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/05/08 16:30:43 UTC
qpid-broker-j git commit: QPID-7770 : [Java Broker] Avoid rapid
oscillation of flow to disk state
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 01601b173 -> ac08557c8
QPID-7770 : [Java Broker] Avoid rapid oscillation of flow to disk state
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/ac08557c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/ac08557c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/ac08557c
Branch: refs/heads/master
Commit: ac08557c89f29641f1b5a42332582379fb19eab6
Parents: 01601b1
Author: Alex Rudyy <or...@apache.org>
Authored: Mon May 8 15:42:35 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Mon May 8 15:42:35 2017 +0100
----------------------------------------------------------------------
.../org/apache/qpid/server/model/Broker.java | 10 ++
.../apache/qpid/server/model/BrokerImpl.java | 8 ++
.../org/apache/qpid/server/model/Queue.java | 9 +-
.../apache/qpid/server/queue/AbstractQueue.java | 105 ++++++++-----------
4 files changed, 72 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ac08557c/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java b/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
index 19e6de0..5545f28 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
@@ -85,6 +85,13 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
@ManagedContextDefault(name = BROKER_FLOW_TO_DISK_THRESHOLD)
long DEFAULT_FLOW_TO_DISK_THRESHOLD = (long)(0.75 * (double) BrokerImpl.getMaxDirectMemorySize());
+
+ String BROKER_FLOW_TO_DISK_CESSATION_FRACTION = "broker.flowToDiskCessationFraction";
+ @SuppressWarnings("unused")
+ @ManagedContextDefault( name = BROKER_FLOW_TO_DISK_CESSATION_FRACTION,
+ description = "Fraction of the flow to disk threshold where flow to disk will cease.")
+ double DEFAULT_BROKER_FLOW_TO_DISK_CESSATION_FRACTION = 0.8;
+
String COMPACT_MEMORY_THRESHOLD = "qpid.compact_memory_threshold";
@SuppressWarnings("unused")
@ManagedContextDefault(name = COMPACT_MEMORY_THRESHOLD)
@@ -368,6 +375,9 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
@DerivedAttribute(description = "Threshold direct memory size (in bytes) at which the Broker will start flowing incoming messages to disk.")
long getFlowToDiskThreshold();
+ @DerivedAttribute(description = "Threshold direct memory size (in bytes) at which the Broker will cease flowing incoming messages to disk.")
+ long getFlowToDiskCessationThreshold();
+
@DerivedAttribute(description = "Threshold direct memory size (in bytes) at which the Broker will start considering to compact sparse buffers. Set to -1 to disable.")
long getCompactMemoryThreshold();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ac08557c/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
index 0840b66..d7182bc 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
@@ -150,6 +150,7 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
private long _compactMemoryThreshold;
private long _compactMemoryInterval;
private long _flowToDiskThreshold;
+ private long _flowToDiskCessationThreshold;
private double _sparsityFraction;
private long _lastDisposalCounter;
@@ -622,6 +623,7 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
getEventLogger().message(BrokerMessages.MAX_MEMORY(heapMemory, directMemory));
_flowToDiskThreshold = getContextValue(Long.class, BROKER_FLOW_TO_DISK_THRESHOLD);
+ _flowToDiskCessationThreshold = (long) (getContextValue(Double.class, BROKER_FLOW_TO_DISK_CESSATION_FRACTION) * _flowToDiskThreshold);
_compactMemoryThreshold = getContextValue(Long.class, Broker.COMPACT_MEMORY_THRESHOLD);
_compactMemoryInterval = getContextValue(Long.class, Broker.COMPACT_MEMORY_INTERVAL);
@@ -886,6 +888,12 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
}
@Override
+ public long getFlowToDiskCessationThreshold()
+ {
+ return _flowToDiskCessationThreshold;
+ }
+
+ @Override
public long getNumberOfActivePooledBuffers()
{
return QpidByteBuffer.getNumberOfActivePooledBuffers();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ac08557c/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index f2cbc25..94d02ee 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -210,7 +210,11 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>,
+ "none is explicitly set")
String DEFAULT_MESSAGE_DURABILTY = "DEFAULT";
-
+ String QUEUE_FLOW_TO_DISK_QUEUE_CESSATION_FRACTION = "queue.flowToDiskQueueCessationFraction";
+ @SuppressWarnings("unused")
+ @ManagedContextDefault( name = QUEUE_FLOW_TO_DISK_QUEUE_CESSATION_FRACTION,
+ description = "The fraction of this queue's 'target size' where the queue will cease flowing messages to disk")
+ double DEFAULT_QUEUE_FLOW_TO_DISK_QUEUE_CESSATION_FRACTION = 0.80;
@ManagedAttribute( defaultValue = "${queue.defaultMessageDurability}" )
MessageDurability getMessageDurability();
@@ -276,6 +280,9 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>,
mandatory = true)
OverflowPolicy getOverflowPolicy();
+ @DerivedAttribute(description = "Returns 'true' when the broker is flowing transient messages to disk")
+ boolean isFlowingToDisk();
+
@ManagedOperation(nonModifying = true, changesConfiguredObjectState = false)
Collection<PublishingLink> getPublishingLinks();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ac08557c/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 6deda67..9a5f9d7 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -262,11 +262,14 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
private boolean _closing;
private final ConcurrentMap<String, Callable<MessageFilter>> _defaultFiltersMap = new ConcurrentHashMap<>();
private final List<HoldMethod> _holdMethods = new CopyOnWriteArrayList<>();
+ private final AtomicBoolean _flowingToDisk = new AtomicBoolean();
private Map<String, String> _mimeTypeToFileExtension = Collections.emptyMap();
private AdvanceConsumersTask _queueHouseKeepingTask;
private volatile int _bindingCount;
private volatile OverflowPolicyHandler _overflowPolicyHandler;
- private long _flowToDiskThreshold;
+ private volatile long _brokerFlowToDiskThreshold;
+ private volatile long _brokerFlowToDiskLowerThreshold;
+ private volatile double _queueFlowToDiskCessationFraction;
private interface HoldMethod
{
@@ -471,7 +474,9 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
_mimeTypeToFileExtension = getContextValue(Map.class, MAP_OF_STRING_STRING, MIME_TYPE_TO_FILE_EXTENSION);
- _flowToDiskThreshold = getAncestor(Broker.class).getFlowToDiskThreshold();
+ _queueFlowToDiskCessationFraction = getContextValue(Double.class, QUEUE_FLOW_TO_DISK_QUEUE_CESSATION_FRACTION);
+ _brokerFlowToDiskThreshold = getAncestor(Broker.class).getFlowToDiskThreshold();
+ _brokerFlowToDiskLowerThreshold = getAncestor(Broker.class).getFlowToDiskCessationThreshold(); // lower bound: flow to disk ceases only once direct memory drops to this value
if(_defaultFilters != null)
{
@@ -1101,9 +1106,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
doEnqueue(message, action, enqueueRecord);
}
- long estimatedQueueSize = _queueStatistics.getQueueSizeIncludingHeader();
- _flowToDiskChecker.flowToDiskAndReportIfNecessary(message.getStoredMessage(), estimatedQueueSize,
- _targetQueueSize.get());
+ _flowToDiskChecker.flowToDiskIfNecessary(message.getStoredMessage(),
+ _queueStatistics.getQueueSizeIncludingHeader());
}
public final void recover(ServerMessage message, final MessageEnqueueRecord enqueueRecord)
@@ -1770,6 +1774,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
public void checkCapacity()
{
_overflowPolicyHandler.checkOverflow();
+ _flowToDiskChecker.ceaseFlowToDiskIfNecessary();
}
void notifyConsumers(QueueEntry entry)
@@ -2032,8 +2037,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
{
QueueEntryIterator queueListIterator = getEntries().iterator();
- final long estimatedQueueSize = _queueStatistics.getQueueSizeIncludingHeader();
- _flowToDiskChecker.reportFlowToDiskStatusIfNecessary(estimatedQueueSize, _targetQueueSize.get());
+ _flowToDiskChecker.ceaseFlowToDiskIfNecessary();
final Set<NotificationCheck> perMessageChecks = new HashSet<>();
final Set<NotificationCheck> queueLevelChecks = new HashSet<>();
@@ -2080,10 +2084,9 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
{
messageReference = msg.newReference();
cumulativeQueueSize += msg.getSizeIncludingHeader();
- _flowToDiskChecker.flowToDiskIfNecessary(msg.getStoredMessage(), cumulativeQueueSize,
- _targetQueueSize.get());
+ _flowToDiskChecker.flowToDiskIfNecessary(msg.getStoredMessage(), cumulativeQueueSize);
- for(NotificationCheck check : perMessageChecks)
+ for (NotificationCheck check : perMessageChecks)
{
checkForNotification(msg, listener, currentTime, thresholdTime, check);
}
@@ -2919,6 +2922,12 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
@Override
+ public boolean isFlowingToDisk()
+ {
+ return _flowingToDisk.get();
+ }
+
+ @Override
public <C extends ConfiguredObject> Collection<C> getChildren(final Class<C> clazz)
{
if(clazz == org.apache.qpid.server.model.Consumer.class)
@@ -3331,65 +3340,43 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
private class FlowToDiskChecker
{
- final AtomicBoolean _lastReportedFlowToDiskStatus = new AtomicBoolean(false);
-
- void flowToDiskIfNecessary(StoredMessage<?> storedMessage, long estimatedQueueSize, final long targetQueueSize)
- {
- if ((estimatedQueueSize > targetQueueSize
- || QpidByteBuffer.getAllocatedDirectMemorySize() > _flowToDiskThreshold)
- && storedMessage.isInMemory())
- {
- storedMessage.flowToDisk();
- }
- }
-
- void flowToDiskAndReportIfNecessary(StoredMessage<?> storedMessage,
- final long estimatedQueueSize,
- final long targetQueueSize)
- {
- flowToDiskIfNecessary(storedMessage, estimatedQueueSize, targetQueueSize);
- reportFlowToDiskStatusIfNecessary(estimatedQueueSize, targetQueueSize);
- }
- void reportFlowToDiskStatusIfNecessary(final long estimatedQueueSize, final long targetQueueSize)
+ void flowToDiskIfNecessary(StoredMessage<?> storedMessage, long currentQueueSize)
{
+ final long targetQueueSize = _targetQueueSize.get();
final int allocatedDirectMemorySize = QpidByteBuffer.getAllocatedDirectMemorySize();
- if (estimatedQueueSize > targetQueueSize
- || allocatedDirectMemorySize > _flowToDiskThreshold)
+ if ((currentQueueSize > targetQueueSize || allocatedDirectMemorySize > _brokerFlowToDiskThreshold) && storedMessage.isInMemory())
{
- reportFlowToDiskActiveIfNecessary(estimatedQueueSize, targetQueueSize, allocatedDirectMemorySize, _flowToDiskThreshold);
- }
- else
- {
- reportFlowToDiskInactiveIfNecessary(estimatedQueueSize, targetQueueSize, allocatedDirectMemorySize, _flowToDiskThreshold);
- }
- }
+ if (_flowingToDisk.compareAndSet(false, true))
+ {
- private void reportFlowToDiskActiveIfNecessary(long estimatedQueueSize,
- long targetQueueSize,
- long allocatedDirectMemorySize,
- long flowToDiskThreshold)
- {
- if (!_lastReportedFlowToDiskStatus.getAndSet(true))
- {
- getEventLogger().message(_logSubject, QueueMessages.FLOW_TO_DISK_ACTIVE(estimatedQueueSize / 1024.0,
- targetQueueSize / 1024.0,
- allocatedDirectMemorySize / 1024.0 / 1024.0,
- flowToDiskThreshold / 1024.0 / 1024.0));
+ getEventLogger().message(_logSubject, QueueMessages.FLOW_TO_DISK_ACTIVE(getQueueDepthBytesIncludingHeader() / 1024.0,
+ targetQueueSize / 1024.0,
+ allocatedDirectMemorySize / 1024.0 / 1024.0,
+ _brokerFlowToDiskThreshold
+ / 1024.0 / 1024.0));
+ }
+ storedMessage.flowToDisk();
}
}
- private void reportFlowToDiskInactiveIfNecessary(long estimatedQueueSize,
- long targetQueueSize,
- long allocatedDirectMemorySize,
- long flowToDiskThreshold)
+ void ceaseFlowToDiskIfNecessary()
{
- if (_lastReportedFlowToDiskStatus.getAndSet(false))
+ final long currentQueueSize = _queueStatistics.getQueueSizeIncludingHeader();
+ final long targetQueueSize = _targetQueueSize.get();
+ final long allocatedDirectMemorySize = (long) QpidByteBuffer.getAllocatedDirectMemorySize();
+
+ if (currentQueueSize <= (targetQueueSize * _queueFlowToDiskCessationFraction)
+ && allocatedDirectMemorySize <= _brokerFlowToDiskLowerThreshold)
{
- getEventLogger().message(_logSubject, QueueMessages.FLOW_TO_DISK_INACTIVE(estimatedQueueSize / 1024.0,
- targetQueueSize / 1024.0,
- allocatedDirectMemorySize / 1024.0 / 1024.0,
- flowToDiskThreshold / 1024.0 / 1024.0));
+ if (_flowingToDisk.compareAndSet(true, false))
+ {
+ getEventLogger().message(_logSubject, QueueMessages.FLOW_TO_DISK_INACTIVE(currentQueueSize / 1024.0,
+ targetQueueSize / 1024.0,
+ allocatedDirectMemorySize / 1024.0 / 1024.0,
+ _brokerFlowToDiskThreshold
+ / 1024.0 / 1024.0));
+ }
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org