You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by bh...@apache.org on 2006/10/17 13:10:42 UTC

svn commit: r464900 - in /incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue: AMQQueue.java ManagedQueue.java

Author: bhupendrab
Date: Tue Oct 17 04:10:39 2006
New Revision: 464900

URL: http://svn.apache.org/viewvc?view=rev&rev=464900
Log:
AMQQueueMBean - management notifications will be sent to the listening clients if the queue size increases the threshold value or a message with size higher than threshold value is received.

Modified:
    incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
    incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java

Modified: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=464900&r1=464899&r2=464900
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java Tue Oct 17 04:10:39 2006
@@ -95,19 +95,19 @@
     private final AMQQueueMBean _managedObject;
 
     /**
-     * max allowed size of a single message.
+     * max allowed size of a single message(in KBytes).
      */
-    private long _maxAllowedMessageSize = 0;
+    private long _maxAllowedMessageSize = 10000;    // 10 MB
 
     /**
      * max allowed number of messages on a queue.
      */
-    private Integer _maxAllowedMessageCount = 0;
+    private Integer _maxAllowedMessageCount = 10000;
 
     /**
-     * max allowed size in bytes for all the messages combined together in a queue.
+     * max allowed size in  KBytes for all the messages combined together in a queue.
      */
-    private long _queueDepth = 0;
+    private long _queueDepth = 10000000;  //   10 GB
 
     /**
      * total messages received by the queue since startup.
@@ -266,26 +266,81 @@
             return _queueDepth;
         }
 
+        // Sets the queue depth, the max queue size
         public void setQueueDepth(Long value)
         {
            _queueDepth = value;
         }
 
+        // Returns the size of messages in the queue
+        public Long getQueueSize()
+        {
+            List<AMQMessage> list = _deliveryMgr.getMessages();
+            if (list.size() == 0)
+                return 0l;
+
+            long queueSize = 0;
+            for (AMQMessage message : list)
+            {
+                queueSize = queueSize + getMessageSize(message);
+            }
+            return new Long(Math.round(queueSize/100));
+        }
         // Operations
 
-        private void checkForNotification()
+        // calculates the size of an AMQMessage
+        private long getMessageSize(AMQMessage msg)
         {
-            if (getMessageCount() >= getMaximumMessageCount())
+            if (msg == null)
+                return 0l;
+
+            List<ContentBody> cBodies = msg.getContentBodies();
+            long messageSize = 0;
+            for (ContentBody body : cBodies)
             {
-                Notification n = new Notification(
+                if (body != null)
+                    messageSize = messageSize + body.getSize();
+            }
+            return messageSize;
+        }
+
+        // Checks if there is any notification to be send to the listeners
+        private void checkForNotification(AMQMessage msg)
+        {
+            // Check for message count
+            Integer msgCount = getMessageCount();
+            if (msgCount >= getMaximumMessageCount())
+            {
+                notifyClients("MessageCount = " + msgCount + ", Queue has reached its size limit and is now full.");
+            }
+
+            // Check for received message size
+            long messageSize = getMessageSize(msg);
+            if (messageSize >= getMaximumMessageSize())
+            {
+                notifyClients("MessageSize = " + messageSize + ", Message size (MessageID="+ msg.getMessageId() +
+                                ")is higher than the threshold value");
+            }
+
+            // Check for queue size in bytes
+            long queueSize = getQueueSize();
+            if (queueSize >= getQueueDepth())
+            {
+                notifyClients("QueueSize = " + queueSize + ", Queue size has reached the threshold value");
+            }
+        }
+
+        // Send the notification to the listeners
+        private void notifyClients(String notificationMsg)
+        {
+            Notification n = new Notification(
                         MonitorNotification.THRESHOLD_VALUE_EXCEEDED,
                         this,
                         ++_notificationSequenceNumber,
                         System.currentTimeMillis(),
-                        "MessageCount = " + getMessageCount() + ", Queue has reached its size limit and is now full.");
+                        notificationMsg);
 
                 _broadcaster.sendNotification(n);
-            }
         }
 
         public void deleteMessageFromTop() throws JMException
@@ -634,10 +689,11 @@
     private void process(AMQMessage msg) throws FailedDequeueException
     {
         _deliveryMgr.deliver(getName(), msg);
+        updateReceivedMessageCount(msg);
         try
         {
             msg.checkDeliveredToConsumer();
-            updateReceivedMessageCount();
+            updateReceivedMessageCount(msg);
         }
         catch(NoConsumersException e)
         {
@@ -645,14 +701,13 @@
             // from the queue:
             dequeue(msg);
         }
-
     }
 
     void dequeue(AMQMessage msg) throws FailedDequeueException
     {
         try
         {
-            msg.decrementReference();                
+            msg.decrementReference();
             msg.dequeue(this);
         }
         catch(AMQException e)
@@ -660,7 +715,7 @@
             throw new FailedDequeueException(_name, e);
         }
     }
-    
+
     public void deliverAsync()
     {
         _deliveryMgr.processAsync(_asyncDelivery);
@@ -671,10 +726,10 @@
         return _subscribers;
     }
 
-    protected void updateReceivedMessageCount()
+    protected void updateReceivedMessageCount(AMQMessage msg)
     {
         _totalMessagesReceived++;
-        _managedObject.checkForNotification();
+        _managedObject.checkForNotification(msg);
     }
 
     public boolean equals(Object o)

Modified: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java?view=diff&rev=464900&r1=464899&r2=464900
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java Tue Oct 17 04:10:39 2006
@@ -78,7 +78,7 @@
     Integer getMessageCount() throws IOException;
 
     /**
-     * Returns the maximum size of a message (in bytes) allowed to be accepted by the
+     * Returns the maximum size of a message (in kbytes) allowed to be accepted by the
      * ManagedQueue. This is useful in setting notifications or taking
      * appropriate action, if the size of the message received is more than
      * the allowed size.
@@ -89,14 +89,14 @@
     Long getMaximumMessageSize() throws IOException;
 
     /**
-     * Sets the maximum size of the message (in bytes) that is allowed to be
+     * Sets the maximum size of the message (in kbytes) that is allowed to be
      * accepted by the Queue.
-     * @param bytes  maximum size of message.
+     * @param size  maximum size of message.
      * @throws IOException
      */
     @MBeanAttribute(name="MaximumMessageSize",
-                         description="Maximum size of a message in bytes allowed for this Queue")
-    void setMaximumMessageSize(Long bytes) throws IOException;
+                         description="Maximum size(KB) of a message allowed for this Queue")
+    void setMaximumMessageSize(Long size) throws IOException;
 
     /**
      * Returns the total number of subscribers to the queue.
@@ -142,6 +142,14 @@
     void setMaximumMessageCount(Integer value) throws IOException;
 
     /**
+     * Size of messages in the queue
+     * @return
+     * @throws IOException
+     */
+    @MBeanAttribute(name="QueueSize", description="Size of messages(KB) in the queue")
+    Long getQueueSize() throws IOException;
+
+    /**
      * Tells the maximum size of all the messages combined together,
      * that can be stored in the queue. This is useful for setting notifications
      * or taking required action if the size of messages stored in the queue
@@ -158,7 +166,7 @@
      * @throws IOException
      */
     @MBeanAttribute(name="QueueDepth",
-                         description="The size of all the messages together, that can be stored in the queue")
+                         description="The size(KB) of all the messages together, that can be stored in the queue")
     void setQueueDepth(Long value) throws IOException;