You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2015/10/21 16:22:43 UTC

svn commit: r1709850 - in /qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server: logging/messages/QueueMessages.java logging/messages/Queue_logmessages.properties queue/AbstractQueue.java

Author: lquack
Date: Wed Oct 21 14:22:43 2015
New Revision: 1709850

URL: http://svn.apache.org/viewvc?rev=1709850&view=rev
Log:
QPID-6792: [Java Broker] Report change in flow to disk state on enqueuing messages and housekeeping

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java?rev=1709850&r1=1709849&r2=1709850&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java Wed Oct 21 14:22:43 2015
@@ -49,6 +49,8 @@ public class QueueMessages
     public static final String UNDERFULL_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.underfull";
     public static final String CREATED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.created";
     public static final String DELETED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.deleted";
+    public static final String FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.flow_to_disk_active";
+    public static final String FLOW_TO_DISK_INACTIVE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.flow_to_disk_inactive";
 
     static
     {
@@ -57,6 +59,8 @@ public class QueueMessages
         LoggerFactory.getLogger(UNDERFULL_LOG_HIERARCHY);
         LoggerFactory.getLogger(CREATED_LOG_HIERARCHY);
         LoggerFactory.getLogger(DELETED_LOG_HIERARCHY);
+        LoggerFactory.getLogger(FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY);
+        LoggerFactory.getLogger(FLOW_TO_DISK_INACTIVE_LOG_HIERARCHY);
 
         _messages = ResourceBundle.getBundle("org.apache.qpid.server.logging.messages.Queue_logmessages", _currentLocale);
     }
@@ -324,6 +328,122 @@ public class QueueMessages
             }
 
             @Override
+            public boolean equals(final Object o)
+            {
+                if (this == o)
+                {
+                    return true;
+                }
+                if (o == null || getClass() != o.getClass())
+                {
+                    return false;
+                }
+
+                final LogMessage that = (LogMessage) o;
+
+                return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
+
+            }
+
+            @Override
+            public int hashCode()
+            {
+                int result = toString().hashCode();
+                result = 31 * result + getLogHierarchy().hashCode();
+                return result;
+            }
+        };
+    }
+
+    /**
+     * Log a Queue message of the Format:
+     * <pre>QUE-1014 : Message flow to disk active :  Message memory use {0,number,#} kB exceeds threshold {1,number,#.##} kB</pre>
+     *
+     * @param param1 message memory use in kB, substituted for <code>{0}</code>
+     * @param param2 flow to disk threshold in kB, substituted for <code>{1}</code>
+     * @return the formatted message, carrying {@link #FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY} as its log hierarchy
+     */
+    public static LogMessage FLOW_TO_DISK_ACTIVE(Number param1, Number param2)
+    {
+        final String rawMessage = _messages.getString("FLOW_TO_DISK_ACTIVE");
+
+        final Object[] messageArguments = {param1, param2};
+        // Create a new MessageFormat to ensure thread safety.
+        // Sharing a MessageFormat and using applyPattern is not thread safe
+        final MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
+
+        final String message = formatter.format(messageArguments);
+
+        return new LogMessage()
+        {
+            @Override
+            public String toString()
+            {
+                return message;
+            }
+
+            @Override
+            public String getLogHierarchy()
+            {
+                return FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY;
+            }
+
+            @Override
+            public boolean equals(final Object o)
+            {
+                if (this == o)
+                {
+                    return true;
+                }
+                if (o == null || getClass() != o.getClass())
+                {
+                    return false;
+                }
+
+                // Same anonymous class: equal when hierarchy and formatted text both match
+                final LogMessage that = (LogMessage) o;
+                return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
+            }
+
+            @Override
+            public int hashCode()
+            {
+                return 31 * toString().hashCode() + getLogHierarchy().hashCode();
+            }
+        };
+    }
+
+    /**
+     * Log a Queue message of the Format:
+     * <pre>QUE-1015 : Message flow to disk inactive : Message memory use {0,number,#} kB within threshold {1,number,#.##} kB</pre>
+     * Optional values are contained in [square brackets] and are numbered
+     * sequentially in the method call.
+     *
+     */
+    public static LogMessage FLOW_TO_DISK_INACTIVE(Number param1, Number param2)
+    {
+        String rawMessage = _messages.getString("FLOW_TO_DISK_INACTIVE");
+
+        final Object[] messageArguments = {param1, param2};
+        // Create a new MessageFormat to ensure thread safety.
+        // Sharing a MessageFormat and using applyPattern is not thread safe
+        MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
+
+        final String message = formatter.format(messageArguments);
+
+        return new LogMessage()
+        {
+            public String toString()
+            {
+                return message;
+            }
+
+            public String getLogHierarchy()
+            {
+                return FLOW_TO_DISK_INACTIVE_LOG_HIERARCHY;
+            }
+
+            @Override
             public boolean equals(final Object o)
             {
                 if (this == o)

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties?rev=1709850&r1=1709849&r2=1709850&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties Wed Oct 21 14:22:43 2015
@@ -24,3 +24,7 @@ CREATED = QUE-1001 : Create :[ Owner: {0
 DELETED = QUE-1002 : Deleted
 OVERFULL = QUE-1003 : Overfull : Size : {0,number} bytes, Capacity : {1,number}
 UNDERFULL = QUE-1004 : Underfull : Size : {0,number} bytes, Resume Capacity : {1,number}
+
+# Use numbers similar to those of the Broker's messages on the same topic (flow to disk)
+FLOW_TO_DISK_ACTIVE = QUE-1014 : Message flow to disk active :  Message memory use {0,number,#} kB exceeds threshold {1,number,#.##} kB
+FLOW_TO_DISK_INACTIVE = QUE-1015 : Message flow to disk inactive : Message memory use {0,number,#} kB within threshold {1,number,#.##} kB

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1709850&r1=1709849&r2=1709850&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Wed Oct 21 14:22:43 2015
@@ -241,6 +241,7 @@ public abstract class AbstractQueue<X ex
     private boolean _noLocal;
 
     private final AtomicBoolean _overfull = new AtomicBoolean(false);
+    private final FlowToDiskChecker _flowToDiskChecker = new FlowToDiskChecker();
     private final CopyOnWriteArrayList<BindingImpl> _bindings = new CopyOnWriteArrayList<BindingImpl>();
     private Map<String, Object> _arguments;
 
@@ -1065,10 +1066,7 @@ public abstract class AbstractQueue<X ex
         incrementQueueCount();
         incrementQueueSize(message);
 
-        if((_atomicQueueSize.get() + _atomicQueueCount.get()*1024l) > _targetQueueSize.get() && message.getStoredMessage().isInMemory())
-        {
-            message.getStoredMessage().flowToDisk();
-        }
+        _flowToDiskChecker.flowToDiskAndReportIfNecessary(message.getStoredMessage());
 
         _totalMessagesReceived.incrementAndGet();
 
@@ -2444,8 +2442,8 @@ public abstract class AbstractQueue<X ex
     {
         QueueEntryIterator queueListIterator = getEntries().iterator();
 
-        long totalSize = getContextValue(Long.class, QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD) * getQueueDepthMessages();
-        long targetSize = _targetQueueSize.get();
+        long cumulativeQueueSize = getContextValue(Long.class, QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD) * getQueueDepthMessages();
+        _flowToDiskChecker.reportFlowToDiskStatusIfNecessary();
 
         while (queueListIterator.advance())
         {
@@ -2483,18 +2481,13 @@ public abstract class AbstractQueue<X ex
 
                     if (msg != null)
                     {
-                        totalSize += msg.getSize();
-                        StoredMessage storedMessage = msg.getStoredMessage();
-                        if(totalSize > targetSize && storedMessage.isInMemory())
-                        {
-                            storedMessage.flowToDisk();
-                        }
+                        cumulativeQueueSize += msg.getSize();
+                        _flowToDiskChecker.flowToDiskIfNecessary(msg.getStoredMessage(), cumulativeQueueSize);
                         checkForNotification(msg);
                     }
                 }
             }
         }
-
     }
 
     @Override
@@ -3549,4 +3542,56 @@ public abstract class AbstractQueue<X ex
         }
     }
 
+    /**
+     * Flows messages to disk once the estimated queue size exceeds the target size and logs state changes.
+     */
+    private class FlowToDiskChecker
+    {
+        /** Flow to disk state most recently written to the event log; initially inactive. */
+        private final AtomicBoolean _lastReportedFlowToDiskStatus = new AtomicBoolean(false);
+
+        /** Flows the message to disk if it is in memory and estimatedQueueSize exceeds the target queue size. */
+        void flowToDiskIfNecessary(StoredMessage<?> storedMessage, long estimatedQueueSize)
+        {
+            if ((estimatedQueueSize > _targetQueueSize.get()) && storedMessage.isInMemory())
+            {
+                storedMessage.flowToDisk();
+            }
+        }
+
+        /** Applies the flow to disk check to a message, then reports a change of flow to disk state, if any. */
+        void flowToDiskAndReportIfNecessary(StoredMessage<?> storedMessage)
+        {
+            flowToDiskIfNecessary(storedMessage, estimateQueueSize());
+            reportFlowToDiskStatusIfNecessary();
+        }
+
+        /**
+         * Logs FLOW_TO_DISK_ACTIVE or FLOW_TO_DISK_INACTIVE when the current state differs from the one last
+         * reported. Sizes are accounted in bytes but the messages are worded in kB, hence the conversion.
+         */
+        void reportFlowToDiskStatusIfNecessary()
+        {
+            final long estimatedQueueSize = estimateQueueSize();
+            final long targetQueueSize = _targetQueueSize.get();
+            final boolean active = estimatedQueueSize > targetQueueSize;
+            // getAndSet lets only the caller that actually flips the state log the transition
+            if (_lastReportedFlowToDiskStatus.getAndSet(active) != active)
+            {
+                // 1024.0 keeps the fraction that the threshold pattern {1,number,#.##} can display
+                final double memoryUseKb = estimatedQueueSize / 1024.0;
+                final double thresholdKb = targetQueueSize / 1024.0;
+                getEventLogger().message(_logSubject,
+                                         active ? QueueMessages.FLOW_TO_DISK_ACTIVE(memoryUseKb, thresholdKb)
+                                                : QueueMessages.FLOW_TO_DISK_INACTIVE(memoryUseKb, thresholdKb));
+            }
+        }
+
+        /** Returns the queue size in bytes plus the estimated memory overhead of every queued message. */
+        private long estimateQueueSize()
+        {
+            return _atomicQueueSize.get()
+                   + _atomicQueueCount.get() * getContextValue(Long.class, QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD);
+        }
+    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org