You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by bh...@apache.org on 2006/10/17 13:10:42 UTC
svn commit: r464900 - in
/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue:
AMQQueue.java ManagedQueue.java
Author: bhupendrab
Date: Tue Oct 17 04:10:39 2006
New Revision: 464900
URL: http://svn.apache.org/viewvc?view=rev&rev=464900
Log:
AMQQueueMBean - management notifications will be sent to the listening clients if the queue size exceeds the threshold value or if a message larger than the threshold size is received.
Modified:
incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java
Modified: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=464900&r1=464899&r2=464900
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java Tue Oct 17 04:10:39 2006
@@ -95,19 +95,19 @@
private final AMQQueueMBean _managedObject;
/**
- * max allowed size of a single message.
+ * max allowed size of a single message(in KBytes).
*/
- private long _maxAllowedMessageSize = 0;
+ private long _maxAllowedMessageSize = 10000; // 10 MB
/**
* max allowed number of messages on a queue.
*/
- private Integer _maxAllowedMessageCount = 0;
+ private Integer _maxAllowedMessageCount = 10000;
/**
- * max allowed size in bytes for all the messages combined together in a queue.
+ * max allowed size in KBytes for all the messages combined together in a queue.
*/
- private long _queueDepth = 0;
+ private long _queueDepth = 10000000; // 10 GB
/**
* total messages received by the queue since startup.
@@ -266,26 +266,81 @@
return _queueDepth;
}
+ // Sets the queue depth, the max queue size
public void setQueueDepth(Long value)
{
_queueDepth = value;
}
+ // Returns the size of messages in the queue
+ public Long getQueueSize()
+ {
+ List<AMQMessage> list = _deliveryMgr.getMessages();
+ if (list.size() == 0)
+ return 0l;
+
+ long queueSize = 0;
+ for (AMQMessage message : list)
+ {
+ queueSize = queueSize + getMessageSize(message);
+ }
+ return new Long(Math.round(queueSize/100));
+ }
// Operations
- private void checkForNotification()
+ // calculates the size of an AMQMessage
+ private long getMessageSize(AMQMessage msg)
{
- if (getMessageCount() >= getMaximumMessageCount())
+ if (msg == null)
+ return 0l;
+
+ List<ContentBody> cBodies = msg.getContentBodies();
+ long messageSize = 0;
+ for (ContentBody body : cBodies)
{
- Notification n = new Notification(
+ if (body != null)
+ messageSize = messageSize + body.getSize();
+ }
+ return messageSize;
+ }
+
+ // Checks if there is any notification to be sent to the listeners
+ private void checkForNotification(AMQMessage msg)
+ {
+ // Check for message count
+ Integer msgCount = getMessageCount();
+ if (msgCount >= getMaximumMessageCount())
+ {
+ notifyClients("MessageCount = " + msgCount + ", Queue has reached its size limit and is now full.");
+ }
+
+ // Check for received message size
+ long messageSize = getMessageSize(msg);
+ if (messageSize >= getMaximumMessageSize())
+ {
+ notifyClients("MessageSize = " + messageSize + ", Message size (MessageID="+ msg.getMessageId() +
+ ")is higher than the threshold value");
+ }
+
+ // Check for queue size in bytes
+ long queueSize = getQueueSize();
+ if (queueSize >= getQueueDepth())
+ {
+ notifyClients("QueueSize = " + queueSize + ", Queue size has reached the threshold value");
+ }
+ }
+
+ // Send the notification to the listeners
+ private void notifyClients(String notificationMsg)
+ {
+ Notification n = new Notification(
MonitorNotification.THRESHOLD_VALUE_EXCEEDED,
this,
++_notificationSequenceNumber,
System.currentTimeMillis(),
- "MessageCount = " + getMessageCount() + ", Queue has reached its size limit and is now full.");
+ notificationMsg);
_broadcaster.sendNotification(n);
- }
}
public void deleteMessageFromTop() throws JMException
@@ -634,10 +689,11 @@
private void process(AMQMessage msg) throws FailedDequeueException
{
_deliveryMgr.deliver(getName(), msg);
+ updateReceivedMessageCount(msg);
try
{
msg.checkDeliveredToConsumer();
- updateReceivedMessageCount();
+ updateReceivedMessageCount(msg);
}
catch(NoConsumersException e)
{
@@ -645,14 +701,13 @@
// from the queue:
dequeue(msg);
}
-
}
void dequeue(AMQMessage msg) throws FailedDequeueException
{
try
{
- msg.decrementReference();
+ msg.decrementReference();
msg.dequeue(this);
}
catch(AMQException e)
@@ -660,7 +715,7 @@
throw new FailedDequeueException(_name, e);
}
}
-
+
public void deliverAsync()
{
_deliveryMgr.processAsync(_asyncDelivery);
@@ -671,10 +726,10 @@
return _subscribers;
}
- protected void updateReceivedMessageCount()
+ protected void updateReceivedMessageCount(AMQMessage msg)
{
_totalMessagesReceived++;
- _managedObject.checkForNotification();
+ _managedObject.checkForNotification(msg);
}
public boolean equals(Object o)
Modified: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java?view=diff&rev=464900&r1=464899&r2=464900
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java Tue Oct 17 04:10:39 2006
@@ -78,7 +78,7 @@
Integer getMessageCount() throws IOException;
/**
- * Returns the maximum size of a message (in bytes) allowed to be accepted by the
+ * Returns the maximum size of a message (in kbytes) allowed to be accepted by the
* ManagedQueue. This is useful in setting notifications or taking
* appropriate action, if the size of the message received is more than
* the allowed size.
@@ -89,14 +89,14 @@
Long getMaximumMessageSize() throws IOException;
/**
- * Sets the maximum size of the message (in bytes) that is allowed to be
+ * Sets the maximum size of the message (in kbytes) that is allowed to be
* accepted by the Queue.
- * @param bytes maximum size of message.
+ * @param size maximum size of message.
* @throws IOException
*/
@MBeanAttribute(name="MaximumMessageSize",
- description="Maximum size of a message in bytes allowed for this Queue")
- void setMaximumMessageSize(Long bytes) throws IOException;
+ description="Maximum size(KB) of a message allowed for this Queue")
+ void setMaximumMessageSize(Long size) throws IOException;
/**
* Returns the total number of subscribers to the queue.
@@ -142,6 +142,14 @@
void setMaximumMessageCount(Integer value) throws IOException;
/**
+ * Returns the combined size (in KB) of the messages currently in the queue.
+ * @return the combined size of the queued messages, in KB
+ * @throws IOException
+ */
+ @MBeanAttribute(name="QueueSize", description="Size of messages(KB) in the queue")
+ Long getQueueSize() throws IOException;
+
+ /**
* Tells the maximum size of all the messages combined together,
* that can be stored in the queue. This is useful for setting notifications
* or taking required action if the size of messages stored in the queue
@@ -158,7 +166,7 @@
* @throws IOException
*/
@MBeanAttribute(name="QueueDepth",
- description="The size of all the messages together, that can be stored in the queue")
+ description="The size(KB) of all the messages together, that can be stored in the queue")
void setQueueDepth(Long value) throws IOException;