You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2021/08/22 19:33:31 UTC

[qpid-broker-j] branch 8.0.x updated: QPID-8559: Add debug logging for flow to disk conditions

This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch 8.0.x
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git


The following commit(s) were added to refs/heads/8.0.x by this push:
     new 40cf8a6  QPID-8559: Add debug logging for flow to disk conditions
40cf8a6 is described below

commit 40cf8a6b6bb16fd9d5da55b2b1e0046b762fe022
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Sun Aug 22 20:18:30 2021 +0100

    QPID-8559: Add debug logging for flow to disk conditions
---
 .../org/apache/qpid/server/model/BrokerImpl.java   | 37 ++++++++++++++
 .../server/virtualhost/AbstractVirtualHost.java    | 59 ++++++++++++++++++++++
 .../runtime/Java-Broker-Runtime-Flow-To-Disk.xml   | 50 ++++++++++++++++++
 3 files changed, 146 insertions(+)

diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
index 7d0b77f..11e3868 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
@@ -43,6 +43,7 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Pattern;
 
@@ -96,6 +97,7 @@ import org.apache.qpid.server.virtualhost.VirtualHostPropertiesNodeCreator;
 public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<BrokerImpl>
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(BrokerImpl.class);
+    private static final Logger DIRECT_MEMORY_USAGE_LOGGER = LoggerFactory.getLogger("org.apache.qpid.server.directMemory.broker");
 
     private static final Pattern MODEL_VERSION_PATTERN = Pattern.compile("^\\d+\\.\\d+$");
 
@@ -146,6 +148,8 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
     private volatile ScheduledFuture<?> _statisticsReportingFuture;
     private long _housekeepingCheckPeriod;
 
+    private final AtomicBoolean _directMemoryExceedsThresholdReported = new AtomicBoolean();
+
     @ManagedObjectFactoryConstructor
     public BrokerImpl(Map<String, Object> attributes,
                       SystemConfig parent)
@@ -571,6 +575,7 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
     @Override
     public synchronized void assignTargetSizes()
     {
+        reportDirectMemoryAboveThresholdIfExceeded();
         LOGGER.debug("Assigning target sizes based on total target {}", _flowToDiskThreshold);
         long totalSize = 0l;
         Collection<VirtualHostNode<?>> vhns = getVirtualHostNodes();
@@ -609,6 +614,7 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
             }
             entry.getKey().setTargetSize(size);
         }
+        reportDirectMemoryBelowThresholdIfReached();
     }
 
     @Override
@@ -1344,4 +1350,35 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
             }
         }
     }
+
+    /**
+     * Logs, at DEBUG level on {@code DIRECT_MEMORY_USAGE_LOGGER}, that the allocated direct memory
+     * is no longer above {@code _flowToDiskThreshold}.
+     * <p>
+     * The message is emitted only on the transition from "exceeded" to "maintained": the
+     * {@code _directMemoryExceedsThresholdReported} flag has to be {@code true} (set by
+     * {@link #reportDirectMemoryAboveThresholdIfExceeded()}) and is atomically reset to {@code false}.
+     * Nothing is evaluated and the flag is left untouched when DEBUG logging is disabled.
+     */
+    private void reportDirectMemoryBelowThresholdIfReached()
+    {
+        if (DIRECT_MEMORY_USAGE_LOGGER.isDebugEnabled())
+        {
+            final long allocatedDirectMemorySize = QpidByteBuffer.getAllocatedDirectMemorySize();
+            // The "exceeded" report fires on "allocated > threshold"; recovery is the exact complement.
+            // Using ">=" here would report "maintained" (and reset the flag) while still over the threshold.
+            if (allocatedDirectMemorySize <= _flowToDiskThreshold
+                && _directMemoryExceedsThresholdReported.compareAndSet(true, false))
+            {
+                DIRECT_MEMORY_USAGE_LOGGER.debug("Direct memory threshold ({}) maintained : {}",
+                                                 _flowToDiskThreshold,
+                                                 allocatedDirectMemorySize);
+            }
+        }
+    }
+
+    /**
+     * Emits a DEBUG record on {@code DIRECT_MEMORY_USAGE_LOGGER} when the allocated direct memory
+     * is strictly greater than {@code _flowToDiskThreshold}.
+     * <p>
+     * The record is written only when {@code _directMemoryExceedsThresholdReported} can be flipped
+     * from {@code false} to {@code true}, so repeated calls while the flag is set stay silent.
+     * The method does nothing when DEBUG logging is disabled.
+     */
+    private void reportDirectMemoryAboveThresholdIfExceeded()
+    {
+        if (!DIRECT_MEMORY_USAGE_LOGGER.isDebugEnabled())
+        {
+            return;
+        }
+
+        final long directMemoryInUse = QpidByteBuffer.getAllocatedDirectMemorySize();
+        final boolean thresholdExceeded = directMemoryInUse > _flowToDiskThreshold;
+        if (thresholdExceeded && _directMemoryExceedsThresholdReported.compareAndSet(false, true))
+        {
+            DIRECT_MEMORY_USAGE_LOGGER.debug("Direct memory threshold ({}) exceeded : {}",
+                                             _flowToDiskThreshold,
+                                             directMemoryInUse);
+        }
+    }
+
 }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 0950909..10782dd 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -78,6 +78,7 @@ import com.google.common.util.concurrent.SettableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.configuration.updater.Task;
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
@@ -171,6 +172,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
 
 
     private static final Logger LOGGER = LoggerFactory.getLogger(AbstractVirtualHost.class);
+    private static final Logger DIRECT_MEMORY_USAGE_LOGGER = LoggerFactory.getLogger("org.apache.qpid.server.directMemory.virtualhost");
 
     private static final int HOUSEKEEPING_SHUTDOWN_TIMEOUT = 5;
 
@@ -221,6 +223,8 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
 
     private volatile boolean _createDefaultExchanges;
 
+    private final AtomicBoolean _directMemoryExceedsTargetReported = new AtomicBoolean();
+
     private final AccessControl _systemUserAllowed = new SubjectFixedResultAccessControl(new ResultCalculator()
     {
         @Override
@@ -1695,6 +1699,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
         _messagesOut.incrementAndGet();
         _bytesOut.addAndGet(messageSize);
         _broker.registerMessageDelivered(messageSize);
+        reportDirectMemoryBelowTargetIfReached();
     }
 
     @Override
@@ -1708,6 +1713,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
         {
             _maximumMessageSize.compareAndSet(hwm, messageSize);
         }
+        reportDirectMemoryAboveTargetIfExceeded();
     }
 
     @Override
@@ -2119,6 +2125,8 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
             if (isOverTargetSize())
             {
                 long currentTargetSize = _targetSize.get();
+                reportDirectMemoryAboveTargetIfExceeded(currentTargetSize,
+                                                        AbstractVirtualHost.this.getInMemoryMessageSize());
                 List<QueueEntryIterator> queueIterators = new ArrayList<>();
                 for (Queue<?> q : getChildren(Queue.class))
                 {
@@ -2164,6 +2172,8 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
                         cyclicIterators.remove();
                     }
                 }
+                reportDirectMemoryBelowTargetIfReached(cumulativeSize,
+                                                       AbstractVirtualHost.this.getInMemoryMessageSize());
             }
         }
     }
@@ -2573,6 +2583,9 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
     public void setTargetSize(final long targetSize)
     {
         _targetSize.set(targetSize);
+        final long inMemoryMessageSize = getInMemoryMessageSize();
+        reportDirectMemoryAboveTargetIfExceeded(targetSize, inMemoryMessageSize);
+        reportDirectMemoryBelowTargetIfReached(targetSize, inMemoryMessageSize);
     }
 
     @Override
@@ -3448,4 +3461,50 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
                                                         String.valueOf(outcome),
                                                         attributesAsString(attributes)));
     }
+
+    /**
+     * Convenience overload: when DEBUG logging is enabled, reads the current target size and
+     * in-memory message size and delegates to
+     * {@link #reportDirectMemoryAboveTargetIfExceeded(long, long)}.
+     * Neither value is read when DEBUG logging is disabled.
+     */
+    private void reportDirectMemoryAboveTargetIfExceeded()
+    {
+        if (!DIRECT_MEMORY_USAGE_LOGGER.isDebugEnabled())
+        {
+            return;
+        }
+        final long targetSize = getTargetSize();
+        final long messageSizeInMemory = getInMemoryMessageSize();
+        reportDirectMemoryAboveTargetIfExceeded(targetSize, messageSizeInMemory);
+    }
+
+    /**
+     * Convenience overload: when DEBUG logging is enabled, reads the current target size and
+     * in-memory message size and delegates to
+     * {@link #reportDirectMemoryBelowTargetIfReached(long, long)}.
+     * Neither value is read when DEBUG logging is disabled.
+     */
+    private void reportDirectMemoryBelowTargetIfReached()
+    {
+        if (!DIRECT_MEMORY_USAGE_LOGGER.isDebugEnabled())
+        {
+            return;
+        }
+        final long targetSize = getTargetSize();
+        final long messageSizeInMemory = getInMemoryMessageSize();
+        reportDirectMemoryBelowTargetIfReached(targetSize, messageSizeInMemory);
+    }
+
+    /**
+     * Logs, at DEBUG level, that this virtual host is back within its limits.
+     * <p>
+     * The record is written only when all of the following hold, checked in this order:
+     * DEBUG logging is enabled, {@code inMemoryMessageSize} does not exceed
+     * {@code currentTargetSize}, the allocated direct memory does not exceed the broker
+     * flow to disk threshold, and {@code _directMemoryExceedsTargetReported} can be reset
+     * from {@code true} to {@code false}.
+     *
+     * @param currentTargetSize   target size to compare against and to report in the message
+     * @param inMemoryMessageSize in-memory message size to compare and to report in the message
+     */
+    private void reportDirectMemoryBelowTargetIfReached(final long currentTargetSize, final long inMemoryMessageSize)
+    {
+        if (!DIRECT_MEMORY_USAGE_LOGGER.isDebugEnabled() || inMemoryMessageSize > currentTargetSize)
+        {
+            return;
+        }
+        if (QpidByteBuffer.getAllocatedDirectMemorySize() > _broker.getFlowToDiskThreshold())
+        {
+            return;
+        }
+        // Flip the flag last so it is only reset once every "back to normal" condition holds.
+        if (_directMemoryExceedsTargetReported.compareAndSet(true, false))
+        {
+            DIRECT_MEMORY_USAGE_LOGGER.debug(
+                    "VirtualHost '{}' direct memory allocation threshold ({}) maintained : {} bytes. Flow to disk stopped.",
+                    getName(),
+                    currentTargetSize,
+                    inMemoryMessageSize);
+        }
+    }
+
+    /**
+     * Logs, at DEBUG level, that this virtual host is over its limits.
+     * <p>
+     * With DEBUG logging enabled, the record is written when either
+     * {@code inMemoryMessageSize} is greater than {@code currentTargetSize} or the allocated
+     * direct memory is greater than the broker flow to disk threshold, and
+     * {@code _directMemoryExceedsTargetReported} can be flipped from {@code false} to {@code true}.
+     *
+     * @param currentTargetSize   target size to compare against and to report in the message
+     * @param inMemoryMessageSize in-memory message size to compare and to report in the message
+     */
+    private void reportDirectMemoryAboveTargetIfExceeded(final long currentTargetSize, final long inMemoryMessageSize)
+    {
+        if (!DIRECT_MEMORY_USAGE_LOGGER.isDebugEnabled())
+        {
+            return;
+        }
+        // Short-circuit keeps the broker-wide lookup from running when the host is already over target.
+        final boolean limitExceeded =
+                inMemoryMessageSize > currentTargetSize
+                || QpidByteBuffer.getAllocatedDirectMemorySize() > _broker.getFlowToDiskThreshold();
+        if (limitExceeded && _directMemoryExceedsTargetReported.compareAndSet(false, true))
+        {
+            DIRECT_MEMORY_USAGE_LOGGER.debug(
+                    "VirtualHost '{}' direct memory allocation threshold ({}) exceeded : {} bytes. Flow to disk enforced.",
+                    getName(),
+                    currentTargetSize,
+                    inMemoryMessageSize);
+        }
+    }
 }
diff --git a/doc/java-broker/src/docbkx/runtime/Java-Broker-Runtime-Flow-To-Disk.xml b/doc/java-broker/src/docbkx/runtime/Java-Broker-Runtime-Flow-To-Disk.xml
index 01dbf1a..09bdb77 100644
--- a/doc/java-broker/src/docbkx/runtime/Java-Broker-Runtime-Flow-To-Disk.xml
+++ b/doc/java-broker/src/docbkx/runtime/Java-Broker-Runtime-Flow-To-Disk.xml
@@ -39,4 +39,54 @@
   <para>Flow to disk is configured by Broker context variable
       <literal>broker.flowToDiskThreshold</literal>. It is expressed as a size in bytes and defaults
     to 75% of the JVM maximum heap size.</para>
+  <section role="h2" xml:id="Java-Broker-Runtime-Flow-To-Disk-Monitoring">
+    <title>Flow to Disk Monitoring</title>
+    <para>A number of statistics attributes are available on the <literal>Broker</literal> to allow monitoring
+      of the amount of direct memory utilized by the enqueued messages.
+    </para>
+    <para>The total amount of direct memory allocated by the <literal>Broker</literal> can be determined by
+      checking the <literal>Broker</literal> statistic <literal>usedDirectMemorySize</literal>. There is another
+      <literal>Broker</literal>-level statistic, <literal>directMemoryTotalCapacity</literal>, which also reports the total amount
+      of allocated direct memory. Usually, the values reported by both statistics attributes
+      <literal>usedDirectMemorySize</literal> and <literal>directMemoryTotalCapacity</literal> are the same
+      or do not differ much.
+    </para>
+    <para>The direct memory consumed by the <literal>VirtualHost</literal> messages is reported as
+      <literal>VirtualHost</literal> statistics <literal>inMemoryMessageSize</literal>. The current value of
+      <literal>VirtualHost</literal> direct memory threshold is exposed with statistics attribute
+      <literal>inMemoryMessageThreshold</literal>. When the value of <literal>inMemoryMessageSize</literal> is
+      greater than <literal>inMemoryMessageThreshold</literal>, the flow to disk is triggered to bring the amount of
+      direct memory consumed by the <literal>VirtualHost</literal> messages in-line with the
+      <literal>inMemoryMessageThreshold</literal>.
+    </para>
+  </section>
+  <section role="h2" xml:id="Java-Broker-Runtime-Flow-To-Disk-Logging">
+    <title>Flow to Disk Logging</title>
+    <para>The <literal>Flow to Disk</literal> events are not reported as operational logs or
+      <literal>INFO</literal> logs due to quite frequent triggering of <literal>Flow to Disk</literal>
+      for messaging use cases requiring holding messages on the <literal>Broker</literal> side for some time.
+      As a result, the <literal>Flow to Disk</literal> logs can quickly dominate the broker logs and cause unnecessary
+      disk consumption.
+    </para>
+    <para>However, if required, the <literal>Flow to Disk</literal> DEBUG logs can be enabled by adding
+      the following logging rule to the corresponding <literal>Broker</literal> logger.
+      <example>
+        <title>Flow to Disk logging rule</title>
+        <programlisting>
+          {
+            "name" : "DirectMemory",
+            "type" : "NameAndLevel",
+            "level" : "DEBUG",
+            "loggerName" : "org.apache.qpid.server.directMemory.*"
+          }
+        </programlisting>
+      </example>
+    </para>
+    <para>Please note that the logger <literal>org.apache.qpid.server.directMemory.broker</literal>
+      is used by the <literal>Broker</literal> to report conditions when direct memory utilization exceeds the pre-defined
+      <literal>Broker</literal> threshold, whilst the logger <literal>org.apache.qpid.server.directMemory.virtualhost</literal>
+      is used to report conditions when direct memory utilization by the <literal>VirtualHost</literal>
+      messages exceeds the current value of the <literal>VirtualHost</literal> threshold.
+    </para>
+  </section>
 </section>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org