You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/05/15 16:13:48 UTC
[1/2] qpid-broker-j git commit: Revert "QPID-7770 : [Java Broker]
Avoid rapid oscillation of flow to disk state"
Repository: qpid-broker-j
Updated Branches:
refs/heads/master a5b0f7761 -> 501aa8919
Revert "QPID-7770 : [Java Broker] Avoid rapid oscillation of flow to disk state"
This reverts commit ac08557c89f29641f1b5a42332582379fb19eab6.
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/ba0e9c3f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/ba0e9c3f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/ba0e9c3f
Branch: refs/heads/master
Commit: ba0e9c3f5bd3ac8aa782f916327c37392c7f6236
Parents: a5b0f77
Author: Lorenz Quack <lq...@apache.org>
Authored: Fri May 12 13:45:08 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Mon May 15 11:52:47 2017 +0100
----------------------------------------------------------------------
.../org/apache/qpid/server/model/Broker.java | 10 --
.../apache/qpid/server/model/BrokerImpl.java | 8 --
.../org/apache/qpid/server/model/Queue.java | 9 +-
.../apache/qpid/server/queue/AbstractQueue.java | 105 +++++++++++--------
4 files changed, 60 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ba0e9c3f/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java b/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
index 5545f28..19e6de0 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
@@ -85,13 +85,6 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
@ManagedContextDefault(name = BROKER_FLOW_TO_DISK_THRESHOLD)
long DEFAULT_FLOW_TO_DISK_THRESHOLD = (long)(0.75 * (double) BrokerImpl.getMaxDirectMemorySize());
-
- String BROKER_FLOW_TO_DISK_CESSATION_FRACTION = "broker.flowToDiskCessationFraction";
- @SuppressWarnings("unused")
- @ManagedContextDefault( name = BROKER_FLOW_TO_DISK_CESSATION_FRACTION,
- description = "Fraction of the flow to disk threshold where flow to disk will cease.")
- double DEFAULT_BROKER_FLOW_TO_DISK_CESSATION_FRACTION = 0.8;
-
String COMPACT_MEMORY_THRESHOLD = "qpid.compact_memory_threshold";
@SuppressWarnings("unused")
@ManagedContextDefault(name = COMPACT_MEMORY_THRESHOLD)
@@ -375,9 +368,6 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
@DerivedAttribute(description = "Threshold direct memory size (in bytes) at which the Broker will start flowing incoming messages to disk.")
long getFlowToDiskThreshold();
- @DerivedAttribute(description = "Threshold direct memory size (in bytes) at which the Broker will cease flowing incoming messages to disk.")
- long getFlowToDiskCessationThreshold();
-
@DerivedAttribute(description = "Threshold direct memory size (in bytes) at which the Broker will start considering to compact sparse buffers. Set to -1 to disable.")
long getCompactMemoryThreshold();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ba0e9c3f/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
index d7182bc..0840b66 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
@@ -150,7 +150,6 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
private long _compactMemoryThreshold;
private long _compactMemoryInterval;
private long _flowToDiskThreshold;
- private long _flowToDiskCessationThreshold;
private double _sparsityFraction;
private long _lastDisposalCounter;
@@ -623,7 +622,6 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
getEventLogger().message(BrokerMessages.MAX_MEMORY(heapMemory, directMemory));
_flowToDiskThreshold = getContextValue(Long.class, BROKER_FLOW_TO_DISK_THRESHOLD);
- _flowToDiskCessationThreshold = (long) (getContextValue(Double.class, BROKER_FLOW_TO_DISK_CESSATION_FRACTION) * _flowToDiskThreshold);
_compactMemoryThreshold = getContextValue(Long.class, Broker.COMPACT_MEMORY_THRESHOLD);
_compactMemoryInterval = getContextValue(Long.class, Broker.COMPACT_MEMORY_INTERVAL);
@@ -888,12 +886,6 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
}
@Override
- public long getFlowToDiskCessationThreshold()
- {
- return _flowToDiskCessationThreshold;
- }
-
- @Override
public long getNumberOfActivePooledBuffers()
{
return QpidByteBuffer.getNumberOfActivePooledBuffers();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ba0e9c3f/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index 94d02ee..f2cbc25 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -210,11 +210,7 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>,
+ "none is explicitly set")
String DEFAULT_MESSAGE_DURABILTY = "DEFAULT";
- String QUEUE_FLOW_TO_DISK_QUEUE_CESSATION_FRACTION = "queue.flowToDiskQueueCessationFraction";
- @SuppressWarnings("unused")
- @ManagedContextDefault( name = QUEUE_FLOW_TO_DISK_QUEUE_CESSATION_FRACTION,
- description = "The fraction of this queue's 'target size' where the queue will cease flowing messages to disk")
- double DEFAULT_QUEUE_FLOW_TO_DISK_QUEUE_CESSATION_FRACTION = 0.80;
+
@ManagedAttribute( defaultValue = "${queue.defaultMessageDurability}" )
MessageDurability getMessageDurability();
@@ -280,9 +276,6 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>,
mandatory = true)
OverflowPolicy getOverflowPolicy();
- @DerivedAttribute(description = "Returns 'true' when the broker is flowing transient messages to disk")
- boolean isFlowingToDisk();
-
@ManagedOperation(nonModifying = true, changesConfiguredObjectState = false)
Collection<PublishingLink> getPublishingLinks();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ba0e9c3f/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 9a5f9d7..6deda67 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -262,14 +262,11 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
private boolean _closing;
private final ConcurrentMap<String, Callable<MessageFilter>> _defaultFiltersMap = new ConcurrentHashMap<>();
private final List<HoldMethod> _holdMethods = new CopyOnWriteArrayList<>();
- private final AtomicBoolean _flowingToDisk = new AtomicBoolean();
private Map<String, String> _mimeTypeToFileExtension = Collections.emptyMap();
private AdvanceConsumersTask _queueHouseKeepingTask;
private volatile int _bindingCount;
private volatile OverflowPolicyHandler _overflowPolicyHandler;
- private volatile long _brokerFlowToDiskThreshold;
- private volatile long _brokerFlowToDiskLowerThreshold;
- private volatile double _queueFlowToDiskCessationFraction;
+ private long _flowToDiskThreshold;
private interface HoldMethod
{
@@ -474,9 +471,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
_mimeTypeToFileExtension = getContextValue(Map.class, MAP_OF_STRING_STRING, MIME_TYPE_TO_FILE_EXTENSION);
- _queueFlowToDiskCessationFraction = getContextValue(Double.class, QUEUE_FLOW_TO_DISK_QUEUE_CESSATION_FRACTION);
- _brokerFlowToDiskThreshold = getAncestor(Broker.class).getFlowToDiskThreshold();
- _brokerFlowToDiskLowerThreshold = getAncestor(Broker.class).getFlowToDiskThreshold();
+ _flowToDiskThreshold = getAncestor(Broker.class).getFlowToDiskThreshold();
if(_defaultFilters != null)
{
@@ -1106,8 +1101,9 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
doEnqueue(message, action, enqueueRecord);
}
- _flowToDiskChecker.flowToDiskIfNecessary(message.getStoredMessage(),
- _queueStatistics.getQueueSizeIncludingHeader());
+ long estimatedQueueSize = _queueStatistics.getQueueSizeIncludingHeader();
+ _flowToDiskChecker.flowToDiskAndReportIfNecessary(message.getStoredMessage(), estimatedQueueSize,
+ _targetQueueSize.get());
}
public final void recover(ServerMessage message, final MessageEnqueueRecord enqueueRecord)
@@ -1774,7 +1770,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
public void checkCapacity()
{
_overflowPolicyHandler.checkOverflow();
- _flowToDiskChecker.ceaseFlowToDiskIfNecessary();
}
void notifyConsumers(QueueEntry entry)
@@ -2037,7 +2032,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
{
QueueEntryIterator queueListIterator = getEntries().iterator();
- _flowToDiskChecker.ceaseFlowToDiskIfNecessary();
+ final long estimatedQueueSize = _queueStatistics.getQueueSizeIncludingHeader();
+ _flowToDiskChecker.reportFlowToDiskStatusIfNecessary(estimatedQueueSize, _targetQueueSize.get());
final Set<NotificationCheck> perMessageChecks = new HashSet<>();
final Set<NotificationCheck> queueLevelChecks = new HashSet<>();
@@ -2084,9 +2080,10 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
{
messageReference = msg.newReference();
cumulativeQueueSize += msg.getSizeIncludingHeader();
- _flowToDiskChecker.flowToDiskIfNecessary(msg.getStoredMessage(), cumulativeQueueSize);
+ _flowToDiskChecker.flowToDiskIfNecessary(msg.getStoredMessage(), cumulativeQueueSize,
+ _targetQueueSize.get());
- for (NotificationCheck check : perMessageChecks)
+ for(NotificationCheck check : perMessageChecks)
{
checkForNotification(msg, listener, currentTime, thresholdTime, check);
}
@@ -2922,12 +2919,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
@Override
- public boolean isFlowingToDisk()
- {
- return _flowingToDisk.get();
- }
-
- @Override
public <C extends ConfiguredObject> Collection<C> getChildren(final Class<C> clazz)
{
if(clazz == org.apache.qpid.server.model.Consumer.class)
@@ -3340,43 +3331,65 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
private class FlowToDiskChecker
{
+ final AtomicBoolean _lastReportedFlowToDiskStatus = new AtomicBoolean(false);
- void flowToDiskIfNecessary(StoredMessage<?> storedMessage, long currentQueueSize)
+ void flowToDiskIfNecessary(StoredMessage<?> storedMessage, long estimatedQueueSize, final long targetQueueSize)
{
- final long targetQueueSize = _targetQueueSize.get();
- final int allocatedDirectMemorySize = QpidByteBuffer.getAllocatedDirectMemorySize();
- if ((currentQueueSize > targetQueueSize || allocatedDirectMemorySize > _brokerFlowToDiskThreshold) && storedMessage.isInMemory())
+ if ((estimatedQueueSize > targetQueueSize
+ || QpidByteBuffer.getAllocatedDirectMemorySize() > _flowToDiskThreshold)
+ && storedMessage.isInMemory())
{
- if (_flowingToDisk.compareAndSet(false, true))
- {
-
- getEventLogger().message(_logSubject, QueueMessages.FLOW_TO_DISK_ACTIVE(getQueueDepthBytesIncludingHeader() / 1024.0,
- targetQueueSize / 1024.0,
- allocatedDirectMemorySize / 1024.0 / 1024.0,
- _brokerFlowToDiskThreshold
- / 1024.0 / 1024.0));
- }
storedMessage.flowToDisk();
}
}
- void ceaseFlowToDiskIfNecessary()
+ void flowToDiskAndReportIfNecessary(StoredMessage<?> storedMessage,
+ final long estimatedQueueSize,
+ final long targetQueueSize)
{
- final long currentQueueSize = _queueStatistics.getQueueSizeIncludingHeader();
- final long targetQueueSize = _targetQueueSize.get();
- final long allocatedDirectMemorySize = (long) QpidByteBuffer.getAllocatedDirectMemorySize();
+ flowToDiskIfNecessary(storedMessage, estimatedQueueSize, targetQueueSize);
+ reportFlowToDiskStatusIfNecessary(estimatedQueueSize, targetQueueSize);
+ }
- if (currentQueueSize <= (targetQueueSize * _queueFlowToDiskCessationFraction)
- && allocatedDirectMemorySize <= _brokerFlowToDiskLowerThreshold)
+ void reportFlowToDiskStatusIfNecessary(final long estimatedQueueSize, final long targetQueueSize)
+ {
+ final int allocatedDirectMemorySize = QpidByteBuffer.getAllocatedDirectMemorySize();
+ if (estimatedQueueSize > targetQueueSize
+ || allocatedDirectMemorySize > _flowToDiskThreshold)
{
- if (_flowingToDisk.compareAndSet(true, false))
- {
- getEventLogger().message(_logSubject, QueueMessages.FLOW_TO_DISK_INACTIVE(currentQueueSize / 1024.0,
- targetQueueSize / 1024.0,
- allocatedDirectMemorySize / 1024.0 / 1024.0,
- _brokerFlowToDiskThreshold
- / 1024.0 / 1024.0));
- }
+ reportFlowToDiskActiveIfNecessary(estimatedQueueSize, targetQueueSize, allocatedDirectMemorySize, _flowToDiskThreshold);
+ }
+ else
+ {
+ reportFlowToDiskInactiveIfNecessary(estimatedQueueSize, targetQueueSize, allocatedDirectMemorySize, _flowToDiskThreshold);
+ }
+ }
+
+ private void reportFlowToDiskActiveIfNecessary(long estimatedQueueSize,
+ long targetQueueSize,
+ long allocatedDirectMemorySize,
+ long flowToDiskThreshold)
+ {
+ if (!_lastReportedFlowToDiskStatus.getAndSet(true))
+ {
+ getEventLogger().message(_logSubject, QueueMessages.FLOW_TO_DISK_ACTIVE(estimatedQueueSize / 1024.0,
+ targetQueueSize / 1024.0,
+ allocatedDirectMemorySize / 1024.0 / 1024.0,
+ flowToDiskThreshold / 1024.0 / 1024.0));
+ }
+ }
+
+ private void reportFlowToDiskInactiveIfNecessary(long estimatedQueueSize,
+ long targetQueueSize,
+ long allocatedDirectMemorySize,
+ long flowToDiskThreshold)
+ {
+ if (_lastReportedFlowToDiskStatus.getAndSet(false))
+ {
+ getEventLogger().message(_logSubject, QueueMessages.FLOW_TO_DISK_INACTIVE(estimatedQueueSize / 1024.0,
+ targetQueueSize / 1024.0,
+ allocatedDirectMemorySize / 1024.0 / 1024.0,
+ flowToDiskThreshold / 1024.0 / 1024.0));
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/2] qpid-broker-j git commit: QPID-7775: [Java Broker] Trigger flow
to disk based on actual memory occupancy.
Posted by lq...@apache.org.
QPID-7775: [Java Broker] Trigger flow to disk based on actual memory occupancy.
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/501aa891
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/501aa891
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/501aa891
Branch: refs/heads/master
Commit: 501aa8919c8cd4938b6fa26eef9e6746e267175b
Parents: ba0e9c3
Author: Lorenz Quack <lq...@apache.org>
Authored: Fri May 12 16:21:14 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Mon May 15 17:13:27 2017 +0100
----------------------------------------------------------------------
.../berkeleydb/AbstractBDBMessageStore.java | 41 +++++-
.../store/berkeleydb/BDBConfigurationStore.java | 1 +
.../store/berkeleydb/BDBMessageStore.java | 1 +
.../server/logging/messages/QueueMessages.java | 121 -----------------
.../messages/Queue_logmessages.properties | 6 +-
.../message/AbstractServerMessageImpl.java | 9 +-
.../qpid/server/message/MessageReference.java | 7 +-
.../org/apache/qpid/server/model/Broker.java | 6 +
.../apache/qpid/server/model/BrokerImpl.java | 18 +++
.../org/apache/qpid/server/model/Queue.java | 5 +-
.../apache/qpid/server/queue/AbstractQueue.java | 113 ++--------------
.../qpid/server/store/MemoryMessageStore.java | 26 +++-
.../apache/qpid/server/store/MessageStore.java | 4 +
.../qpid/server/store/NullMessageStore.java | 12 ++
.../server/virtualhost/AbstractVirtualHost.java | 134 ++++++++++++-------
.../virtualhost/QueueManagingVirtualHost.java | 8 ++
.../server/store/TestMessageMetaDataType.java | 6 +
.../store/jdbc/AbstractJDBCMessageStore.java | 34 ++++-
18 files changed, 264 insertions(+), 288 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
----------------------------------------------------------------------
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index ce25a8f..6f1326e 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -30,6 +30,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
import com.google.common.util.concurrent.ListenableFuture;
import com.sleepycat.bind.tuple.LongBinding;
@@ -100,6 +101,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore
private boolean _limitBusted;
private long _totalStoreSize;
private final Random _lockConflictRandom = new Random();
+ private final AtomicLong _inMemorySize = new AtomicLong();
+ private final AtomicLong _bytesEvacuatedFromMemory = new AtomicLong();
@Override
public void upgradeStoreStructure() throws StoreException
@@ -147,6 +150,18 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
@Override
+ public long getInMemorySize()
+ {
+ return _inMemorySize.get();
+ }
+
+ @Override
+ public long getBytesEvacuatedFromMemory()
+ {
+ return _bytesEvacuatedFromMemory.get();
+ }
+
+ @Override
public boolean isPersistent()
{
return true;
@@ -167,6 +182,13 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
@Override
+ public void closeMessageStore()
+ {
+ _inMemorySize.set(0);
+ _bytesEvacuatedFromMemory.set(0);
+ }
+
+ @Override
public MessageStoreReader newMessageStoreReader()
{
return new BDBMessageStoreReader();
@@ -979,10 +1001,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore
_data = data;
}
- public void clear()
+ public long clear()
{
+ long bytesCleared = 0;
if(_metaData != null)
{
+ bytesCleared += _metaData.getStorableSize();
_metaData.clearEncodedForm();
_metaData = null;
}
@@ -990,10 +1014,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore
{
for(QpidByteBuffer buf : _data)
{
+ bytesCleared += buf.remaining();
buf.dispose();
}
+ _data = null;
}
- _data = null;
+ return bytesCleared;
}
@Override
@@ -1037,7 +1063,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore
{
_messageDataRef = new MessageDataSoftRef<>(metaData, null);
}
+
_contentSize = metaData.getContentSize();
+ _inMemorySize.addAndGet(metaData.getStorableSize());
}
@Override
@@ -1089,6 +1117,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
@Override
public StoredMessage<T> allContentAdded()
{
+ _inMemorySize.addAndGet(getContentSize());
return this;
}
@@ -1225,14 +1254,17 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
final T metaData;
+ long bytesCleared = 0;
if ((metaData =_messageDataRef.getMetaData()) != null)
{
+ bytesCleared += metaData.getStorableSize();
metaData.dispose();
}
Collection<QpidByteBuffer> data = _messageDataRef.getData();
if(data != null)
{
+ bytesCleared += getContentSize();
_messageDataRef.setData(null);
for(QpidByteBuffer buf : data)
{
@@ -1240,6 +1272,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
}
_messageDataRef = null;
+ _inMemorySize.addAndGet(-bytesCleared);
}
@Override
@@ -1260,7 +1293,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore
flushToStore();
if(_messageDataRef != null && !_messageDataRef.isHardRef())
{
- ((MessageDataSoftRef)_messageDataRef).clear();
+ final long bytesCleared = ((MessageDataSoftRef) _messageDataRef).clear();
+ _inMemorySize.addAndGet(-bytesCleared);
+ _bytesEvacuatedFromMemory.addAndGet(bytesCleared);
}
return true;
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
----------------------------------------------------------------------
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
index 2e854f7..ca7b4dd 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
@@ -592,6 +592,7 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi
@Override
public void closeMessageStore()
{
+ super.closeMessageStore();
_messageStoreOpen.set(false);
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
----------------------------------------------------------------------
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index a5ebc68..83df5aa 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -83,6 +83,7 @@ public class BDBMessageStore extends AbstractBDBMessageStore
@Override
public void closeMessageStore()
{
+ super.closeMessageStore();
if (_messageStoreOpen.compareAndSet(true, false))
{
if (_environmentFacade != null)
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java
index e73e173..5df5b21 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java
@@ -67,10 +67,8 @@ public class QueueMessages
public static final String DROPPED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.dropped";
public static final String OVERFULL_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.overfull";
public static final String OPERATION_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.operation";
- public static final String FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.flow_to_disk_active";
public static final String UNDERFULL_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.underfull";
public static final String DELETED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.deleted";
- public static final String FLOW_TO_DISK_INACTIVE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.flow_to_disk_inactive";
static
{
@@ -79,10 +77,8 @@ public class QueueMessages
LoggerFactory.getLogger(DROPPED_LOG_HIERARCHY);
LoggerFactory.getLogger(OVERFULL_LOG_HIERARCHY);
LoggerFactory.getLogger(OPERATION_LOG_HIERARCHY);
- LoggerFactory.getLogger(FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY);
LoggerFactory.getLogger(UNDERFULL_LOG_HIERARCHY);
LoggerFactory.getLogger(DELETED_LOG_HIERARCHY);
- LoggerFactory.getLogger(FLOW_TO_DISK_INACTIVE_LOG_HIERARCHY);
_messages = ResourceBundle.getBundle("org.apache.qpid.server.logging.messages.Queue_logmessages", _currentLocale);
}
@@ -384,64 +380,6 @@ public class QueueMessages
/**
* Log a Queue message of the Format:
- * <pre>QUE-1014 : Message flow to disk active : Queue : depth {0,number,#.##} kB, threshold {1,number,#.##} kB / Broker : direct memory used {2,number,#.##} MB, threshold {3,number,#.##} MB</pre>
- * Optional values are contained in [square brackets] and are numbered
- * sequentially in the method call.
- *
- */
- public static LogMessage FLOW_TO_DISK_ACTIVE(Number param1, Number param2, Number param3, Number param4)
- {
- String rawMessage = _messages.getString("FLOW_TO_DISK_ACTIVE");
-
- final Object[] messageArguments = {param1, param2, param3, param4};
- // Create a new MessageFormat to ensure thread safety.
- // Sharing a MessageFormat and using applyPattern is not thread safe
- MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
-
- final String message = formatter.format(messageArguments);
-
- return new LogMessage()
- {
- public String toString()
- {
- return message;
- }
-
- public String getLogHierarchy()
- {
- return FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY;
- }
-
- @Override
- public boolean equals(final Object o)
- {
- if (this == o)
- {
- return true;
- }
- if (o == null || getClass() != o.getClass())
- {
- return false;
- }
-
- final LogMessage that = (LogMessage) o;
-
- return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
-
- }
-
- @Override
- public int hashCode()
- {
- int result = toString().hashCode();
- result = 31 * result + getLogHierarchy().hashCode();
- return result;
- }
- };
- }
-
- /**
- * Log a Queue message of the Format:
* <pre>QUE-1004 : Underfull : Size : {0,number} bytes, Resume Capacity : {1,number}, Messages : {2,number}, Message Capacity : {3,number}</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
@@ -556,65 +494,6 @@ public class QueueMessages
};
}
- /**
- * Log a Queue message of the Format:
- * <pre>QUE-1015 : Message flow to disk inactive : Queue : depth {0,number,#.##} kB, threshold {1,number,#.##} kB / Broker : direct memory used {2,number,#.##} MB, threshold {3,number,#.##} MB</pre>
- * Optional values are contained in [square brackets] and are numbered
- * sequentially in the method call.
- *
- */
- public static LogMessage FLOW_TO_DISK_INACTIVE(Number param1, Number param2, Number param3, Number param4)
- {
- String rawMessage = _messages.getString("FLOW_TO_DISK_INACTIVE");
-
- final Object[] messageArguments = {param1, param2, param3, param4};
- // Create a new MessageFormat to ensure thread safety.
- // Sharing a MessageFormat and using applyPattern is not thread safe
- MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
-
- final String message = formatter.format(messageArguments);
-
- return new LogMessage()
- {
- public String toString()
- {
- return message;
- }
-
- public String getLogHierarchy()
- {
- return FLOW_TO_DISK_INACTIVE_LOG_HIERARCHY;
- }
-
- @Override
- public boolean equals(final Object o)
- {
- if (this == o)
- {
- return true;
- }
- if (o == null || getClass() != o.getClass())
- {
- return false;
- }
-
- final LogMessage that = (LogMessage) o;
-
- return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
-
- }
-
- @Override
- public int hashCode()
- {
- int result = toString().hashCode();
- result = 31 * result + getLogHierarchy().hashCode();
- return result;
- }
- };
- }
-
-
private QueueMessages()
{
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
index 9061f0d..cd4ecdb 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
+++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
@@ -27,9 +27,9 @@ OVERFULL = QUE-1003 : Overfull : Size : {0,number} bytes, Capacity : {1,number},
UNDERFULL = QUE-1004 : Underfull : Size : {0,number} bytes, Resume Capacity : {1,number}, Messages : {2,number}, Message Capacity : {3,number}
DROPPED = QUE-1005 : Dropped : {0,number} messages, Depth : {1,number} bytes, {2,number} messages, Capacity : {3,number} bytes, {4,number} messages
-# use similar number to the broker for similar topic
-FLOW_TO_DISK_ACTIVE = QUE-1014 : Message flow to disk active : Queue : depth {0,number,#.##} kB, threshold {1,number,#.##} kB / Broker : direct memory used {2,number,#.##} MB, threshold {3,number,#.##} MB
-FLOW_TO_DISK_INACTIVE = QUE-1015 : Message flow to disk inactive : Queue : depth {0,number,#.##} kB, threshold {1,number,#.##} kB / Broker : direct memory used {2,number,#.##} MB, threshold {3,number,#.##} MB
+# These are no longer in use
+#FLOW_TO_DISK_ACTIVE = QUE-1014 : Message flow to disk active : Queue : depth {0,number,#.##} kB, threshold {1,number,#.##} kB / Broker : direct memory used {2,number,#.##} MB, threshold {3,number,#.##} MB
+#FLOW_TO_DISK_INACTIVE = QUE-1015 : Message flow to disk inactive : Queue : depth {0,number,#.##} kB, threshold {1,number,#.##} kB / Broker : direct memory used {2,number,#.##} MB, threshold {3,number,#.##} MB
# 0 - operation name
OPERATION = QUE-1016 : Operation : {0}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java b/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
index b60624a..978abc5 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
@@ -213,11 +213,11 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI
private final UUID _resourceId;
private volatile int _released;
- private Reference(final AbstractServerMessageImpl<X, T> message)
+ private Reference(final AbstractServerMessageImpl<X, T> message) throws MessageDeletedException
{
this(message, null);
}
- private Reference(final AbstractServerMessageImpl<X, T> message, TransactionLogResource resource)
+ private Reference(final AbstractServerMessageImpl<X, T> message, TransactionLogResource resource) throws MessageDeletedException
{
_message = message;
if(resource != null)
@@ -299,6 +299,11 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI
}
}
+ @Override
+ public void close()
+ {
+ release();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/broker-core/src/main/java/org/apache/qpid/server/message/MessageReference.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/MessageReference.java b/broker-core/src/main/java/org/apache/qpid/server/message/MessageReference.java
index eda8550..dfe5d64 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/MessageReference.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/MessageReference.java
@@ -20,8 +20,9 @@
*/
package org.apache.qpid.server.message;
-public interface MessageReference<M extends ServerMessage>
+public interface MessageReference<M extends ServerMessage> extends AutoCloseable
{
- public M getMessage();
- public void release();
+ M getMessage();
+ void release();
+ void close();
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java b/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
index 19e6de0..044f035 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
@@ -163,6 +163,9 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
@ManagedContextDefault( name = "broker.housekeepingThreadCount")
public static final int DEFAULT_HOUSEKEEPING_THREAD_COUNT = 2;
+ String QPID_BROKER_HOUSEKEEPING_CHECK_PERIOD = "qpid.broker.housekeepingCheckPeriod";
+ @ManagedContextDefault(name = QPID_BROKER_HOUSEKEEPING_CHECK_PERIOD)
+ long DEFAULT_BROKER_HOUSEKEEPING_CHECK_PERIOD = 30000L;
@ManagedAttribute( defaultValue = "${broker.housekeepingThreadCount}")
int getHousekeepingThreadCount();
@@ -377,6 +380,9 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
@DerivedAttribute(description = "Minimum fraction of direct memory buffer that can be occupied before the buffer is considered for compaction")
double getSparsityFraction();
+ @DerivedAttribute()
+ long getHousekeepingCheckPeriod();
+
@ManagedOperation(changesConfiguredObjectState = false, nonModifying = true,
description = "Force direct memory buffer compaction.")
void compactMemory();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
index 0840b66..b9242dc 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
@@ -40,6 +40,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
@@ -152,6 +153,8 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
private long _flowToDiskThreshold;
private double _sparsityFraction;
private long _lastDisposalCounter;
+ private ScheduledFuture<?> _assignTargetSizeSchedulingFuture;
+ private long _housekeepingCheckPeriod;
@ManagedObjectFactoryConstructor
public BrokerImpl(Map<String, Object> attributes,
@@ -420,6 +423,9 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
getSystemTaskSubject("Housekeeping", _principal));
scheduleDirectMemoryCheck();
+ _assignTargetSizeSchedulingFuture = scheduleHouseKeepingTask(getHousekeepingCheckPeriod(),
+ TimeUnit.MILLISECONDS,
+ this::assignTargetSizes);
final PreferenceStoreUpdaterImpl updater = new PreferenceStoreUpdaterImpl();
final Collection<PreferenceRecord> preferenceRecords = _preferenceStore.openAndLoad(updater);
@@ -624,6 +630,7 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
_flowToDiskThreshold = getContextValue(Long.class, BROKER_FLOW_TO_DISK_THRESHOLD);
_compactMemoryThreshold = getContextValue(Long.class, Broker.COMPACT_MEMORY_THRESHOLD);
_compactMemoryInterval = getContextValue(Long.class, Broker.COMPACT_MEMORY_INTERVAL);
+ _housekeepingCheckPeriod = getContextValue(Long.class, Broker.QPID_BROKER_HOUSEKEEPING_CHECK_PERIOD);
if (SystemUtils.getProcessPid() != null)
{
@@ -732,6 +739,11 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
_reportingTimer.cancel();
}
+ if (_assignTargetSizeSchedulingFuture != null)
+ {
+ _assignTargetSizeSchedulingFuture.cancel(true);
+ }
+
shutdownHouseKeeping();
stopPreferenceTaskExecutor();
@@ -1222,6 +1234,12 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
}
@Override
+ public long getHousekeepingCheckPeriod()
+ {
+ return _housekeepingCheckPeriod;
+ }
+
+ @Override
public void compactMemory()
{
compactMemoryInternal();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index f2cbc25..7e154ed 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -37,6 +37,7 @@ import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.NotificationCheck;
import org.apache.qpid.server.queue.QueueConsumer;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.QueueEntryIterator;
import org.apache.qpid.server.queue.QueueEntryVisitor;
import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.MessageEnqueueRecord;
@@ -471,8 +472,6 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>,
void recover(ServerMessage<?> message, MessageEnqueueRecord enqueueRecord);
- void setTargetSize(long targetSize);
-
boolean isHeld(QueueEntry queueEntry, final long evaluationTime);
void checkCapacity();
@@ -480,4 +479,6 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>,
void deleteEntry(QueueEntry entry);
QueueEntry getLeastSignificantOldestEntry();
+
+ QueueEntryIterator queueEntryIterator();
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 6deda67..31b77e7 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -142,7 +142,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
};
- private static final long INITIAL_TARGET_QUEUE_SIZE = 102400l;
private static final String UTF8 = StandardCharsets.UTF_8.name();
private static final Operation PUBLISH_ACTION = Operation.ACTION("publish");
@@ -156,8 +155,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
private volatile QueueConsumer<?,?> _exclusiveSubscriber;
- private final AtomicLong _targetQueueSize = new AtomicLong(INITIAL_TARGET_QUEUE_SIZE);
-
private final AtomicInteger _activeSubscriberCount = new AtomicInteger();
private final QueueStatistics _queueStatistics = new QueueStatistics();
@@ -210,7 +207,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
@ManagedAttributeField
private boolean _noLocal;
- private final FlowToDiskChecker _flowToDiskChecker = new FlowToDiskChecker();
private final CopyOnWriteArrayList<Binding> _bindings = new CopyOnWriteArrayList<>();
private Map<String, Object> _arguments;
@@ -1101,9 +1097,13 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
doEnqueue(message, action, enqueueRecord);
}
- long estimatedQueueSize = _queueStatistics.getQueueSizeIncludingHeader();
- _flowToDiskChecker.flowToDiskAndReportIfNecessary(message.getStoredMessage(), estimatedQueueSize,
- _targetQueueSize.get());
+ final StoredMessage storedMessage = message.getStoredMessage();
+ if ((_virtualHost.isOverTargetSize()
+ || QpidByteBuffer.getAllocatedDirectMemorySize() > _flowToDiskThreshold)
+ && storedMessage.isInMemory())
+ {
+ storedMessage.flowToDisk();
+ }
}
public final void recover(ServerMessage message, final MessageEnqueueRecord enqueueRecord)
@@ -1229,15 +1229,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
// Simple Queues don't :-)
}
- @Override
- public void setTargetSize(final long targetSize)
- {
- if (_targetQueueSize.compareAndSet(_targetQueueSize.get(), targetSize))
- {
- _logger.debug("Queue '{}' target size : {}", getName(), targetSize);
- }
- }
-
public long getTotalDequeuedMessages()
{
return _queueStatistics.getDequeueCount();
@@ -1460,6 +1451,12 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
+ @Override
+ public QueueEntryIterator queueEntryIterator()
+ {
+ return getEntries().iterator();
+ }
+
public int compareTo(final X o)
{
return getName().compareTo(o.getName());
@@ -2032,9 +2029,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
{
QueueEntryIterator queueListIterator = getEntries().iterator();
- final long estimatedQueueSize = _queueStatistics.getQueueSizeIncludingHeader();
- _flowToDiskChecker.reportFlowToDiskStatusIfNecessary(estimatedQueueSize, _targetQueueSize.get());
-
final Set<NotificationCheck> perMessageChecks = new HashSet<>();
final Set<NotificationCheck> queueLevelChecks = new HashSet<>();
@@ -2053,7 +2047,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
final long currentTime = System.currentTimeMillis();
final long thresholdTime = currentTime - getAlertRepeatGap();
- long cumulativeQueueSize = 0;
while (!_stopped.get() && queueListIterator.advance())
{
final QueueEntry node = queueListIterator.getNode();
@@ -2075,14 +2068,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
ServerMessage msg = node.getMessage();
if (msg != null)
{
- MessageReference messageReference = null;
- try
+ try (MessageReference messageReference = msg.newReference())
{
- messageReference = msg.newReference();
- cumulativeQueueSize += msg.getSizeIncludingHeader();
- _flowToDiskChecker.flowToDiskIfNecessary(msg.getStoredMessage(), cumulativeQueueSize,
- _targetQueueSize.get());
-
for(NotificationCheck check : perMessageChecks)
{
checkForNotification(msg, listener, currentTime, thresholdTime, check);
@@ -2092,13 +2079,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
{
// Ignore
}
- finally
- {
- if (messageReference != null)
- {
- messageReference.release();
- }
- }
}
}
}
@@ -3329,71 +3309,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
}
- private class FlowToDiskChecker
- {
- final AtomicBoolean _lastReportedFlowToDiskStatus = new AtomicBoolean(false);
-
- void flowToDiskIfNecessary(StoredMessage<?> storedMessage, long estimatedQueueSize, final long targetQueueSize)
- {
- if ((estimatedQueueSize > targetQueueSize
- || QpidByteBuffer.getAllocatedDirectMemorySize() > _flowToDiskThreshold)
- && storedMessage.isInMemory())
- {
- storedMessage.flowToDisk();
- }
- }
-
- void flowToDiskAndReportIfNecessary(StoredMessage<?> storedMessage,
- final long estimatedQueueSize,
- final long targetQueueSize)
- {
- flowToDiskIfNecessary(storedMessage, estimatedQueueSize, targetQueueSize);
- reportFlowToDiskStatusIfNecessary(estimatedQueueSize, targetQueueSize);
- }
-
- void reportFlowToDiskStatusIfNecessary(final long estimatedQueueSize, final long targetQueueSize)
- {
- final int allocatedDirectMemorySize = QpidByteBuffer.getAllocatedDirectMemorySize();
- if (estimatedQueueSize > targetQueueSize
- || allocatedDirectMemorySize > _flowToDiskThreshold)
- {
- reportFlowToDiskActiveIfNecessary(estimatedQueueSize, targetQueueSize, allocatedDirectMemorySize, _flowToDiskThreshold);
- }
- else
- {
- reportFlowToDiskInactiveIfNecessary(estimatedQueueSize, targetQueueSize, allocatedDirectMemorySize, _flowToDiskThreshold);
- }
- }
-
- private void reportFlowToDiskActiveIfNecessary(long estimatedQueueSize,
- long targetQueueSize,
- long allocatedDirectMemorySize,
- long flowToDiskThreshold)
- {
- if (!_lastReportedFlowToDiskStatus.getAndSet(true))
- {
- getEventLogger().message(_logSubject, QueueMessages.FLOW_TO_DISK_ACTIVE(estimatedQueueSize / 1024.0,
- targetQueueSize / 1024.0,
- allocatedDirectMemorySize / 1024.0 / 1024.0,
- flowToDiskThreshold / 1024.0 / 1024.0));
- }
- }
-
- private void reportFlowToDiskInactiveIfNecessary(long estimatedQueueSize,
- long targetQueueSize,
- long allocatedDirectMemorySize,
- long flowToDiskThreshold)
- {
- if (_lastReportedFlowToDiskStatus.getAndSet(false))
- {
- getEventLogger().message(_logSubject, QueueMessages.FLOW_TO_DISK_INACTIVE(estimatedQueueSize / 1024.0,
- targetQueueSize / 1024.0,
- allocatedDirectMemorySize / 1024.0 / 1024.0,
- flowToDiskThreshold / 1024.0 / 1024.0));
- }
- }
- }
-
private class AdvanceConsumersTask extends HouseKeepingTask
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index b2fa18a..22020c0 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -52,6 +52,7 @@ public class MemoryMessageStore implements MessageStore
private final Object _transactionLock = new Object();
private final Map<UUID, Set<Long>> _messageInstances = new HashMap<UUID, Set<Long>>();
private final Map<Xid, DistributedTransactionRecords> _distributedTransactions = new HashMap<Xid, DistributedTransactionRecords>();
+ private final AtomicLong _inMemorySize = new AtomicLong();
private final class MemoryMessageStoreTransaction implements Transaction
@@ -289,13 +290,23 @@ public class MemoryMessageStore implements MessageStore
{
@Override
+ public synchronized StoredMessage<T> allContentAdded()
+ {
+ final StoredMessage<T> storedMessage = super.allContentAdded();
+ _inMemorySize.addAndGet(getContentSize());
+ return storedMessage;
+ }
+
+ @Override
public void remove()
{
_messages.remove(getMessageNumber());
+ int bytesCleared = metaData.getStorableSize() + metaData.getContentSize();
super.remove();
+ _inMemorySize.addAndGet(-bytesCleared);
}
-
};
+ _inMemorySize.addAndGet(metaData.getStorableSize());
return storedMemoryMessage;
@@ -314,6 +325,18 @@ public class MemoryMessageStore implements MessageStore
}
@Override
+ public long getInMemorySize()
+ {
+ return _inMemorySize.get();
+ }
+
+ @Override
+ public long getBytesEvacuatedFromMemory()
+ {
+ return 0L;
+ }
+
+ @Override
public Transaction newTransaction()
{
return new MemoryMessageStoreTransaction();
@@ -323,6 +346,7 @@ public class MemoryMessageStore implements MessageStore
public void closeMessageStore()
{
_messages.clear();
+ _inMemorySize.set(0);
synchronized (_transactionLock)
{
_messageInstances.clear();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java b/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
index ab5764e..b50dbb1 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
@@ -61,6 +61,10 @@ public interface MessageStore
<T extends StorableMessageMetaData> MessageHandle<T> addMessage(T metaData);
+ long getInMemorySize();
+
+ long getBytesEvacuatedFromMemory();
+
/**
* Is this store capable of persisting the data
*
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
index e543084..37c79e5 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
@@ -97,6 +97,18 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura
}
@Override
+ public long getInMemorySize()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getBytesEvacuatedFromMemory()
+ {
+ return 0L;
+ }
+
+ @Override
public Transaction newTransaction()
{
return null;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 7c3ac37..67bee82 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.virtualhost;
+import static com.google.common.collect.Iterators.cycle;
import static java.util.Collections.newSetFromMap;
import java.io.BufferedInputStream;
@@ -88,8 +89,10 @@ import org.apache.qpid.server.logging.messages.VirtualHostMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDeletedException;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageNode;
+import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.RoutingResult;
import org.apache.qpid.server.message.ServerMessage;
@@ -105,6 +108,7 @@ import org.apache.qpid.server.plugin.SystemNodeCreator;
import org.apache.qpid.server.pool.SuppressingInheritedAccessControlContextThreadFactory;
import org.apache.qpid.server.protocol.LinkModel;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.QueueEntryIterator;
import org.apache.qpid.server.security.AccessControl;
import org.apache.qpid.server.security.CompoundAccessControl;
import org.apache.qpid.server.security.Result;
@@ -308,8 +312,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
_fileSystemSpaceCheckerJobContext = getSystemTaskControllerContext("FileSystemSpaceChecker["+getName()+"]", _principal);
_fileSystemSpaceChecker = new FileSystemSpaceChecker();
-
- addChangeListener(new TargetSizeAssigningListener());
}
private void updateAccessControl()
@@ -1165,6 +1167,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
if (period > 0L)
{
scheduleHouseKeepingTask(period, new VirtualHostHouseKeepingTask());
+ scheduleHouseKeepingTask(period, new FlowToDiskCheckingTask());
}
}
@@ -1444,6 +1447,18 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
@Override
+ public long getInMemoryMessageSize()
+ {
+ return _messageStore == null ? -1 : _messageStore.getInMemorySize();
+ }
+
+ @Override
+ public long getBytesEvacuatedFromMemory()
+ {
+ return _messageStore == null ? -1 : _messageStore.getBytesEvacuatedFromMemory();
+ }
+
+ @Override
public <T extends ConfiguredObject<?>> T getAttainedChildFromAddress(final Class<T> childClass,
final String address)
{
@@ -1738,6 +1753,12 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
return null;
}
+ @Override
+ public boolean isOverTargetSize()
+ {
+ return getInMemoryMessageSize() > _targetSize.get();
+ }
+
private static class MessageHeaderImpl implements AMQMessageHeader
{
private final String _userName;
@@ -1856,46 +1877,84 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
}
- private class TargetSizeAssigningListener extends AbstractConfigurationChangeListener
+ private class VirtualHostHouseKeepingTask extends HouseKeepingTask
{
- @Override
- public void childAdded(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
+ public VirtualHostHouseKeepingTask()
{
- if (child instanceof Queue)
- {
- allocateTargetSizeToQueues();
- }
+ super("Housekeeping["+AbstractVirtualHost.this.getName()+"]",AbstractVirtualHost.this,_housekeepingJobContext);
}
- @Override
- public void childRemoved(final ConfiguredObject<?> object,
- final ConfiguredObject<?> child)
+ public void execute()
{
- if (child instanceof Queue)
+ for (Queue<?> q : getChildren(Queue.class))
{
- allocateTargetSizeToQueues();
+ if (q.getState() == State.ACTIVE)
+ {
+ _logger.debug("Checking message status for queue: {}", q.getName());
+ q.checkMessageStatus();
+ }
}
}
}
- private class VirtualHostHouseKeepingTask extends HouseKeepingTask
+ private class FlowToDiskCheckingTask extends HouseKeepingTask
{
- public VirtualHostHouseKeepingTask()
+ public FlowToDiskCheckingTask()
{
- super("Housekeeping["+AbstractVirtualHost.this.getName()+"]",AbstractVirtualHost.this,_housekeepingJobContext);
+ super("FlowToDiskChecking["+AbstractVirtualHost.this.getName()+"]", AbstractVirtualHost.this, _housekeepingJobContext);
}
+ @Override
public void execute()
{
- Broker<?> broker = getAncestor(Broker.class);
- broker.assignTargetSizes();
-
- for (Queue<?> q : getChildren(Queue.class))
+ if (isOverTargetSize())
{
- if (q.getState() == State.ACTIVE)
+ long currentTargetSize = _targetSize.get();
+ List<QueueEntryIterator> queueIterators = new ArrayList<>();
+ for (Queue<?> q : getChildren(Queue.class))
{
- _logger.debug("Checking message status for queue: {}", q.getName());
- q.checkMessageStatus();
+ queueIterators.add(q.queueEntryIterator());
+ }
+ Collections.shuffle(queueIterators);
+
+ long cumulativeSize = 0;
+ final Iterator<QueueEntryIterator> cyclicIterators = cycle(queueIterators);
+ while (cyclicIterators.hasNext())
+ {
+ final QueueEntryIterator queueIterator = cyclicIterators.next();
+ if (queueIterator.advance())
+ {
+ QueueEntry node = queueIterator.getNode();
+ if (node != null && !node.isDeleted())
+ {
+ try (MessageReference messageReference = node.getMessage().newReference())
+ {
+ final StoredMessage storedMessage = messageReference.getMessage().getStoredMessage();
+ if (storedMessage.isInMemory())
+ {
+ if (cumulativeSize <= currentTargetSize)
+ {
+ cumulativeSize += storedMessage.getContentSize();
+ cumulativeSize += storedMessage.getMetaData() == null
+ ? 0
+ : storedMessage.getMetaData().getStorableSize();
+ }
+ else
+ {
+ storedMessage.flowToDisk();
+ }
+ }
+ }
+ catch (MessageDeletedException e)
+ {
+ // pass
+ }
+ }
+ }
+ else
+ {
+ cyclicIterators.remove();
+ }
}
}
}
@@ -2396,7 +2455,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
public void setTargetSize(final long targetSize)
{
_targetSize.set(targetSize);
- allocateTargetSizeToQueues();
}
public long getTargetSize()
@@ -2404,38 +2462,12 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
return _targetSize.get();
}
- private void allocateTargetSizeToQueues()
- {
- long targetSize = _targetSize.get();
- Collection<Queue> queues = getChildren(Queue.class);
- long totalSize = calculateTotalEnqueuedSize(queues);
- _logger.debug("Allocating target size to queues, total target: {} ; total enqueued size {}", targetSize, totalSize);
- if (targetSize > 0l)
- {
- for (Queue<?> q : queues)
- {
- long size;
- if (totalSize == 0)
- {
- size = targetSize / queues.size();
- }
- else
- {
- size = (long) ((q.getQueueDepthBytesIncludingHeader() / (double) totalSize) * targetSize);
- }
-
- q.setTargetSize(size);
- }
- }
- }
-
@Override
public long getTotalQueueDepthBytes()
{
return calculateTotalEnqueuedSize(getChildren(Queue.class));
}
-
@Override
public Principal getPrincipal()
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
index a47ab37..77289d0 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
@@ -206,6 +206,12 @@ public interface QueueManagingVirtualHost<X extends QueueManagingVirtualHost<X>>
@ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.BYTES, label = "Total Depth of Queues Including Header")
long getTotalDepthOfQueuesBytesIncludingHeader();
+ @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.BYTES, label = "Total Memory Occupied by Message Headers and Content")
+ long getInMemoryMessageSize();
+
+ @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.BYTES, label = "Total Number of Bytes Evacuated from Memory Due to Flow to Disk")
+ long getBytesEvacuatedFromMemory();
+
@Override
@ManagedOperation(nonModifying = true, changesConfiguredObjectState = false)
Collection<? extends Connection<?>> getConnections();
@@ -285,6 +291,8 @@ public interface QueueManagingVirtualHost<X extends QueueManagingVirtualHost<X>>
ListenableFuture<Void> reallocateMessages();
+ boolean isOverTargetSize();
+
interface Transaction
{
void dequeue(QueueEntry entry);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
index 3b6a509..f384a13 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
@@ -77,6 +77,12 @@ public class TestMessageMetaDataType implements MessageMetaDataType<TestMessageM
{
@Override
+ public void close()
+ {
+ release();
+ }
+
+ @Override
public ServerMessage getMessage()
{
return TestServerMessage.this;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
index 918e193..277810d 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
@@ -84,6 +84,8 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
protected final EventManager _eventManager = new EventManager();
private ConfiguredObject<?> _parent;
private String _tablePrefix = "";
+ private final AtomicLong _inMemorySize = new AtomicLong();
+ private final AtomicLong _bytesEvacuatedFromMemory = new AtomicLong();
protected abstract boolean isMessageStoreOpen();
@@ -245,6 +247,8 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
@Override
public void closeMessageStore()
{
+ _inMemorySize.set(0);
+ _bytesEvacuatedFromMemory.set(0);
if(_executor != null)
{
_executor.shutdown();
@@ -1124,6 +1128,17 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
return true;
}
+ @Override
+ public long getInMemorySize()
+ {
+ return _inMemorySize.get();
+ }
+
+ @Override
+ public long getBytesEvacuatedFromMemory()
+ {
+ return _bytesEvacuatedFromMemory.get();
+ }
protected class JDBCTransaction implements Transaction
{
@@ -1408,10 +1423,12 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
_data = data;
}
- public void clear()
+ public long clear()
{
+ long bytesCleared = 0;
if(_metaData != null)
{
+ bytesCleared += _metaData.getStorableSize();
_metaData.clearEncodedForm();
_metaData = null;
}
@@ -1419,10 +1436,12 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
{
for(QpidByteBuffer buf : _data)
{
+ bytesCleared += buf.remaining();
buf.dispose();
}
+ _data = null;
}
- _data = null;
+ return bytesCleared;
}
@Override
@@ -1471,7 +1490,9 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
{
_messageDataRef = new MessageDataSoftRef<>(metaData, null);
}
+
_contentSize = metaData.getContentSize();
+ _inMemorySize.addAndGet(metaData.getStorableSize());
}
@@ -1531,6 +1552,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
@Override
public StoredMessage<T> allContentAdded()
{
+ _inMemorySize.addAndGet(getContentSize());
return this;
}
@@ -1668,14 +1690,17 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
final T metaData;
+ long bytesCleared = 0;
if ((metaData = _messageDataRef.getMetaData()) != null)
{
+ bytesCleared += metaData.getStorableSize();
metaData.dispose();
}
Collection<QpidByteBuffer> data = _messageDataRef.getData();
if(data != null)
{
+ bytesCleared += getContentSize();
_messageDataRef.setData(null);
for(QpidByteBuffer buf : data)
{
@@ -1683,6 +1708,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
}
_messageDataRef = null;
+ _inMemorySize.addAndGet(-bytesCleared);
}
@Override
@@ -1703,7 +1729,9 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
flushToStore();
if(_messageDataRef != null && !_messageDataRef.isHardRef())
{
- ((MessageDataSoftRef)_messageDataRef).clear();
+ final long bytesCleared = ((MessageDataSoftRef) _messageDataRef).clear();
+ _inMemorySize.addAndGet(-bytesCleared);
+ _bytesEvacuatedFromMemory.addAndGet(bytesCleared);
}
return true;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org