You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2011/02/15 15:18:19 UTC

svn commit: r1070914 - in /qpid/trunk/qpid: specs/management-schema.xml tests/src/py/qpid_tests/broker_0_10/__init__.py tests/src/py/qpid_tests/broker_0_10/threshold.py

Author: gsim
Date: Tue Feb 15 14:18:19 2011
New Revision: 1070914

URL: http://svn.apache.org/viewvc?rev=1070914&view=rev
Log:
QPID-3002: Configurable threshold alerts for queues; added spec change and tests missing from last commit

Added:
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/threshold.py
Modified:
    qpid/trunk/qpid/specs/management-schema.xml
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py

Modified: qpid/trunk/qpid/specs/management-schema.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/specs/management-schema.xml?rev=1070914&r1=1070913&r2=1070914&view=diff
==============================================================================
--- qpid/trunk/qpid/specs/management-schema.xml (original)
+++ qpid/trunk/qpid/specs/management-schema.xml Tue Feb 15 14:18:19 2011
@@ -380,6 +380,8 @@
     <arg name="reason"  type="lstr"   desc="Reason for a failure"/>
     <arg name="rhost"   type="sstr"   desc="Address (i.e. DNS name, IP address, etc.) of a remotely connected host"/>
     <arg name="user"    type="sstr"   desc="Authentication identity"/>
+    <arg name="msgDepth" type="count32" desc="Current size of queue in messages"/>
+    <arg name="byteDepth" type="count32" desc="Current size of queue in bytes"/>
   </eventArguments>
 
   <event name="clientConnect"     sev="inform" args="rhost, user"/>
@@ -395,5 +397,6 @@
   <event name="unbind"            sev="inform" args="rhost, user, exName, qName, key"/>
   <event name="subscribe"         sev="inform" args="rhost, user, qName, dest, excl, args"/>
   <event name="unsubscribe"       sev="inform" args="rhost, user, dest"/>
+  <event name="queueThresholdExceeded"       sev="warn" args="qName, msgDepth, byteDepth"/>
 </schema>
 

Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py?rev=1070914&r1=1070913&r2=1070914&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py Tue Feb 15 14:18:19 2011
@@ -31,3 +31,4 @@ from queue import *
 from tx import *
 from lvq import *
 from priority import *
+from threshold import *

Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/threshold.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/threshold.py?rev=1070914&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/threshold.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/threshold.py Tue Feb 15 14:18:19 2011
@@ -0,0 +1,124 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.messaging import *
+from qpid.tests.messaging import Base
+import math
+
+class ThresholdTests (Base):
+    """
+    Test queue threshold events are sent and received correctly
+    """ 
+
+    def setup_connection(self):
+        return Connection.establish(self.broker, **self.connection_options())
+
+    def setup_session(self):
+        return self.conn.session()
+
+    def do_threshold_test(self, key, value, messages):
+        rcv = self.ssn.receiver("qmf.default.topic/agent.ind.event.org_apache_qpid_broker.queueThresholdExceeded.#")
+        snd = self.ssn.sender("ttq; {create:always, node: {x-declare:{auto_delete:True,exclusive:True,arguments:{'%s':%s}}}}" % (key, value))
+        size = 0
+        count = 0
+        for m in messages:
+            snd.send(m)
+            count = count + 1
+            size = size + len(m.content)
+        event = rcv.fetch()
+        schema = event.content[0]["_schema_id"]
+        assert schema["_class_name"] == "queueThresholdExceeded"
+        values = event.content[0]["_values"]
+        assert values["qName"] == "ttq"
+        assert values["msgDepth"] == count, "msgDepth %s, expected %s" % (values["msgDepth"], count)
+        assert values["byteDepth"] == size, "byteDepth %s, expected %s" % (values["byteDepth"], size)
+
+    def test_alert_count(self):
+        self.do_threshold_test("qpid.alert_count", 5, [Message("msg-%s" % i) for i in range(5)])
+
+    def test_alert_size(self):
+        self.do_threshold_test("qpid.alert_size", 25, [Message("msg-%s" % i) for i in range(5)])
+
+    def test_alert_count_alias(self):
+        self.do_threshold_test("x-qpid-maximum-message-count", 10, [Message("msg-%s" % i) for i in range(10)])
+
+    def test_alert_size_alias(self):
+        self.do_threshold_test("x-qpid-maximum-message-size", 15, [Message("msg-%s" % i) for i in range(3)])
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.messaging import *
+from qpid.tests.messaging import Base
+import math
+
+class ThresholdTests (Base):
+    """
+    Test queue threshold events are sent and received correctly
+    """ 
+
+    def setup_connection(self):
+        return Connection.establish(self.broker, **self.connection_options())
+
+    def setup_session(self):
+        return self.conn.session()
+
+    def do_threshold_test(self, key, value, messages):
+        rcv = self.ssn.receiver("qmf.default.topic/agent.ind.event.org_apache_qpid_broker.queueThresholdExceeded.#")
+        snd = self.ssn.sender("ttq; {create:always, node: {x-declare:{auto_delete:True,exclusive:True,arguments:{'%s':%s}}}}" % (key, value))
+        size = 0
+        count = 0
+        for m in messages:
+            snd.send(m)
+            count = count + 1
+            size = size + len(m.content)
+        event = rcv.fetch()
+        schema = event.content[0]["_schema_id"]
+        assert schema["_class_name"] == "queueThresholdExceeded"
+        values = event.content[0]["_values"]
+        assert values["qName"] == "ttq"
+        assert values["msgDepth"] == count, "msgDepth %s, expected %s" % (values["msgDepth"], count)
+        assert values["byteDepth"] == size, "byteDepth %s, expected %s" % (values["byteDepth"], size)
+
+    def test_alert_count(self):
+        self.do_threshold_test("qpid.alert_count", 5, [Message("msg-%s" % i) for i in range(5)])
+
+    def test_alert_size(self):
+        self.do_threshold_test("qpid.alert_size", 25, [Message("msg-%s" % i) for i in range(5)])
+
+    def test_alert_count_alias(self):
+        self.do_threshold_test("x-qpid-maximum-message-count", 10, [Message("msg-%s" % i) for i in range(10)])
+
+    def test_alert_size_alias(self):
+        self.do_threshold_test("x-qpid-maximum-message-size", 15, [Message("msg-%s" % i) for i in range(3)])



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org