You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2011/02/04 17:56:13 UTC
svn commit: r1067220 - in /qpid/branches/qpid-2935/qpid/cpp/src/tests:
Makefile.am queue_flow_limit_tests.py run_queue_flow_limit_tests
Author: kgiusti
Date: Fri Feb 4 16:56:13 2011
New Revision: 1067220
URL: http://svn.apache.org/viewvc?rev=1067220&view=rev
Log:
QPID-2935: add simple flow tests
Added:
qpid/branches/qpid-2935/qpid/cpp/src/tests/queue_flow_limit_tests.py
qpid/branches/qpid-2935/qpid/cpp/src/tests/run_queue_flow_limit_tests (with props)
Modified:
qpid/branches/qpid-2935/qpid/cpp/src/tests/Makefile.am
Modified: qpid/branches/qpid-2935/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/tests/Makefile.am?rev=1067220&r1=1067219&r2=1067220&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/tests/Makefile.am Fri Feb 4 16:56:13 2011
@@ -310,7 +310,9 @@ TESTS_ENVIRONMENT = \
$(srcdir)/run_test
system_tests = qpid-client-test quick_perftest quick_topictest run_header_test quick_txtest
-TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests run_acl_tests run_cli_tests replication_test dynamic_log_level_test
+TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests \
+ run_acl_tests run_cli_tests replication_test dynamic_log_level_test \
+ run_queue_flow_limit_tests
EXTRA_DIST += \
run_test vg_check \
@@ -349,7 +351,8 @@ EXTRA_DIST += \
run_test.ps1 \
start_broker.ps1 \
stop_broker.ps1 \
- topictest.ps1
+ topictest.ps1 \
+ run_queue_flow_limit_tests
check_LTLIBRARIES += libdlclose_noop.la
libdlclose_noop_la_LDFLAGS = -module -rpath $(abs_builddir)
Added: qpid/branches/qpid-2935/qpid/cpp/src/tests/queue_flow_limit_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/tests/queue_flow_limit_tests.py?rev=1067220&view=auto
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/tests/queue_flow_limit_tests.py (added)
+++ qpid/branches/qpid-2935/qpid/cpp/src/tests/queue_flow_limit_tests.py Fri Feb 4 16:56:13 2011
@@ -0,0 +1,245 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sys
+from qpid.testlib import TestBase010
+from qpid import datatypes, messaging
+from qpid.messaging import Message, Empty
+from threading import Thread, Lock
+from logging import getLogger
+from time import sleep
+from subprocess import Popen, PIPE
+from os import environ
+
+class QueueFlowLimitTests(TestBase010):
+
+ def _create_queue(self, name,
+ stop_count=None, resume_count=None,
+ stop_size=None, resume_size=None):
+ """ Create a queue with the given flow settings via the queue.declare
+ command.
+ """
+ args={}
+ if (stop_count is not None):
+ args["qpid.flow_stop_count"] = stop_count;
+ if (resume_count is not None):
+ args["qpid.flow_resume_count"] = resume_count;
+ if (stop_size is not None):
+ args["qpid.flow_stop_size"] = stop_size;
+ if (resume_size is not None):
+ args["qpid.flow_resume_size"] = resume_size;
+
+ self.session.queue_declare(queue=name, arguments=args)
+
+ qs = self.qmf.getObjects(_class="queue")
+ for i in qs:
+ if i.name == name:
+ # verify flow settings
+ if (stop_count is not None):
+ self.assertEqual(i.flowStopCount, stop_count)
+ if (resume_count is not None):
+ self.assertEqual(i.flowResumeCount, resume_count)
+ if (stop_size is not None):
+ self.assertEqual(i.flowStopSize, stop_size)
+ if (resume_size is not None):
+ self.assertEqual(i.flowResumeSize, resume_size)
+ self.assertFalse(i.flowStopped)
+ return i.getObjectId()
+ self.fail("Unable to create queue '%s'" % name)
+ return None
+
+
+ def _delete_queue(self, name):
+ """ Delete a named queue
+ """
+ self.session.queue_delete(queue=name)
+
+
+ def _start_qpid_send(self, queue, count, content="X", capacity=10):
+ """ Use the qpid-send client to generate traffic to a queue.
+ """
+ command = ["qpid-send",
+ "-b", "%s:%s" % (self.broker.host, self.broker.port),
+ "-a", str(queue),
+ "--messages", str(count),
+ "--content-string", str(content),
+ "--capacity", str(capacity)
+ ]
+
+ return Popen(command, stdout=PIPE)
+
+ def _start_qpid_receive(self, queue, count, timeout=5):
+ """ Use the qpid-receive client to consume from a queue.
+ Note well: prints one line of text to stdout for each consumed msg.
+ """
+ command = ["qpid-receive",
+ "-b", "%s:%s" % (self.broker.host, self.broker.port),
+ "-a", str(queue),
+ "--messages", str(count),
+ "--timeout", str(timeout),
+ "--print-content", "yes"
+ ]
+ return Popen(command, stdout=PIPE)
+
+
+
+ def test_qpid_config_cmd(self):
+ """ Test the qpid-config command's ability to configure a queue's flow
+ control thresholds.
+ """
+ tool = environ.get("QPID_CONFIG_EXEC")
+ if tool:
+ command = [tool,
+ "--broker-addr=%s:%s" % (self.broker.host, self.broker.port),
+ "add", "queue", "test01",
+ "--flow-stop-count=999",
+ "--flow-resume-count=55",
+ "--flow-stop-size=5000000",
+ "--flow-resume-size=100000"]
+ #cmd = Popen(command, stdout=PIPE)
+ cmd = Popen(command)
+ cmd.wait()
+ self.assertEqual(cmd.returncode, 0)
+
+ # now verify the settings
+ self.startQmf();
+ qs = self.qmf.getObjects(_class="queue")
+ for i in qs:
+ if i.name == "test01":
+ self.assertEqual(i.flowStopCount, 999)
+ self.assertEqual(i.flowResumeCount, 55)
+ self.assertEqual(i.flowStopSize, 5000000)
+ self.assertEqual(i.flowResumeSize, 100000)
+ self.assertFalse(i.flowStopped)
+ break;
+ self.assertEqual(i.name, "test01")
+ self._delete_queue("test01")
+
+
+ def test_flow_count(self):
+ """ Create a queue with count-based flow limit. Spawn several
+ producers which will exceed the limit. Verify limit exceeded. Consume
+ all messages. Verify flow control released.
+ """
+ self.startQmf();
+ oid = self._create_queue("test-q", stop_count=373, resume_count=229)
+
+ sndr1 = self._start_qpid_send("test-q", count=1213, content="XXX", capacity=50);
+ sndr2 = self._start_qpid_send("test-q", count=797, content="Y", capacity=13);
+ sndr3 = self._start_qpid_send("test-q", count=331, content="ZZZZZ", capacity=149);
+ totalMsgs = 1213 + 797 + 331
+
+
+ # wait until flow control is active
+ count = 0
+ while self.qmf.getObjects(_objectId=oid)[0].flowStopped == False and \
+ count < 10:
+ sleep(1);
+ count += 1;
+ self.assertTrue(self.qmf.getObjects(_objectId=oid)[0].flowStopped)
+ depth = self.qmf.getObjects(_objectId=oid)[0].msgDepth
+ self.assertGreater(depth, 373)
+
+ # now wait until the enqueues stop happening - ensure that
+ # not all msgs have been sent (senders are blocked)
+ sleep(1)
+ newDepth = self.qmf.getObjects(_objectId=oid)[0].msgDepth
+ while depth != newDepth:
+ depth = newDepth;
+ sleep(1)
+ newDepth = self.qmf.getObjects(_objectId=oid)[0].msgDepth
+ self.assertGreater(totalMsgs, depth)
+
+ # drain the queue
+ rcvr = self._start_qpid_receive("test-q",
+ count=totalMsgs)
+ count = 0;
+ x = rcvr.stdout.readline() # prints a line for each received msg
+ while x:
+ count += 1;
+ x = rcvr.stdout.readline()
+
+ sndr1.wait();
+ sndr2.wait();
+ sndr3.wait();
+ rcvr.wait();
+
+ self.assertEqual(count, totalMsgs)
+ self.assertFalse(self.qmf.getObjects(_objectId=oid)[0].flowStopped)
+
+ self._delete_queue("test-q")
+
+
+ def test_flow_size(self):
+ """ Create a queue with size-based flow limit. Spawn several
+ producers which will exceed the limit. Verify limit exceeded. Consume
+ all messages. Verify flow control released.
+ """
+ self.startQmf();
+ oid = self._create_queue("test-q", stop_size=351133, resume_size=251143)
+
+ sndr1 = self._start_qpid_send("test-q", count=1699, content="X"*439, capacity=53);
+ sndr2 = self._start_qpid_send("test-q", count=1129, content="Y"*631, capacity=13);
+ sndr3 = self._start_qpid_send("test-q", count=881, content="Z"*823, capacity=149);
+ totalMsgs = 1699 + 1129 + 881
+ totalBytes = 439 + 631 + 823
+
+ # wait until flow control is active
+ count = 0
+ while self.qmf.getObjects(_objectId=oid)[0].flowStopped == False and \
+ count < 10:
+ sleep(1);
+ count += 1;
+ self.assertTrue(self.qmf.getObjects(_objectId=oid)[0].flowStopped)
+ self.assertGreater(self.qmf.getObjects(_objectId=oid)[0].byteDepth, 351133)
+
+ # now wait until the enqueues stop happening - ensure that
+ # not all msgs have been sent (senders are blocked)
+ depth = self.qmf.getObjects(_objectId=oid)[0].msgDepth
+ sleep(1)
+ newDepth = self.qmf.getObjects(_objectId=oid)[0].msgDepth
+ while depth != newDepth:
+ depth = newDepth;
+ sleep(1)
+ newDepth = self.qmf.getObjects(_objectId=oid)[0].msgDepth
+ self.assertGreater(totalMsgs, depth)
+
+ # drain the queue
+ rcvr = self._start_qpid_receive("test-q",
+ count=totalMsgs)
+ count = 0;
+ x = rcvr.stdout.readline() # prints a line for each received msg
+ while x:
+ count += 1;
+ x = rcvr.stdout.readline()
+
+ sndr1.wait();
+ sndr2.wait();
+ sndr3.wait();
+ rcvr.wait();
+
+ self.assertEqual(count, totalMsgs)
+ self.assertFalse(self.qmf.getObjects(_objectId=oid)[0].flowStopped)
+
+ self._delete_queue("test-q")
+
+
+
+
Added: qpid/branches/qpid-2935/qpid/cpp/src/tests/run_queue_flow_limit_tests
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/tests/run_queue_flow_limit_tests?rev=1067220&view=auto
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/tests/run_queue_flow_limit_tests (added)
+++ qpid/branches/qpid-2935/qpid/cpp/src/tests/run_queue_flow_limit_tests Fri Feb 4 16:56:13 2011
@@ -0,0 +1,55 @@
+#!/bin/sh
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Run tests against Queue producer flow control.
+
+# Load the test environment. PYTHON_DIR, QPIDD_EXEC and QPID_PYTHON_TEST used
+# below are presumably defined there -- confirm against test_env.sh.
+# Use the POSIX "." command: "source" is a bash extension and this script
+# declares #!/bin/sh.
+. ./test_env.sh
+# Quote the variable: with an empty or unset PYTHON_DIR an unquoted "test -d"
+# sees a single argument and succeeds, so the skip would never trigger.
+test -d "$PYTHON_DIR" || { echo "Skipping queue flow control tests, no python dir."; exit 0; }
+
+LOG_FILE=queue_flow_limit_test.log
+# Port of the broker started by this script; empty while no broker is running.
+PORT=""
+
+# Shut the broker down if the run is interrupted.
+trap stop_broker INT TERM QUIT
+
+# Print all arguments as a single message and abort the script with status 1.
+error() {
+    # Quoted so the message is not word-split or glob-expanded before printing.
+    echo "$*"
+    exit 1
+}
+
+start_broker() {
+ rm -rf $LOG_FILE
+ PORT=$($QPIDD_EXEC --auth=no --no-module-dir --daemon --port=0 -t --log-to-file $LOG_FILE) || error "Could not start broker"
+}
+
+stop_broker() {
+ test -n "$PORT" && $QPIDD_EXEC --no-module-dir --quit --port $PORT
+}
+
+start_broker
+echo "Running Queue flow limit tests using broker on port $PORT"
+# "$@" is quoted so each extra argument reaches the test runner unchanged, even
+# if it contains spaces. SKIPTESTS is left unquoted on purpose -- presumably it
+# may hold several options or be empty; confirm against test_env.sh.
+$QPID_PYTHON_TEST -m queue_flow_limit_tests $SKIPTESTS -b localhost:$PORT "$@"
+RETCODE=$?
+stop_broker
+if test x$RETCODE != x0; then
+    # The log file is kept on failure for inspection.
+    echo "FAIL queue flow limit tests"; exit 1;
+fi
+rm -rf "$LOG_FILE"
+
Propchange: qpid/branches/qpid-2935/qpid/cpp/src/tests/run_queue_flow_limit_tests
------------------------------------------------------------------------------
svn:executable = *
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org