You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2015/10/21 16:22:43 UTC
svn commit: r1709850 - in
/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server:
logging/messages/QueueMessages.java
logging/messages/Queue_logmessages.properties queue/AbstractQueue.java
Author: lquack
Date: Wed Oct 21 14:22:43 2015
New Revision: 1709850
URL: http://svn.apache.org/viewvc?rev=1709850&view=rev
Log:
QPID-6792: [Java Broker] Report change in flow to disk state on enqueuing messages and housekeeping
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java?rev=1709850&r1=1709849&r2=1709850&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java Wed Oct 21 14:22:43 2015
@@ -49,6 +49,8 @@ public class QueueMessages
public static final String UNDERFULL_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.underfull";
public static final String CREATED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.created";
public static final String DELETED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.deleted";
+ public static final String FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.flow_to_disk_active";
+ public static final String FLOW_TO_DISK_INACTIVE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.flow_to_disk_inactive";
static
{
@@ -57,6 +59,8 @@ public class QueueMessages
LoggerFactory.getLogger(UNDERFULL_LOG_HIERARCHY);
LoggerFactory.getLogger(CREATED_LOG_HIERARCHY);
LoggerFactory.getLogger(DELETED_LOG_HIERARCHY);
+ LoggerFactory.getLogger(FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY);
+ LoggerFactory.getLogger(FLOW_TO_DISK_INACTIVE_LOG_HIERARCHY);
_messages = ResourceBundle.getBundle("org.apache.qpid.server.logging.messages.Queue_logmessages", _currentLocale);
}
@@ -324,6 +328,122 @@ public class QueueMessages
}
@Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ final LogMessage that = (LogMessage) o;
+
+ return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
+
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = toString().hashCode();
+ result = 31 * result + getLogHierarchy().hashCode();
+ return result;
+ }
+ };
+ }
+
+ /**
+  * Creates the Queue log message reporting that message flow to disk is active:
+  * <pre>QUE-1014 : Message flow to disk active : Message memory use {0,number,#} kB exceeds threshold {1,number,#.##} kB</pre>
+  * The message text is formatted once, when this method is called, using the
+  * current locale.
+  *
+  * @param param1 value substituted for <code>{0}</code> (the message memory use)
+  * @param param2 value substituted for <code>{1}</code> (the threshold)
+  * @return a LogMessage whose <code>toString()</code> is the formatted text and whose
+  *         log hierarchy is <code>FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY</code>
+  */
+ public static LogMessage FLOW_TO_DISK_ACTIVE(Number param1, Number param2)
+ {
+     // A MessageFormat is built per call on purpose: sharing one instance and
+     // re-applying patterns to it is not thread safe.
+     final MessageFormat format = new MessageFormat(_messages.getString("FLOW_TO_DISK_ACTIVE"), _currentLocale);
+     final String text = format.format(new Object[]{param1, param2});
+
+     return new LogMessage()
+     {
+         @Override
+         public String toString()
+         {
+             return text;
+         }
+
+         @Override
+         public String getLogHierarchy()
+         {
+             return FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY;
+         }
+
+         @Override
+         public boolean equals(final Object other)
+         {
+             if (other == this)
+             {
+                 return true;
+             }
+             if (other == null || other.getClass() != getClass())
+             {
+                 return false;
+             }
+
+             // Two messages are equal when both the hierarchy and the rendered text match.
+             final LogMessage candidate = (LogMessage) other;
+             if (!getLogHierarchy().equals(candidate.getLogHierarchy()))
+             {
+                 return false;
+             }
+             return toString().equals(candidate.toString());
+         }
+
+         @Override
+         public int hashCode()
+         {
+             // Combines the same two properties that equals() compares.
+             return 31 * toString().hashCode() + getLogHierarchy().hashCode();
+         }
+     };
+ }
+
+ /**
+ * Log a Queue message of the Format:
+ * <pre>QUE-1015 : Message flow to disk inactive : Message memory use {0,number,#} kB within threshold {1,number,#.##} kB</pre>
+ * Optional values are contained in [square brackets] and are numbered
+ * sequentially in the method call.
+ *
+ */
+ public static LogMessage FLOW_TO_DISK_INACTIVE(Number param1, Number param2)
+ {
+ String rawMessage = _messages.getString("FLOW_TO_DISK_INACTIVE");
+
+ final Object[] messageArguments = {param1, param2};
+ // Create a new MessageFormat to ensure thread safety.
+ // Sharing a MessageFormat and using applyPattern is not thread safe
+ MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
+
+ final String message = formatter.format(messageArguments);
+
+ return new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
+
+ public String getLogHierarchy()
+ {
+ return FLOW_TO_DISK_INACTIVE_LOG_HIERARCHY;
+ }
+
+ @Override
public boolean equals(final Object o)
{
if (this == o)
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties?rev=1709850&r1=1709849&r2=1709850&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties Wed Oct 21 14:22:43 2015
@@ -24,3 +24,7 @@ CREATED = QUE-1001 : Create :[ Owner: {0
DELETED = QUE-1002 : Deleted
OVERFULL = QUE-1003 : Overfull : Size : {0,number} bytes, Capacity : {1,number}
UNDERFULL = QUE-1004 : Underfull : Size : {0,number} bytes, Resume Capacity : {1,number}
+
+ # These message numbers deliberately mirror the numbers the broker uses for its messages on the same topic
+FLOW_TO_DISK_ACTIVE = QUE-1014 : Message flow to disk active : Message memory use {0,number,#} kB exceeds threshold {1,number,#.##} kB
+FLOW_TO_DISK_INACTIVE = QUE-1015 : Message flow to disk inactive : Message memory use {0,number,#} kB within threshold {1,number,#.##} kB
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1709850&r1=1709849&r2=1709850&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Wed Oct 21 14:22:43 2015
@@ -241,6 +241,7 @@ public abstract class AbstractQueue<X ex
private boolean _noLocal;
private final AtomicBoolean _overfull = new AtomicBoolean(false);
+ private final FlowToDiskChecker _flowToDiskChecker = new FlowToDiskChecker();
private final CopyOnWriteArrayList<BindingImpl> _bindings = new CopyOnWriteArrayList<BindingImpl>();
private Map<String, Object> _arguments;
@@ -1065,10 +1066,7 @@ public abstract class AbstractQueue<X ex
incrementQueueCount();
incrementQueueSize(message);
- if((_atomicQueueSize.get() + _atomicQueueCount.get()*1024l) > _targetQueueSize.get() && message.getStoredMessage().isInMemory())
- {
- message.getStoredMessage().flowToDisk();
- }
+ _flowToDiskChecker.flowToDiskAndReportIfNecessary(message.getStoredMessage());
_totalMessagesReceived.incrementAndGet();
@@ -2444,8 +2442,8 @@ public abstract class AbstractQueue<X ex
{
QueueEntryIterator queueListIterator = getEntries().iterator();
- long totalSize = getContextValue(Long.class, QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD) * getQueueDepthMessages();
- long targetSize = _targetQueueSize.get();
+ long cumulativeQueueSize = getContextValue(Long.class, QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD) * getQueueDepthMessages();
+ _flowToDiskChecker.reportFlowToDiskStatusIfNecessary();
while (queueListIterator.advance())
{
@@ -2483,18 +2481,13 @@ public abstract class AbstractQueue<X ex
if (msg != null)
{
- totalSize += msg.getSize();
- StoredMessage storedMessage = msg.getStoredMessage();
- if(totalSize > targetSize && storedMessage.isInMemory())
- {
- storedMessage.flowToDisk();
- }
+ cumulativeQueueSize += msg.getSize();
+ _flowToDiskChecker.flowToDiskIfNecessary(msg.getStoredMessage(), cumulativeQueueSize);
checkForNotification(msg);
}
}
}
}
-
}
@Override
@@ -3549,4 +3542,56 @@ public abstract class AbstractQueue<X ex
}
}
+ /**
+  * Flows individual messages to disk when the queue's estimated memory use
+  * exceeds the target queue size, and logs each transition between the
+  * "flow to disk active" and "flow to disk inactive" states.
+  *
+  * The estimate is <code>_atomicQueueSize + _atomicQueueCount * QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD</code>.
+  * A state change is logged at most once per transition: the last reported
+  * state is held in an AtomicBoolean and only the caller that flips it logs.
+  */
+ private class FlowToDiskChecker
+ {
+     /** Last state written to the event log; <code>true</code> means "flow to disk active". */
+     private final AtomicBoolean _lastReportedFlowToDiskStatus = new AtomicBoolean(false);
+
+     /**
+      * Flows the message to disk if the supplied size estimate exceeds the
+      * target queue size and the message is still held in memory.
+      *
+      * @param storedMessage the message that may be flowed to disk
+      * @param estimatedQueueSize the caller's estimate of the queue's memory use
+      */
+     void flowToDiskIfNecessary(StoredMessage<?> storedMessage, long estimatedQueueSize)
+     {
+         long targetQueueSize = _targetQueueSize.get();
+         if ((estimatedQueueSize > targetQueueSize) && storedMessage.isInMemory())
+         {
+             storedMessage.flowToDisk();
+         }
+     }
+
+     /**
+      * Flows the message to disk if required and then reports any change of
+      * the flow to disk state. The estimate is computed once and shared by
+      * both steps so that the decision and the report agree.
+      *
+      * @param storedMessage the message that may be flowed to disk
+      */
+     void flowToDiskAndReportIfNecessary(StoredMessage<?> storedMessage)
+     {
+         long estimatedQueueSize = estimateQueueSize();
+         flowToDiskIfNecessary(storedMessage, estimatedQueueSize);
+         reportFlowToDiskStatusIfNecessary(estimatedQueueSize);
+     }
+
+     /**
+      * Logs a flow to disk state change if the current estimate puts the queue
+      * in a different state from the one last reported.
+      */
+     void reportFlowToDiskStatusIfNecessary()
+     {
+         reportFlowToDiskStatusIfNecessary(estimateQueueSize());
+     }
+
+     /** Compares the given estimate with the target size and reports the resulting state. */
+     private void reportFlowToDiskStatusIfNecessary(long estimatedQueueSize)
+     {
+         long targetQueueSize = _targetQueueSize.get();
+         if (estimatedQueueSize > targetQueueSize)
+         {
+             reportFlowToDiskActiveIfNecessary(estimatedQueueSize, targetQueueSize);
+         }
+         else
+         {
+             reportFlowToDiskInactiveIfNecessary(estimatedQueueSize, targetQueueSize);
+         }
+     }
+
+     /** Estimated memory use: queue size plus the configured per-message overhead for every queued message. */
+     private long estimateQueueSize()
+     {
+         return _atomicQueueSize.get()
+                + _atomicQueueCount.get() * getContextValue(Long.class, QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD);
+     }
+
+     /**
+      * Converts a size to the kB unit printed by the QUE-1014 and QUE-1015
+      * messages. Assumes the queue size and target size are tracked in bytes.
+      */
+     private double toKilobytes(long sizeInBytes)
+     {
+         return sizeInBytes / 1024.0;
+     }
+
+     private void reportFlowToDiskActiveIfNecessary(long estimatedQueueSize, long targetQueueSize)
+     {
+         // getAndSet returns the previous state, so only the inactive -> active flip logs.
+         if (!_lastReportedFlowToDiskStatus.getAndSet(true))
+         {
+             getEventLogger().message(_logSubject, QueueMessages.FLOW_TO_DISK_ACTIVE(toKilobytes(estimatedQueueSize),
+                                                                                     toKilobytes(targetQueueSize)));
+         }
+     }
+
+     private void reportFlowToDiskInactiveIfNecessary(long estimatedQueueSize, long targetQueueSize)
+     {
+         // getAndSet returns the previous state, so only the active -> inactive flip logs.
+         if (_lastReportedFlowToDiskStatus.getAndSet(false))
+         {
+             getEventLogger().message(_logSubject, QueueMessages.FLOW_TO_DISK_INACTIVE(toKilobytes(estimatedQueueSize),
+                                                                                       toKilobytes(targetQueueSize)));
+         }
+     }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org