You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2021/08/22 19:23:27 UTC
[qpid-broker-j] branch main updated: QPID-8559: Add debug logging
for flow to disk conditions
This is an automated email from the ASF dual-hosted git repository.
orudyy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/main by this push:
new 94d5373 QPID-8559: Add debug logging for flow to disk conditions
94d5373 is described below
commit 94d5373a9d402827fd761e2b190fa3df435f95ed
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Sun Aug 22 20:18:30 2021 +0100
QPID-8559: Add debug logging for flow to disk conditions
---
.../org/apache/qpid/server/model/BrokerImpl.java | 36 +++++++++++++
.../server/virtualhost/AbstractVirtualHost.java | 59 ++++++++++++++++++++++
.../runtime/Java-Broker-Runtime-Flow-To-Disk.xml | 50 ++++++++++++++++++
3 files changed, 145 insertions(+)
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
index d2bd9d4..2d61b11 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
@@ -43,6 +43,7 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
@@ -96,6 +97,7 @@ import org.apache.qpid.server.virtualhost.VirtualHostPropertiesNodeCreator;
public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<BrokerImpl>
{
private static final Logger LOGGER = LoggerFactory.getLogger(BrokerImpl.class);
+ private static final Logger DIRECT_MEMORY_USAGE_LOGGER = LoggerFactory.getLogger("org.apache.qpid.server.directMemory.broker");
private static final Pattern MODEL_VERSION_PATTERN = Pattern.compile("^\\d+\\.\\d+$");
@@ -146,6 +148,8 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
private volatile ScheduledFuture<?> _statisticsReportingFuture;
private long _housekeepingCheckPeriod;
+ private final AtomicBoolean _directMemoryExceedsThresholdReported = new AtomicBoolean();
+
@ManagedObjectFactoryConstructor
public BrokerImpl(Map<String, Object> attributes,
SystemConfig parent)
@@ -571,6 +575,7 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
@Override
public synchronized void assignTargetSizes()
{
+ reportDirectMemoryAboveThresholdIfExceeded();
LOGGER.debug("Assigning target sizes based on total target {}", _flowToDiskThreshold);
long totalSize = 0l;
Collection<VirtualHostNode<?>> vhns = getVirtualHostNodes();
@@ -609,6 +614,7 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
}
entry.getKey().setTargetSize(size);
}
+ reportDirectMemoryBelowThresholdIfReached();
}
@Override
@@ -1352,4 +1358,34 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
}
return Collections.<Principal>unmodifiableSet(currentSubject.getPrincipals(GroupPrincipal.class));
}
+
+ private void reportDirectMemoryBelowThresholdIfReached()
+ {
+ if (DIRECT_MEMORY_USAGE_LOGGER.isDebugEnabled())
+ {
+ final long allocatedDirectMemorySize = QpidByteBuffer.getAllocatedDirectMemorySize();
+ if (allocatedDirectMemorySize >= _flowToDiskThreshold
+ && _directMemoryExceedsThresholdReported.compareAndSet(true, false))
+ {
+ DIRECT_MEMORY_USAGE_LOGGER.debug("Direct memory threshold ({}) maintained : {}",
+ _flowToDiskThreshold,
+ allocatedDirectMemorySize);
+ }
+ }
+ }
+
+ private void reportDirectMemoryAboveThresholdIfExceeded()
+ {
+ if (DIRECT_MEMORY_USAGE_LOGGER.isDebugEnabled())
+ {
+ final long allocatedDirectMemorySize = QpidByteBuffer.getAllocatedDirectMemorySize();
+ if (allocatedDirectMemorySize > _flowToDiskThreshold
+ && _directMemoryExceedsThresholdReported.compareAndSet(false, true))
+ {
+ DIRECT_MEMORY_USAGE_LOGGER.debug("Direct memory threshold ({}) exceeded : {}",
+ _flowToDiskThreshold,
+ allocatedDirectMemorySize);
+ }
+ }
+ }
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index ec0d776..0b1d1de 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -77,6 +77,7 @@ import com.google.common.util.concurrent.SettableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.configuration.updater.Task;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
@@ -169,6 +170,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractVirtualHost.class);
+ private static final Logger DIRECT_MEMORY_USAGE_LOGGER = LoggerFactory.getLogger("org.apache.qpid.server.directMemory.virtualhost");
private static final int HOUSEKEEPING_SHUTDOWN_TIMEOUT = 5;
@@ -219,6 +221,8 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
private volatile boolean _createDefaultExchanges;
+ private final AtomicBoolean _directMemoryExceedsTargetReported = new AtomicBoolean();
+
private final AccessControl _systemUserAllowed = new SubjectFixedResultAccessControl(new ResultCalculator()
{
@Override
@@ -1712,6 +1716,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
_messagesOut.incrementAndGet();
_bytesOut.addAndGet(messageSize);
_broker.registerMessageDelivered(messageSize);
+ reportDirectMemoryBelowTargetIfReached();
}
@Override
@@ -1725,6 +1730,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
{
_maximumMessageSize.compareAndSet(hwm, messageSize);
}
+ reportDirectMemoryAboveTargetIfExceeded();
}
@Override
@@ -2136,6 +2142,8 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
if (isOverTargetSize())
{
long currentTargetSize = _targetSize.get();
+ reportDirectMemoryAboveTargetIfExceeded(currentTargetSize,
+ AbstractVirtualHost.this.getInMemoryMessageSize());
List<QueueEntryIterator> queueIterators = new ArrayList<>();
for (Queue<?> q : getChildren(Queue.class))
{
@@ -2181,6 +2189,8 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
cyclicIterators.remove();
}
}
+ reportDirectMemoryBelowTargetIfReached(cumulativeSize,
+ AbstractVirtualHost.this.getInMemoryMessageSize());
}
}
}
@@ -2590,6 +2600,9 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
public void setTargetSize(final long targetSize)
{
_targetSize.set(targetSize);
+ final long inMemoryMessageSize = getInMemoryMessageSize();
+ reportDirectMemoryAboveTargetIfExceeded(targetSize, inMemoryMessageSize);
+ reportDirectMemoryBelowTargetIfReached(targetSize, inMemoryMessageSize);
}
@Override
@@ -3423,4 +3436,50 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
String.valueOf(outcome),
attributesAsString(attributes)));
}
+
+ private void reportDirectMemoryAboveTargetIfExceeded()
+ {
+ if (DIRECT_MEMORY_USAGE_LOGGER.isDebugEnabled())
+ {
+ reportDirectMemoryAboveTargetIfExceeded(getTargetSize(), getInMemoryMessageSize());
+ }
+ }
+
+ private void reportDirectMemoryBelowTargetIfReached()
+ {
+ if (DIRECT_MEMORY_USAGE_LOGGER.isDebugEnabled())
+ {
+ reportDirectMemoryBelowTargetIfReached(getTargetSize(), getInMemoryMessageSize());
+ }
+ }
+
+ private void reportDirectMemoryBelowTargetIfReached(final long currentTargetSize, final long inMemoryMessageSize)
+ {
+ if (DIRECT_MEMORY_USAGE_LOGGER.isDebugEnabled()
+ && inMemoryMessageSize <= currentTargetSize
+ && QpidByteBuffer.getAllocatedDirectMemorySize() <= _broker.getFlowToDiskThreshold()
+ && _directMemoryExceedsTargetReported.compareAndSet(true, false))
+ {
+ DIRECT_MEMORY_USAGE_LOGGER.debug(
+ "VirtualHost '{}' direct memory allocation threshold ({}) maintained : {} bytes. Flow to disk stopped.",
+ getName(),
+ currentTargetSize,
+ inMemoryMessageSize);
+ }
+ }
+
+ private void reportDirectMemoryAboveTargetIfExceeded(final long currentTargetSize, final long inMemoryMessageSize)
+ {
+ if (DIRECT_MEMORY_USAGE_LOGGER.isDebugEnabled()
+ && (inMemoryMessageSize > currentTargetSize
+ || QpidByteBuffer.getAllocatedDirectMemorySize() > _broker.getFlowToDiskThreshold())
+ && _directMemoryExceedsTargetReported.compareAndSet(false, true))
+ {
+ DIRECT_MEMORY_USAGE_LOGGER.debug(
+ "VirtualHost '{}' direct memory allocation threshold ({}) exceeded : {} bytes. Flow to disk enforced.",
+ getName(),
+ currentTargetSize,
+ inMemoryMessageSize);
+ }
+ }
}
diff --git a/doc/java-broker/src/docbkx/runtime/Java-Broker-Runtime-Flow-To-Disk.xml b/doc/java-broker/src/docbkx/runtime/Java-Broker-Runtime-Flow-To-Disk.xml
index 01dbf1a..09bdb77 100644
--- a/doc/java-broker/src/docbkx/runtime/Java-Broker-Runtime-Flow-To-Disk.xml
+++ b/doc/java-broker/src/docbkx/runtime/Java-Broker-Runtime-Flow-To-Disk.xml
@@ -39,4 +39,54 @@
<para>Flow to disk is configured by Broker context variable
<literal>broker.flowToDiskThreshold</literal>. It is expressed as a size in bytes and defaults
to 75% of the JVM maximum heap size.</para>
+ <section role="h2" xml:id="Java-Broker-Runtime-Flow-To-Disk-Monitoring">
+ <title>Flow to Disk Monitoring</title>
+ <para>A number of statistics attributes are available on the <literal>Broker</literal> to allow monitoring
+ of the amount of utilized direct memory by the enqueued messages.
+ </para>
+ <para>The total amount of allocated direct memory by the <literal>Broker</literal> can be determined by
+ checking <literal>Broker</literal> statistics <literal>usedDirectMemorySize</literal>. There is another
+ <literal>Broker</literal> level statistics <literal>directMemoryTotalCapacity</literal> to get the total amount
+ of allocated direct memory. Usually, the values reported by both statistics attributes
+ <literal>usedDirectMemorySize</literal> and <literal>directMemoryTotalCapacity</literal> are the same
+ or do not differ much.
+ </para>
+ <para>The direct memory consumed by the <literal>VirtualHost</literal> messages is reported as
+ <literal>VirtualHost</literal> statistics <literal>inMemoryMessageSize</literal>. The current value of
+ <literal>VirtualHost</literal> direct memory threshold is exposed with statistics attribute
+ <literal>inMemoryMessageThreshold</literal>. When the value of <literal>inMemoryMessageSize</literal> is
+ greater than <literal>inMemoryMessageThreshold</literal>, the flow to disk is triggered to bring the amount of
+ direct memory consumed by the <literal>VirtualHost</literal> messages in-line with the
+ <literal>inMemoryMessageThreshold</literal>.
+ </para>
+ </section>
+ <section role="h2" xml:id="Java-Broker-Runtime-Flow-To-Disk-Logging">
+ <title>Flow to Disk Logging</title>
+ <para>The <literal>Flow to Disk</literal> events are not reported as operational logs or
+ <literal>INFO</literal> logs due to quite frequent triggering of <literal>Flow to Disk</literal>
+ for messaging use cases requiring holding messages on the <literal>Broker</literal> side for some time.
+ As result, the <literal>Flow to Disk</literal> logs can quickly dominate the broker logs and cause unnecessary
+ disk consumption.
+ </para>
+ <para>Though, if required, the <literal>Flow to Disk</literal> DEBUG logs can be enabled by adding
+ the following logging rule into the corresponding <literal>Broker</literal> logger.
+ <example>
+ <title>Flow to Disk logging rule</title>
+ <programlisting>
+ {
+ "name" : "DirectMemory",
+ "type" : "NameAndLevel",
+ "level" : "DEBUG",
+ "loggerName" : "org.apache.qpid.server.directMemory.*"
+ }
+ </programlisting>
+ </example>
+ </para>
+ <para>Please note, that the logger <literal>org.apache.qpid.server.directMemory.broker</literal>
+ is used by the <literal>Broker</literal> to report conditions when direct memory utilization exceeds the pred-defined
+ <literal>Broker</literal> threshold, whilst the logger <literal>org.apache.qpid.server.directMemory.virtualhost</literal>
+ is used to report conditions when direct memory utilization by the <literal>VirtualHost</literal>
+ messages exceeds the current value of the <literal>VirtualHost</literal> threshold.
+ </para>
+ </section>
</section>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org