Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 16:42:51 UTC
svn commit: r1187375 [17/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/
cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/
cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/
cpp/bindings/qmf2/python/ cpp/bindings/qmf...
Modified: qpid/branches/QPID-2519/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/cluster_tests.py?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/cluster_tests.py (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/cluster_tests.py Fri Oct 21 14:42:12 2011
@@ -18,12 +18,13 @@
# under the License.
#
-import os, signal, sys, time, imp, re, subprocess, glob, cluster_test_logs
+import os, signal, sys, time, imp, re, subprocess, glob, random, logging
+import cluster_test_logs
from qpid import datatypes, messaging
from brokertest import *
from qpid.harness import Skipped
-from qpid.messaging import Message, Empty
-from threading import Thread, Lock
+from qpid.messaging import Message, Empty, Disposition, REJECTED, util
+from threading import Thread, Lock, Condition
from logging import getLogger
from itertools import chain
from tempfile import NamedTemporaryFile
@@ -96,9 +97,15 @@ class ShortTests(BrokerTest):
destination="amq.direct",
message=qpid.datatypes.Message(props, "content"))
+ # Try messages with TTL and different headers/properties
+ cluster[0].send_message("q", Message(durable=True, ttl=100000))
+ cluster[0].send_message("q", Message(durable=True, properties={}, ttl=100000))
+ cluster[0].send_message("q", Message(durable=True, properties={"x":10}, ttl=100000))
+
# Now update a new member and compare their dumps.
cluster.start(args=["--test-store-dump", "updatee.dump"])
assert readfile("direct.dump") == readfile("updatee.dump")
+
os.remove("direct.dump")
os.remove("updatee.dump")
@@ -108,19 +115,22 @@ class ShortTests(BrokerTest):
acl=os.path.join(os.getcwd(), "policy.acl")
aclf=file(acl,"w")
aclf.write("""
-acl deny zag@QPID create queue
-acl allow all all
+acl allow zig@QPID all all
+acl deny all all
""")
aclf.close()
- cluster = self.cluster(2, args=["--auth", "yes",
+ cluster = self.cluster(1, args=["--auth", "yes",
"--sasl-config", sasl_config,
"--load-module", os.getenv("ACL_LIB"),
"--acl-file", acl])
# Valid user/password, ensure queue is created.
c = cluster[0].connect(username="zig", password="zig")
- c.session().sender("ziggy;{create:always}")
+ c.session().sender("ziggy;{create:always,node:{x-declare:{exclusive:true}}}")
c.close()
+ cluster.start() # Start second node.
+
+ # Check queue is created on second node.
c = cluster[1].connect(username="zig", password="zig")
c.session().receiver("ziggy;{assert:always}")
c.close()
@@ -149,7 +159,7 @@ acl allow all all
self.fail("Expected exception")
except messaging.exceptions.UnauthorizedAccess: pass
# make sure the queue was not created at the other node.
- c = cluster[0].connect(username="zag", password="zag")
+ c = cluster[1].connect(username="zig", password="zig")
try:
s = c.session()
s.sender("zaggy;{assert:always}")
@@ -157,6 +167,35 @@ acl allow all all
self.fail("Expected exception")
except messaging.exceptions.NotFound: pass
+ def test_sasl_join(self):
+ """Verify SASL authentication between brokers when joining a cluster."""
+ sasl_config=os.path.join(self.rootdir, "sasl_config")
+ # Test with a valid username/password
+ cluster = self.cluster(1, args=["--auth", "yes",
+ "--sasl-config", sasl_config,
+ "--load-module", os.getenv("ACL_LIB"),
+ "--cluster-username=zig",
+ "--cluster-password=zig",
+ "--cluster-mechanism=PLAIN"
+ ])
+ cluster.start()
+ cluster.ready()
+ c = cluster[1].connect(username="zag", password="zag")
+
+ # Test with an invalid username/password
+ cluster = self.cluster(1, args=["--auth", "yes",
+ "--sasl-config", sasl_config,
+ "--load-module", os.getenv("ACL_LIB"),
+ "--cluster-username=x",
+ "--cluster-password=y",
+ "--cluster-mechanism=PLAIN"
+ ])
+ try:
+ cluster.start(expect=EXPECT_EXIT_OK)
+ cluster[1].ready()
+ self.fail("Expected exception")
+ except: pass
+
def test_user_id_update(self):
"""Ensure that user-id of an open session is updated to new cluster members"""
sasl_config=os.path.join(self.rootdir, "sasl_config")
@@ -246,25 +285,6 @@ acl allow all all
session1 = cluster[1].connect().session()
for q in queues: self.assert_browse(session1, "q1", ["foo"])
- def test_dr_no_message(self):
- """Regression test for https://bugzilla.redhat.com/show_bug.cgi?id=655141
- Joining broker crashes with 'error deliveryRecord no update message'
- """
-
- cluster = self.cluster(1)
- session0 = cluster[0].connect().session()
- s = session0.sender("q1;{create:always}")
- s.send(Message("a", ttl=0.05), sync=False)
- s.send(Message("b", ttl=0.05), sync=False)
- r1 = session0.receiver("q1")
- self.assertEqual("a", r1.fetch(timeout=0).content)
- r2 = session0.receiver("q1;{mode:browse}")
- self.assertEqual("b", r2.fetch(timeout=0).content)
- # Leave messages un-acknowledged, let the expire, then start new broker.
- time.sleep(.1)
- cluster.start()
- self.assertRaises(Empty, cluster[1].connect().session().receiver("q1").fetch,0)
-
def test_route_update(self):
"""Regression test for https://issues.apache.org/jira/browse/QPID-2982
Links and bridges associated with routes were not replicated on update.
@@ -272,6 +292,7 @@ acl allow all all
client was attached.
"""
args=["--mgmt-pub-interval=1","--log-enable=trace+:management"]
+ # First broker will be killed.
cluster0 = self.cluster(1, args=args)
cluster1 = self.cluster(1, args=args)
assert 0 == subprocess.call(
@@ -301,9 +322,695 @@ acl allow all all
qpid_tool.wait()
scanner.join()
assert scanner.found
+ # Regression test for https://issues.apache.org/jira/browse/QPID-3235
+ # Inconsistent stats when changing elder.
+
+ # Force a change of elder
+ cluster0.start()
+ cluster0[0].expect=EXPECT_EXIT_FAIL # About to die.
+ cluster0[0].kill()
+ time.sleep(2) # Allow a management interval to pass.
# Verify logs are consistent
cluster_test_logs.verify_logs()
+ def test_redelivered(self):
+ """Verify that redelivered flag is set correctly on replayed messages"""
+ cluster = self.cluster(2, expect=EXPECT_EXIT_FAIL)
+ url = "amqp:tcp:%s,tcp:%s" % (cluster[0].host_port(), cluster[1].host_port())
+ queue = "my-queue"
+ cluster[0].declare_queue(queue)
+ self.sender = self.popen(
+ ["qpid-send",
+ "--broker", url,
+ "--address", queue,
+ "--sequence=true",
+ "--send-eos=1",
+ "--messages=100000",
+ "--connection-options={%s}"%(Cluster.CONNECTION_OPTIONS)
+ ])
+ self.receiver = self.popen(
+ ["qpid-receive",
+ "--broker", url,
+ "--address", queue,
+ "--ignore-duplicates",
+ "--check-redelivered",
+ "--connection-options={%s}"%(Cluster.CONNECTION_OPTIONS),
+ "--forever"
+ ])
+ time.sleep(1) # give sender enough time to have some messages to replay
+ cluster[0].kill()
+ self.sender.wait()
+ self.receiver.wait()
+ cluster[1].kill()
+
+ class BlockedSend(Thread):
+ """Send a message, send is expected to block.
+ Verify that it does block (for a given timeout), then allow
+ waiting till it unblocks when it is expected to do so."""
+ def __init__(self, sender, msg):
+ self.sender, self.msg = sender, msg
+ self.blocked = True
+ self.condition = Condition()
+ self.timeout = 0.1 # Time to wait for expected results.
+ Thread.__init__(self)
+ def run(self):
+ try:
+ self.sender.send(self.msg, sync=True)
+ self.condition.acquire()
+ try:
+ self.blocked = False
+ self.condition.notify()
+ finally: self.condition.release()
+ except Exception,e: print "BlockedSend exception: %s"%e
+ def start(self):
+ Thread.start(self)
+ time.sleep(self.timeout)
+ assert self.blocked # Expected to block
+ def assert_blocked(self): assert self.blocked
+ def wait(self): # Now expecting to unblock
+ self.condition.acquire()
+ try:
+ while self.blocked:
+ self.condition.wait(self.timeout)
+ if self.blocked: raise Exception("Timed out waiting for send to unblock")
+ finally: self.condition.release()
+ self.join()
+
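BlockedSend is driven the same way throughout the flow-control tests below; a compressed usage sketch (ssn and the queue name are illustrative):

    # Usage sketch; assumes ssn is a session on a queue already at its
    # flow-stop limit, so the next synchronous send should block.
    s = ssn.sender("flq")
    blocked = ShortTests.BlockedSend(s, Message("overflow"))
    blocked.start()            # asserts the send blocks for at least timeout
    blocked.assert_blocked()   # still blocked while the queue stays full
    # ... drain the queue via another broker/session ...
    blocked.wait()             # raises if the send never unblocks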
+ def queue_flowlimit_test(self, brokers):
+ """Verify that the queue's flowlimit configuration and state are
+ correctly replicated.
+ The brokers argument allows this test to run on a single broker,
+ a cluster of 2 pre-started brokers, or a cluster where the second
+ broker starts after the queue is in flow control.
+ """
+ # configure a queue with a specific flow limit on first broker
+ ssn0 = brokers.first().connect().session()
+ s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}")
+ brokers.first().startQmf()
+ q1 = [q for q in brokers.first().qmf_session.getObjects(_class="queue") if q.name == "flq"][0]
+ oid = q1.getObjectId()
+ self.assertEqual(q1.name, "flq")
+ self.assertEqual(q1.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
+ assert not q1.flowStopped
+ self.assertEqual(q1.flowStoppedCount, 0)
+
+ # fill the queue on one broker until flow control is active
+ for x in range(5): s0.send(Message(str(x)))
+ sender = ShortTests.BlockedSend(s0, Message(str(6)))
+ sender.start() # Tests that sender does block
+ # Verify the broker queue goes into a flowStopped state
+ deadline = time.time() + 1
+ while not q1.flowStopped and time.time() < deadline: q1.update()
+ assert q1.flowStopped
+ self.assertEqual(q1.flowStoppedCount, 1)
+ sender.assert_blocked() # Still blocked
+
+ # Now verify that both brokers in the cluster have the same configuration
+ brokers.second().startQmf()
+ qs = brokers.second().qmf_session.getObjects(_objectId=oid)
+ self.assertEqual(len(qs), 1)
+ q2 = qs[0]
+ self.assertEqual(q2.name, "flq")
+ self.assertEqual(q2.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
+ assert q2.flowStopped
+ self.assertEqual(q2.flowStoppedCount, 1)
+
+ # now drain the queue using a session to the other broker
+ ssn1 = brokers.second().connect().session()
+ r1 = ssn1.receiver("flq", capacity=6)
+ for x in range(4):
+ r1.fetch(timeout=0)
+ ssn1.acknowledge()
+ sender.wait() # Verify no longer blocked.
+
+ # and re-verify state of queue on both brokers
+ q1.update()
+ assert not q1.flowStopped
+ q2.update()
+ assert not q2.flowStopped
+
+ ssn0.connection.close()
+ ssn1.connection.close()
+ cluster_test_logs.verify_logs()
+
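The thresholds used above work as a pair: the broker blocks producers once the queue depth reaches qpid.flow_stop_count and releases them once it drains to qpid.flow_resume_count, with the flowStopped/flowStoppedCount state exposed through QMF. A distilled sketch of setting the limits and polling the state (broker and queue name are illustrative):

    # Sketch: declare a flow-limited queue, then poll its QMF state.
    import time

    ssn = broker.connect().session()
    ssn.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:"
               "{'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}")
    broker.startQmf()
    q = [q for q in broker.qmf_session.getObjects(_class="queue")
         if q.name == "flq"][0]
    deadline = time.time() + 1
    while not q.flowStopped and time.time() < deadline:
        q.update()   # refresh the management object until flow control trips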
+ def test_queue_flowlimit(self):
+ """Test flow limits on a standalone broker"""
+ broker = self.broker()
+ class Brokers:
+ def first(self): return broker
+ def second(self): return broker
+ self.queue_flowlimit_test(Brokers())
+
+ def test_queue_flowlimit_cluster(self):
+ cluster = self.cluster(2)
+ class Brokers:
+ def first(self): return cluster[0]
+ def second(self): return cluster[1]
+ self.queue_flowlimit_test(Brokers())
+
+ def test_queue_flowlimit_cluster_join(self):
+ cluster = self.cluster(1)
+ class Brokers:
+ def first(self): return cluster[0]
+ def second(self):
+ if len(cluster) == 1: cluster.start()
+ return cluster[1]
+ self.queue_flowlimit_test(Brokers())
+
+ def test_queue_flowlimit_replicate(self):
+ """ Verify that a queue which is in flow control BUT has drained BELOW
+ the flow control 'stop' threshold, is correctly replicated when a new
+ broker is added to the cluster.
+ """
+
+ class AsyncSender(Thread):
+ """Send a fixed number of msgs from a sender in a separate thread
+ so it may block without blocking the test.
+ """
+ def __init__(self, broker, address, count=1, size=4):
+ Thread.__init__(self)
+ self.daemon = True
+ self.broker = broker
+ self.queue = address
+ self.count = count
+ self.size = size
+ self.done = False
+
+ def run(self):
+ self.sender = subprocess.Popen(["qpid-send",
+ "--capacity=1",
+ "--content-size=%s" % self.size,
+ "--messages=%s" % self.count,
+ "--failover-updates",
+ "--connection-options={%s}"%(Cluster.CONNECTION_OPTIONS),
+ "--address=%s" % self.queue,
+ "--broker=%s" % self.broker.host_port()])
+ self.sender.wait()
+ self.done = True
+
+ cluster = self.cluster(2)
+ # create a queue with rather draconian flow control settings
+ ssn0 = cluster[0].connect().session()
+ s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':100, 'qpid.flow_resume_count':20}}}}")
+
+ # fire off the sending thread to broker[0], and wait until the queue
+ # hits flow control on broker[1]
+ sender = AsyncSender(cluster[0], "flq", count=110)
+ sender.start()
+
+ cluster[1].startQmf()
+ q_obj = [q for q in cluster[1].qmf_session.getObjects(_class="queue") if q.name == "flq"][0]
+ deadline = time.time() + 10
+ while not q_obj.flowStopped and time.time() < deadline:
+ q_obj.update()
+ assert q_obj.flowStopped
+ assert not sender.done
+ assert q_obj.msgDepth < 110
+
+ # Now drain enough messages on broker[1] to drop below the flow stop
+ # threshold, but not relieve flow control...
+ receiver = subprocess.Popen(["qpid-receive",
+ "--messages=15",
+ "--timeout=1",
+ "--print-content=no",
+ "--failover-updates",
+ "--connection-options={%s}"%(Cluster.CONNECTION_OPTIONS),
+ "--ack-frequency=1",
+ "--address=flq",
+ "--broker=%s" % cluster[1].host_port()])
+ receiver.wait()
+ q_obj.update()
+ assert q_obj.flowStopped
+ assert not sender.done
+ current_depth = q_obj.msgDepth
+
+ # add a new broker to the cluster, and verify that the queue is in flow
+ # control on that broker
+ cluster.start()
+ cluster[2].startQmf()
+ q_obj = [q for q in cluster[2].qmf_session.getObjects(_class="queue") if q.name == "flq"][0]
+ assert q_obj.flowStopped
+ assert q_obj.msgDepth == current_depth
+
+ # now drain the queue on broker[2], and verify that the sender becomes
+ # unblocked
+ receiver = subprocess.Popen(["qpid-receive",
+ "--messages=95",
+ "--timeout=1",
+ "--print-content=no",
+ "--failover-updates",
+ "--connection-options={%s}"%(Cluster.CONNECTION_OPTIONS),
+ "--ack-frequency=1",
+ "--address=flq",
+ "--broker=%s" % cluster[2].host_port()])
+ receiver.wait()
+ q_obj.update()
+ assert not q_obj.flowStopped
+ self.assertEqual(q_obj.msgDepth, 0)
+
+ # verify that the sender has become unblocked
+ sender.join(timeout=5)
+ assert not sender.isAlive()
+ assert sender.done
+
+ def test_blocked_queue_delete(self):
+ """Verify that producers which are blocked on a queue due to flow
+ control are unblocked when that queue is deleted.
+ """
+
+ cluster = self.cluster(2)
+ cluster[0].startQmf()
+ cluster[1].startQmf()
+
+ # configure a queue with a specific flow limit on first broker
+ ssn0 = cluster[0].connect().session()
+ s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}")
+ q1 = [q for q in cluster[0].qmf_session.getObjects(_class="queue") if q.name == "flq"][0]
+ oid = q1.getObjectId()
+ self.assertEqual(q1.name, "flq")
+ self.assertEqual(q1.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
+ assert not q1.flowStopped
+ self.assertEqual(q1.flowStoppedCount, 0)
+
+ # fill the queue on one broker until flow control is active
+ for x in range(5): s0.send(Message(str(x)))
+ sender = ShortTests.BlockedSend(s0, Message(str(6)))
+ sender.start() # Tests that sender does block
+ # Verify the broker queue goes into a flowStopped state
+ deadline = time.time() + 1
+ while not q1.flowStopped and time.time() < deadline: q1.update()
+ assert q1.flowStopped
+ self.assertEqual(q1.flowStoppedCount, 1)
+ sender.assert_blocked() # Still blocked
+
+ # Now verify that both brokers in the cluster have the same configuration
+ qs = cluster[1].qmf_session.getObjects(_objectId=oid)
+ self.assertEqual(len(qs), 1)
+ q2 = qs[0]
+ self.assertEqual(q2.name, "flq")
+ self.assertEqual(q2.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
+ assert q2.flowStopped
+ self.assertEqual(q2.flowStoppedCount, 1)
+
+ # now delete the blocked queue from the other broker
+ ssn1 = cluster[1].connect().session()
+ self.evaluate_address(ssn1, "flq;{delete:always}")
+ sender.wait() # Verify no longer blocked.
+
+ ssn0.connection.close()
+ ssn1.connection.close()
+ cluster_test_logs.verify_logs()
+
+
+ def test_alternate_exchange_update(self):
+ """Verify that alternate-exchange on exchanges and queues is propagated to new members of a cluster. """
+ cluster = self.cluster(1)
+ s0 = cluster[0].connect().session()
+ # create alt queue bound to amq.fanout exchange, will be destination for alternate exchanges
+ self.evaluate_address(s0, "alt;{create:always,node:{x-bindings:[{exchange:'amq.fanout',queue:alt}]}}")
+ # create direct exchange ex with alternate-exchange amq.fanout and no queues bound
+ self.evaluate_address(s0, "ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'amq.fanout'}}}")
+ # create queue q with alternate-exchange amq.fanout
+ self.evaluate_address(s0, "q;{create:always,node:{type:queue, x-declare:{alternate-exchange:'amq.fanout'}}}")
+
+ def verify(broker):
+ s = broker.connect().session()
+ # Verify unmatched message goes to ex's alternate.
+ s.sender("ex").send("foo")
+ self.assertEqual("foo", s.receiver("alt").fetch(timeout=0).content)
+ # Verify rejected message goes to q's alternate.
+ s.sender("q").send("bar")
+ msg = s.receiver("q").fetch(timeout=0)
+ self.assertEqual("bar", msg.content)
+ s.acknowledge(msg, Disposition(REJECTED)) # Reject the message
+ self.assertEqual("bar", s.receiver("alt").fetch(timeout=0).content)
+
+ verify(cluster[0])
+ cluster.start()
+ verify(cluster[1])
+
+ def test_binding_order(self):
+ """Regression test for binding order inconsistency in cluster"""
+ cluster = self.cluster(1)
+ c0 = cluster[0].connect()
+ s0 = c0.session()
+ # Declare multiple queues bound to same key on amq.topic
+ def declare(q,max=0):
+ if max: declare = 'x-declare:{arguments:{"qpid.max_count":%d, "qpid.flow_stop_count":0}}'%max
+ else: declare = 'x-declare:{}'
+ bind='x-bindings:[{queue:%s,key:key,exchange:"amq.topic"}]'%(q)
+ s0.sender("%s;{create:always,node:{%s,%s}}" % (q,declare,bind))
+ declare('d',max=4) # Only one with a limit
+ for q in ['c', 'b','a']: declare(q)
+ # Add a cluster member, send enough messages to exceed the max count
+ cluster.start()
+ try:
+ s = s0.sender('amq.topic/key')
+ for m in xrange(1,6): s.send(Message(str(m)))
+ self.fail("Expected capacity exceeded exception")
+ except messaging.exceptions.TargetCapacityExceeded: pass
+ c1 = cluster[1].connect()
+ s1 = c1.session()
+ s0 = c0.session() # Old session s0 is broken by exception.
+ # Verify queue contents are consistent.
+ for q in ['a','b','c','d']:
+ self.assertEqual(self.browse(s0, q), self.browse(s1, q))
+ # Verify queue contents are "best effort"
+ for q in ['a','b','c']: self.assert_browse(s1,q,[str(n) for n in xrange(1,6)])
+ self.assert_browse(s1,'d',[str(n) for n in xrange(1,5)])
+
+ def test_deleted_exchange(self):
+ """QPID-3215: cached exchange reference can cause cluster inconsistencies
+ if exchange is deleted/recreated
+ Verify stand-alone case
+ """
+ cluster = self.cluster()
+ # Verify we do not route message via an exchange that has been destroyed.
+ cluster.start()
+ s0 = cluster[0].connect().session()
+ self.evaluate_address(s0, "ex;{create:always,node:{type:topic}}")
+ self.evaluate_address(s0, "q;{create:always,node:{x-bindings:[{exchange:'ex',queue:q,key:foo}]}}")
+ send0 = s0.sender("ex/foo")
+ send0.send("foo")
+ self.assert_browse(s0, "q", ["foo"])
+ self.evaluate_address(s0, "ex;{delete:always}")
+ try:
+ send0.send("bar") # Should fail, exchange is deleted.
+ self.fail("Expected not-found exception")
+ except qpid.messaging.NotFound: pass
+ self.assert_browse(cluster[0].connect().session(), "q", ["foo"])
+
+ def test_deleted_exchange_inconsistent(self):
+ """QPID-3215: cached exchange reference can cause cluster inconsistencies
+ if exchange is deleted/recreated
+
+ Verify cluster inconsistency.
+ """
+ cluster = self.cluster()
+ cluster.start()
+ s0 = cluster[0].connect().session()
+ self.evaluate_address(s0, "ex;{create:always,node:{type:topic}}")
+ self.evaluate_address(s0, "q;{create:always,node:{x-bindings:[{exchange:'ex',queue:q,key:foo}]}}")
+ send0 = s0.sender("ex/foo")
+ send0.send("foo")
+ self.assert_browse(s0, "q", ["foo"])
+
+ cluster.start()
+ s1 = cluster[1].connect().session()
+ self.evaluate_address(s0, "ex;{delete:always}")
+ try:
+ send0.send("bar")
+ self.fail("Expected not-found exception")
+ except qpid.messaging.NotFound: pass
+
+ self.assert_browse(s1, "q", ["foo"])
+
+
+ def test_ttl_consistent(self):
+ """Ensure we don't get inconsistent errors with message that have TTL very close together"""
+ messages = [ Message(str(i), ttl=i/1000.0) for i in xrange(0,1000)]
+ messages.append(Message("x"))
+ cluster = self.cluster(2)
+ sender = cluster[0].connect().session().sender("q;{create:always}")
+
+ def fetch(b):
+ receiver = b.connect().session().receiver("q;{create:always}")
+ while receiver.fetch().content != "x": pass
+
+ for m in messages: sender.send(m, sync=False)
+ for m in messages: sender.send(m, sync=False)
+ fetch(cluster[0])
+ fetch(cluster[1])
+ for m in messages: sender.send(m, sync=False)
+ cluster.start()
+ fetch(cluster[2])
+
+# Some utility code for transaction tests
+XA_RBROLLBACK = 1
+XA_RBTIMEOUT = 2
+XA_OK = 0
+dtx_branch_counter = 0
+
+class DtxStatusException(Exception):
+ def __init__(self, expect, actual):
+ self.expect = expect
+ self.actual = actual
+
+ def __str__(self):
+ return "DtxStatusException(expect=%s, actual=%s)"%(self.expect, self.actual)
+
+class DtxTestFixture:
+ """Bundle together some common requirements for dtx tests."""
+ def __init__(self, test, broker, name, exclusive=False):
+ self.test = test
+ self.broker = broker
+ self.name = name
+ # Use old API. DTX is not supported in messaging API.
+ self.connection = broker.connect_old()
+ self.session = self.connection.session(name, 1) # 1 second timeout
+ self.queue = self.session.queue_declare(name, exclusive=exclusive)
+ self.session.dtx_select()
+ self.consumer = None
+
+ def xid(self, id=None):
+ if id is None: id = self.name
+ return self.session.xid(format=0, global_id=id)
+
+ def check_status(self, expect, actual):
+ if expect != actual: raise DtxStatusException(expect, actual)
+
+ def start(self, id=None, resume=False):
+ self.check_status(XA_OK, self.session.dtx_start(xid=self.xid(id), resume=resume).status)
+
+ def end(self, id=None, suspend=False):
+ self.check_status(XA_OK, self.session.dtx_end(xid=self.xid(id), suspend=suspend).status)
+
+ def prepare(self, id=None):
+ self.check_status(XA_OK, self.session.dtx_prepare(xid=self.xid(id)).status)
+
+ def commit(self, id=None, one_phase=True):
+ self.check_status(
+ XA_OK, self.session.dtx_commit(xid=self.xid(id), one_phase=one_phase).status)
+
+ def rollback(self, id=None):
+ self.check_status(XA_OK, self.session.dtx_rollback(xid=self.xid(id)).status)
+
+ def set_timeout(self, timeout, id=None):
+ self.session.dtx_set_timeout(xid=self.xid(id),timeout=timeout)
+
+ def send(self, messages):
+ for m in messages:
+ dp=self.session.delivery_properties(routing_key=self.name)
+ mp=self.session.message_properties()
+ self.session.message_transfer(message=qpid.datatypes.Message(dp, mp, m))
+
+ def accept(self):
+ """Accept 1 message from queue"""
+ consumer_tag="%s-consumer"%(self.name)
+ self.session.message_subscribe(queue=self.name, destination=consumer_tag)
+ self.session.message_flow(unit = self.session.credit_unit.message, value = 1, destination = consumer_tag)
+ self.session.message_flow(unit = self.session.credit_unit.byte, value = 0xFFFFFFFFL, destination = consumer_tag)
+ msg = self.session.incoming(consumer_tag).get(timeout=1)
+ self.session.message_cancel(destination=consumer_tag)
+ self.session.message_accept(qpid.datatypes.RangedSet(msg.id))
+ return msg
+
+
+ def verify(self, sessions, messages):
+ for s in sessions:
+ self.test.assert_browse(s, self.name, messages)
+
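DtxTestFixture reduces the 0-10 DTX command sequence to a few calls; a compressed sketch of one branch's life cycle, covering both the one-phase and two-phase paths (broker and transaction name are illustrative):

    # Sketch of DTX life cycles via DtxTestFixture (names illustrative).
    t = DtxTestFixture(self, broker, "demo")
    t.start()                    # dtx_start: work becomes transactional
    t.send(["m1", "m2"])         # enqueues stay invisible until commit
    t.end()                      # dtx_end: the branch can now be completed
    t.commit(one_phase=True)     # one-phase: commit without a prepare

    # Two-phase variant after t.end():
    #   t.prepare()                # dtx_prepare: broker hardens the branch
    #   t.commit(one_phase=False)  # or t.rollback() to discard it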
+class DtxTests(BrokerTest):
+
+ def test_dtx_update(self):
+ """Verify that DTX transaction state is updated to a new broker.
+ Start a collection of transactions, then add a new cluster member,
+ then verify they commit/rollback correctly on the new broker."""
+
+ # Note: multiple tests have been bundled into one to avoid the need to start/stop
+ # multiple brokers per test.
+
+ cluster=self.cluster(1)
+ sessions = [cluster[0].connect().session()] # For verify
+
+ # Transaction that will be open when new member joins, then committed.
+ t1 = DtxTestFixture(self, cluster[0], "t1")
+ t1.start()
+ t1.send(["1", "2"])
+ t1.verify(sessions, []) # Not visible outside of transaction
+
+ # Transaction that will be open when new member joins, then rolled back.
+ t2 = DtxTestFixture(self, cluster[0], "t2")
+ t2.start()
+ t2.send(["1", "2"])
+
+ # Transaction that will be prepared when new member joins, then committed.
+ t3 = DtxTestFixture(self, cluster[0], "t3")
+ t3.start()
+ t3.send(["1", "2"])
+ t3.end()
+ t3.prepare()
+ t1.verify(sessions, []) # Not visible outside of transaction
+
+ # Transaction that will be prepared when new member joins, then rolled back.
+ t4 = DtxTestFixture(self, cluster[0], "t4")
+ t4.start()
+ t4.send(["1", "2"])
+ t4.end()
+ t4.prepare()
+
+ # Transaction using an exclusive queue
+ t5 = DtxTestFixture(self, cluster[0], "t5", exclusive=True)
+ t5.start()
+ t5.send(["1", "2"])
+
+ # Accept messages in a transaction before/after join then commit
+ t6 = DtxTestFixture(self, cluster[0], "t6")
+ t6.send(["a","b","c"])
+ t6.start()
+ self.assertEqual(t6.accept().body, "a");
+
+ # Accept messages in a transaction before/after join then roll back
+ t7 = DtxTestFixture(self, cluster[0], "t7")
+ t7.send(["a","b","c"])
+ t7.start()
+ self.assertEqual(t7.accept().body, "a");
+
+ # Ended, suspended transactions across join.
+ t8 = DtxTestFixture(self, cluster[0], "t8")
+ t8.start(id="1")
+ t8.send(["x"])
+ t8.end(id="1", suspend=True)
+ t8.start(id="2")
+ t8.send(["y"])
+ t8.end(id="2")
+ t8.start()
+ t8.send("z")
+
+
+ # Start new cluster member
+ cluster.start()
+ sessions.append(cluster[1].connect().session())
+
+ # Commit t1
+ t1.send(["3","4"])
+ t1.verify(sessions, [])
+ t1.end()
+ t1.commit(one_phase=True)
+ t1.verify(sessions, ["1","2","3","4"])
+
+ # Rollback t2
+ t2.send(["3","4"])
+ t2.end()
+ t2.rollback()
+ t2.verify(sessions, [])
+
+ # Commit t3
+ t3.commit(one_phase=False)
+ t3.verify(sessions, ["1","2"])
+
+ # Rollback t4
+ t4.rollback()
+ t4.verify(sessions, [])
+
+ # Commit t5
+ t5.send(["3","4"])
+ t5.verify(sessions, [])
+ t5.end()
+ t5.commit(one_phase=True)
+ t5.verify(sessions, ["1","2","3","4"])
+
+ # Commit t6
+ self.assertEqual(t6.accept().body, "b");
+ t6.verify(sessions, ["c"])
+ t6.end()
+ t6.commit(one_phase=True)
+ t6.session.close() # Make sure they're not requeued by the session.
+ t6.verify(sessions, ["c"])
+
+ # Rollback t7
+ self.assertEqual(t7.accept().body, "b");
+ t7.end()
+ t7.rollback()
+ t7.verify(sessions, ["a", "b", "c"])
+
+ # Resume t8
+ t8.end()
+ t8.commit(one_phase=True)
+ t8.start("1", resume=True)
+ t8.end("1")
+ t8.commit("1", one_phase=True)
+ t8.commit("2", one_phase=True)
+ t8.verify(sessions, ["z", "x","y"])
+
+
+ def test_dtx_failover_rollback(self):
+ """Kill a broker during a transaction, verify we roll back correctly"""
+ cluster=self.cluster(1, expect=EXPECT_EXIT_FAIL)
+ cluster.start(expect=EXPECT_RUNNING)
+
+ # Test unprepared at crash
+ t1 = DtxTestFixture(self, cluster[0], "t1")
+ t1.send(["a"]) # Not in transaction
+ t1.start()
+ t1.send(["b"]) # In transaction
+
+ # Test prepared at crash
+ t2 = DtxTestFixture(self, cluster[0], "t2")
+ t2.send(["a"]) # Not in transaction
+ t2.start()
+ t2.send(["b"]) # In transaction
+ t2.end()
+ t2.prepare()
+
+ # Crash the broker
+ cluster[0].kill()
+
+ # Transactional changes should not appear
+ s = cluster[1].connect().session()
+ self.assert_browse(s, "t1", ["a"])
+ self.assert_browse(s, "t2", ["a"])
+
+ def test_dtx_timeout(self):
+ """Verify that dtx timeout works"""
+ cluster = self.cluster(1)
+ t1 = DtxTestFixture(self, cluster[0], "t1")
+ t1.start()
+ t1.set_timeout(1)
+ time.sleep(1.1)
+ try:
+ t1.end()
+ self.fail("Expected rollback timeout.")
+ except DtxStatusException, e:
+ self.assertEqual(e.actual, XA_RBTIMEOUT)
+
+class TxTests(BrokerTest):
+
+ def test_tx_update(self):
+ """Verify that transaction state is updated to a new broker"""
+
+ def make_message(session, body=None, key=None, id=None):
+ dp=session.delivery_properties(routing_key=key)
+ mp=session.message_properties(correlation_id=id)
+ return qpid.datatypes.Message(dp, mp, body)
+
+ cluster=self.cluster(1)
+ # Use old API. TX is not supported in messaging API.
+ c = cluster[0].connect_old()
+ s = c.session("tx-session", 1)
+ s.queue_declare(queue="q")
+ # Start transaction
+ s.tx_select()
+ s.message_transfer(message=make_message(s, "1", "q"))
+ # Start new member mid-transaction
+ cluster.start()
+ # Do more work
+ s.message_transfer(message=make_message(s, "2", "q"))
+ # Commit the transaction and verify the results.
+ s.tx_commit()
+ for b in cluster: self.assert_browse(b.connect().session(), "q", ["1","2"])
+
+
class LongTests(BrokerTest):
"""Tests that can run for a long time if -DDURATION=<minutes> is set"""
def duration(self):
@@ -316,22 +1023,28 @@ class LongTests(BrokerTest):
# Original cluster will all be killed so expect exit with failure
cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL)
+ for b in cluster: b.ready() # Wait for brokers to be ready
for b in cluster: ErrorGenerator(b)
# Start sender and receiver threads
cluster[0].declare_queue("test-queue")
- sender = NumberedSender(cluster[1], 1000) # Max queue depth
- receiver = NumberedReceiver(cluster[2], sender)
+ sender = NumberedSender(cluster[0], 1000) # Max queue depth
+ receiver = NumberedReceiver(cluster[0], sender)
receiver.start()
sender.start()
+ # Wait for sender & receiver to get up and running
+ retry(lambda: receiver.received > 0)
# Kill original brokers, start new ones for the duration.
endtime = time.time() + self.duration()
i = 0
while time.time() < endtime:
+ sender.sender.assert_running()
+ receiver.receiver.assert_running()
cluster[i].kill()
i += 1
b = cluster.start(expect=EXPECT_EXIT_FAIL)
+ for b in cluster[i:]: b.ready()
ErrorGenerator(b)
time.sleep(5)
sender.stop()
@@ -362,24 +1075,24 @@ class LongTests(BrokerTest):
if self.stopped: break
self.process = self.broker.test.popen(
self.cmd, expect=EXPECT_UNKNOWN)
- finally: self.lock.release()
- try: exit = self.process.wait()
+ finally:
+ self.lock.release()
+ try:
+ exit = self.process.wait()
except OSError, e:
- # Seems to be a race in wait(), it throws
- # "no such process" during test shutdown.
- # Doesn't indicate a test error, ignore.
- return
+ # Process may already have been killed by self.stop()
+ break
except Exception, e:
self.process.unexpected(
"client of %s: %s"%(self.broker.name, e))
self.lock.acquire()
try:
- # Quit and ignore errors if stopped or expecting failure.
if self.stopped: break
if exit != 0:
self.process.unexpected(
"client of %s exit code %s"%(self.broker.name, exit))
- finally: self.lock.release()
+ finally:
+ self.lock.release()
except Exception, e:
self.error = RethrownException("Error in ClientLoop.run")
@@ -401,7 +1114,7 @@ class LongTests(BrokerTest):
args += ["--log-enable=trace+:management"]
# Use store if present.
if BrokerTest.store_lib: args +=["--load-module", BrokerTest.store_lib]
- cluster = self.cluster(3, args)
+ cluster = self.cluster(3, args, expect=EXPECT_EXIT_FAIL) # brokers will be killed
clients = [] # Per-broker list of clients that only connect to one broker.
mclients = [] # Management clients that connect to every broker in the cluster.
@@ -410,10 +1123,12 @@ class LongTests(BrokerTest):
"""Start ordinary clients for a broker."""
cmds=[
["qpid-tool", "localhost:%s"%(broker.port())],
- ["qpid-perftest", "--count", 50000,
+ ["qpid-perftest", "--count=5000", "--durable=yes",
"--base-name", str(qpid.datatypes.uuid4()), "--port", broker.port()],
- ["qpid-queue-stats", "-a", "localhost:%s" %(broker.port())],
- ["testagent", "localhost", str(broker.port())] ]
+ ["qpid-txtest", "--queue-base-name", "tx-%s"%str(qpid.datatypes.uuid4()),
+ "--port", broker.port()],
+ ["qpid-queue-stats", "-a", "localhost:%s" %(broker.port())]
+ ]
clients.append([ClientLoop(broker, cmd) for cmd in cmds])
def start_mclients(broker):
@@ -422,7 +1137,8 @@ class LongTests(BrokerTest):
mclients.append(ClientLoop(broker, cmd))
endtime = time.time() + self.duration()
- runtime = self.duration() / 4 # First run is longer, use quarter of duration.
+ # First run is at most a third of the duration, capped at 5 seconds.
+ runtime = min(5.0, self.duration() / 3.0)
alive = 0 # First live cluster member
for i in range(len(cluster)): start_clients(cluster[i])
start_mclients(cluster[alive])
@@ -433,7 +1149,7 @@ class LongTests(BrokerTest):
for b in cluster[alive:]: b.ready() # Check if a broker crashed.
# Kill the first broker, expect the clients to fail.
b = cluster[alive]
- b.expect = EXPECT_EXIT_FAIL
+ b.ready()
b.kill()
# Stop the brokers clients and all the mclients.
for c in clients[alive] + mclients:
@@ -443,26 +1159,251 @@ class LongTests(BrokerTest):
mclients = []
# Start another broker and clients
alive += 1
- cluster.start()
+ cluster.start(expect=EXPECT_EXIT_FAIL)
+ cluster[-1].ready() # Wait till it's ready
start_clients(cluster[-1])
start_mclients(cluster[alive])
for c in chain(mclients, *clients):
c.stop()
-
+ for b in cluster[alive:]:
+ b.ready() # Verify still alive
+ b.kill()
# Verify that logs are consistent
cluster_test_logs.verify_logs()
def test_management_qmf2(self):
self.test_management(args=["--mgmt-qmf2=yes"])
- def test_connect_consistent(self): # FIXME aconway 2011-01-18:
+ def test_connect_consistent(self):
args=["--mgmt-pub-interval=1","--log-enable=trace+:management"]
cluster = self.cluster(2, args=args)
end = time.time() + self.duration()
while (time.time() < end): # Get a management interval
for i in xrange(1000): cluster[0].connect().close()
+ cluster_test_logs.verify_logs()
+
+ def test_flowlimit_failover(self):
+ """Test fail-over during continuous send-receive with flow control
+ active.
+ """
+
+ # Original cluster will all be killed so expect exit with failure
+ cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL)
+ for b in cluster: b.ready() # Wait for brokers to be ready
+
+ # create a queue with rather draconian flow control settings
+ ssn0 = cluster[0].connect().session()
+ s0 = ssn0.sender("test-queue; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':2000, 'qpid.flow_resume_count':100}}}}")
+
+ receiver = NumberedReceiver(cluster[0])
+ receiver.start()
+ senders = [NumberedSender(cluster[0]) for i in range(1,3)]
+ for s in senders:
+ s.start()
+ # Wait for senders & receiver to get up and running
+ retry(lambda: receiver.received > 2*len(senders))
+
+ # Kill original brokers, start new ones for the duration.
+ endtime = time.time() + self.duration()
+ i = 0
+ while time.time() < endtime:
+ for s in senders: s.sender.assert_running()
+ receiver.receiver.assert_running()
+ for b in cluster[i:]: b.ready() # Check if any broker crashed.
+ cluster[i].kill()
+ i += 1
+ b = cluster.start(expect=EXPECT_EXIT_FAIL)
+ time.sleep(5)
+ for s in senders:
+ s.stop()
+ receiver.stop()
+ for i in range(i, len(cluster)): cluster[i].kill()
+
+ def test_ttl_failover(self):
+ """Test that messages with TTL don't cause problems in a cluster with failover"""
+
+ class Client(StoppableThread):
+
+ def __init__(self, broker):
+ StoppableThread.__init__(self)
+ self.connection = broker.connect(reconnect=True)
+ self.auto_fetch_reconnect_urls(self.connection)
+ self.session = self.connection.session()
+
+ def auto_fetch_reconnect_urls(self, conn):
+ """Replacment for qpid.messaging.util version which is noisy"""
+ ssn = conn.session("auto-fetch-reconnect-urls")
+ rcv = ssn.receiver("amq.failover")
+ rcv.capacity = 10
+
+ def main():
+ while True:
+ try:
+ msg = rcv.fetch()
+ qpid.messaging.util.set_reconnect_urls(conn, msg)
+ ssn.acknowledge(msg, sync=False)
+ except messaging.exceptions.LinkClosed: return
+ except messaging.exceptions.ConnectionError: return
+
+ thread = Thread(name="auto-fetch-reconnect-urls", target=main)
+ thread.setDaemon(True)
+ thread.start()
+
+ def stop(self):
+ StoppableThread.stop(self)
+ self.connection.detach()
+
+ class Sender(Client):
+ def __init__(self, broker, address):
+ Client.__init__(self, broker)
+ self.sent = 0 # Number of messages _reliably_ sent.
+ self.sender = self.session.sender(address, capacity=1000)
+
+ def send_counted(self, ttl):
+ self.sender.send(Message(str(self.sent), ttl=ttl))
+ self.sent += 1
+
+ def run(self):
+ while not self.stopped:
+ choice = random.randint(0,4)
+ if choice == 0: self.send_counted(None) # No ttl
+ elif choice == 1: self.send_counted(100000) # Large ttl
+ else: # Small ttl, might expire
+ self.sender.send(Message("", ttl=random.random()/10))
+ self.sender.send(Message("z"), sync=True) # Chaser.
+
+ class Receiver(Client):
+
+ def __init__(self, broker, address):
+ Client.__init__(self, broker)
+ self.received = 0 # Number of non-empty (reliable) messages received.
+ self.receiver = self.session.receiver(address, capacity=1000)
+ def run(self):
+ try:
+ while True:
+ m = self.receiver.fetch(1)
+ if m.content == "z": break
+ if m.content: # Ignore unreliable messages
+ # Ignore duplicates
+ if int(m.content) == self.received: self.received += 1
+ except Exception,e: self.error = e
+
+ # def test_ttl_failover
+
+ # Original cluster will all be killed so expect exit with failure
+ # Set small purge interval.
+ cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL, args=["--queue-purge-interval=1"])
+ for b in cluster: b.ready() # Wait for brokers to be ready
+
+ # Python client failover produces noisy WARN logs, disable temporarily
+ logger = logging.getLogger()
+ log_level = logger.getEffectiveLevel()
+ logger.setLevel(logging.ERROR)
+ sender = None
+ receiver = None
+ try:
+ # Start sender and receiver threads
+ receiver = Receiver(cluster[0], "q;{create:always}")
+ receiver.start()
+ sender = Sender(cluster[0], "q;{create:always}")
+ sender.start()
+ # Wait for sender & receiver to get up and running
+ retry(lambda: receiver.received > 0)
+
+ # Kill brokers in a cycle.
+ endtime = time.time() + self.duration()
+ runtime = min(5.0, self.duration() / 4.0)
+ i = 0
+ while time.time() < endtime:
+ for b in cluster[i:]: b.ready() # Check if any broker crashed.
+ cluster[i].kill()
+ i += 1
+ b = cluster.start(expect=EXPECT_EXIT_FAIL)
+ b.ready()
+ time.sleep(runtime)
+ sender.stop()
+ receiver.stop()
+ for b in cluster[i:]:
+ b.ready() # Check it didn't crash
+ b.kill()
+ self.assertEqual(sender.sent, receiver.received)
cluster_test_logs.verify_logs()
+ finally:
+ # Detach to avoid slow reconnect attempts during shut-down if test fails.
+ if sender: sender.connection.detach()
+ if receiver: receiver.connection.detach()
+ logger.setLevel(log_level)
+
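The auto_fetch_reconnect_urls replacement above is the heart of client-side failover: subscribe to the amq.failover exchange and feed every membership update to qpid.messaging.util.set_reconnect_urls. Distilled into a standalone helper (a sketch; the error handling mirrors the version above):

    # Sketch of the failover-URL tracking loop used by Client above.
    import qpid.messaging.util
    from qpid import messaging

    def track_failover_urls(conn):
        ssn = conn.session("auto-fetch-reconnect-urls")
        rcv = ssn.receiver("amq.failover")  # broker publishes membership here
        rcv.capacity = 10
        while True:
            try:
                msg = rcv.fetch()
                qpid.messaging.util.set_reconnect_urls(conn, msg)
                ssn.acknowledge(msg, sync=False)
            except messaging.exceptions.LinkClosed:
                return
            except messaging.exceptions.ConnectionError:
                return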
+ def test_msg_group_failover(self):
+ """Test fail-over during continuous send-receive of grouped messages.
+ """
+
+ class GroupedTrafficGenerator(Thread):
+ def __init__(self, url, queue, group_key):
+ Thread.__init__(self)
+ self.url = url
+ self.queue = queue
+ self.group_key = group_key
+ self.status = -1
+
+ def run(self):
+ # generate traffic for approx 10 seconds (2011 msgs / 200 per sec)
+ cmd = ["msg_group_test",
+ "--broker=%s" % self.url,
+ "--address=%s" % self.queue,
+ "--connection-options={%s}" % (Cluster.CONNECTION_OPTIONS),
+ "--group-key=%s" % self.group_key,
+ "--receivers=2",
+ "--senders=3",
+ "--messages=2011",
+ "--send-rate=200",
+ "--capacity=11",
+ "--ack-frequency=23",
+ "--allow-duplicates",
+ "--group-size=37",
+ "--randomize-group-size",
+ "--interleave=13"]
+ # "--trace"]
+ self.generator = Popen(cmd)
+ self.status = self.generator.wait()
+ return self.status
+
+ def results(self):
+ self.join(timeout=30) # 3x assumed duration
+ if self.isAlive(): return -1
+ return self.status
+
+ # Original cluster will all be killed so expect exit with failure
+ cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL, args=["-t"])
+ for b in cluster: b.ready() # Wait for brokers to be ready
+
+ # create a queue configured for shared message groups
+ ssn0 = cluster[0].connect().session()
+ q_args = "{'qpid.group_header_key':'group-id', 'qpid.shared_msg_group':1}"
+ s0 = ssn0.sender("test-group-q; {create:always, node:{type:queue, x-declare:{arguments:%s}}}" % q_args)
+
+ # Kill original brokers, start new ones for the duration.
+ endtime = time.time() + self.duration()
+ i = 0
+ while time.time() < endtime:
+ traffic = GroupedTrafficGenerator( cluster[i].host_port(),
+ "test-group-q", "group-id" )
+ traffic.start()
+ time.sleep(1)
+
+ for x in range(2):
+ for b in cluster[i:]: b.ready() # Check if any broker crashed.
+ cluster[i].kill()
+ i += 1
+ b = cluster.start(expect=EXPECT_EXIT_FAIL)
+ time.sleep(1)
+
+ # wait for traffic to finish, verify success
+ self.assertEqual(0, traffic.results())
+
+ for i in range(i, len(cluster)): cluster[i].kill()
+
class StoreTests(BrokerTest):
"""
Propchange: qpid/branches/QPID-2519/cpp/src/tests/cluster_tests.py
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 21 14:42:12 2011
@@ -0,0 +1,3 @@
+/qpid/branches/qpid-2935/qpid/cpp/src/tests/cluster_tests.py:1061302-1072333
+/qpid/branches/qpid-3346/qpid/cpp/src/tests/cluster_tests.py:1144319-1179855
+/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py:1072051-1187351
Modified: qpid/branches/QPID-2519/cpp/src/tests/exception_test.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/exception_test.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/exception_test.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/exception_test.cpp Fri Oct 21 14:42:12 2011
@@ -92,32 +92,30 @@ QPID_AUTO_TEST_CASE(TestSessionBusy) {
}
QPID_AUTO_TEST_CASE(DisconnectedPop) {
- ProxySessionFixture fix;
- ProxyConnection c(fix.broker->getPort(Broker::TCP_TRANSPORT));
+ SessionFixture fix;
fix.session.queueDeclare(arg::queue="q");
fix.subs.subscribe(fix.lq, "q");
Catcher<TransportFailure> pop(bind(&LocalQueue::pop, &fix.lq, sys::TIME_SEC));
- fix.connection.proxy.close();
+ fix.shutdownBroker();
BOOST_CHECK(pop.join());
}
QPID_AUTO_TEST_CASE(DisconnectedListen) {
- ProxySessionFixture fix;
+ SessionFixture fix;
struct NullListener : public MessageListener {
void received(Message&) { BOOST_FAIL("Unexpected message"); }
} l;
- ProxyConnection c(fix.broker->getPort(Broker::TCP_TRANSPORT));
fix.session.queueDeclare(arg::queue="q");
fix.subs.subscribe(l, "q");
Catcher<TransportFailure> runner(bind(&SubscriptionManager::run, boost::ref(fix.subs)));
- fix.connection.proxy.close();
- runner.join();
+ fix.shutdownBroker();
+ runner.join();
BOOST_CHECK_THROW(fix.session.queueDeclare(arg::queue="x"), TransportFailure);
}
QPID_AUTO_TEST_CASE(NoSuchQueueTest) {
- ProxySessionFixture fix;
+ SessionFixture fix;
ScopedSuppressLogging sl; // Suppress messages for expected errors.
BOOST_CHECK_THROW(fix.subs.subscribe(fix.lq, "no such queue"), NotFoundException);
}
Modified: qpid/branches/QPID-2519/cpp/src/tests/federated_topic_test
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/federated_topic_test?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/federated_topic_test (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/federated_topic_test Fri Oct 21 14:42:12 2011
@@ -42,13 +42,12 @@ while getopts "s:m:b:" opt ; do
esac
done
-MY_DIR=$(dirname $(which $0))
source ./test_env.sh
trap stop_brokers EXIT
start_broker() {
- ${MY_DIR}/../qpidd --daemon --port 0 --no-module-dir --no-data-dir --auth no > qpidd.port
+ $QPIDD_EXEC --daemon --port 0 --no-module-dir --no-data-dir --auth no > qpidd.port
}
start_brokers() {
@@ -76,39 +75,39 @@ subscribe() {
echo Subscriber $1 connecting on $MY_PORT
LOG="subscriber_$1.log"
- ${MY_DIR}/topic_listener -p $MY_PORT > $LOG 2>&1 && rm -f $LOG
+ ./qpid-topic-listener -p $MY_PORT > $LOG 2>&1 && rm -f $LOG
}
publish() {
- ${MY_DIR}/topic_publisher --messages $MESSAGES --batches $BATCHES --subscribers $SUBSCRIBERS -p $PORT_A
+ ./qpid-topic-publisher --messages $MESSAGES --batches $BATCHES --subscribers $SUBSCRIBERS -p $PORT_A
}
setup_routes() {
- BROKER_A="localhost:$PORT_A"
- BROKER_B="localhost:$PORT_B"
- BROKER_C="localhost:$PORT_C"
+ BROKER_A="daffodil:$PORT_A"
+ BROKER_B="daffodil:$PORT_B"
+ BROKER_C="daffodil:$PORT_C"
if (($VERBOSE)); then
echo "Establishing routes for topic..."
fi
- $PYTHON_COMMANDS/qpid-route route add $BROKER_B $BROKER_A amq.topic topic_control B B
- $PYTHON_COMMANDS/qpid-route route add $BROKER_C $BROKER_B amq.topic topic_control C C
+ $QPID_ROUTE_EXEC route add $BROKER_B $BROKER_A amq.topic topic_control B B
+ $QPID_ROUTE_EXEC route add $BROKER_C $BROKER_B amq.topic topic_control C C
if (($VERBOSE)); then
echo "linked A->B->C"
fi
- $PYTHON_COMMANDS/qpid-route route add $BROKER_B $BROKER_C amq.topic topic_control B B
- $PYTHON_COMMANDS/qpid-route route add $BROKER_A $BROKER_B amq.topic topic_control A A
+ $QPID_ROUTE_EXEC route add $BROKER_B $BROKER_C amq.topic topic_control B B
+ $QPID_ROUTE_EXEC route add $BROKER_A $BROKER_B amq.topic topic_control A A
if (($VERBOSE)); then
echo "linked C->B->A"
echo "Establishing routes for response queue..."
fi
- $PYTHON_COMMANDS/qpid-route route add $BROKER_B $BROKER_C amq.direct response B B
- $PYTHON_COMMANDS/qpid-route route add $BROKER_A $BROKER_B amq.direct response A A
+ $QPID_ROUTE_EXEC route add $BROKER_B $BROKER_C amq.direct response B B
+ $QPID_ROUTE_EXEC route add $BROKER_A $BROKER_B amq.direct response A A
if (($VERBOSE)); then
echo "linked C->B->A"
for b in $BROKER_A $BROKER_B $BROKER_C; do
echo "Routes for $b"
- $PYTHON_COMMANDS/qpid-route route list $b
+ $QPID_ROUTE_EXEC route list $b
done
fi
}
Modified: qpid/branches/QPID-2519/cpp/src/tests/federation.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/federation.py?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/federation.py (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/federation.py Fri Oct 21 14:42:12 2011
@@ -23,7 +23,7 @@ from qpid.testlib import TestBase010
from qpid.datatypes import Message
from qpid.queue import Empty
from qpid.util import URL
-from time import sleep
+from time import sleep, time
class _FedBroker(object):
@@ -111,18 +111,18 @@ class FederationTests(TestBase010):
broker = qmf.getObjects(_class="broker")[0]
result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp")
- self.assertEqual(result.status, 0)
+ self.assertEqual(result.status, 0, result)
link = qmf.getObjects(_class="link")[0]
result = link.bridge(False, "amq.direct", "amq.direct", "my-key", "", "", False, False, False, 0)
- self.assertEqual(result.status, 0)
+ self.assertEqual(result.status, 0, result)
bridge = qmf.getObjects(_class="bridge")[0]
result = bridge.close()
- self.assertEqual(result.status, 0)
+ self.assertEqual(result.status, 0, result)
result = link.close()
- self.assertEqual(result.status, 0)
+ self.assertEqual(result.status, 0, result)
self.verify_cleanup()
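For readers decoding the positional arguments in these link.bridge(...) calls, the generic_dynamic_bounce_test added later in this diff annotates them; spelled out as a sketch:

    # Argument order for link.bridge(), per the annotated call in
    # generic_dynamic_bounce_test below.
    result = link.bridge(False,         # durable
                         "amq.direct",  # src
                         "amq.direct",  # dst
                         "my-key",      # key
                         "",            # tag
                         "",            # excludes
                         False,         # srcIsQueue
                         False,         # srcIsLocal
                         False,         # dynamic
                         0)             # sync
    assert result.status == 0, result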
@@ -133,11 +133,11 @@ class FederationTests(TestBase010):
qmf = self.qmf
broker = qmf.getObjects(_class="broker")[0]
result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp")
- self.assertEqual(result.status, 0)
+ self.assertEqual(result.status, 0, result)
link = qmf.getObjects(_class="link")[0]
result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, False, False, 0)
- self.assertEqual(result.status, 0)
+ self.assertEqual(result.status, 0, result)
bridge = qmf.getObjects(_class="bridge")[0]
@@ -165,9 +165,9 @@ class FederationTests(TestBase010):
except Empty: None
result = bridge.close()
- self.assertEqual(result.status, 0)
+ self.assertEqual(result.status, 0, result)
result = link.close()
- self.assertEqual(result.status, 0)
+ self.assertEqual(result.status, 0, result)
self.verify_cleanup()
@@ -178,11 +178,11 @@ class FederationTests(TestBase010):
qmf = self.qmf
broker = qmf.getObjects(_class="broker")[0]
result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp")
- self.assertEqual(result.status, 0)
+ self.assertEqual(result.status, 0, result)
link = qmf.getObjects(_class="link")[0]
result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, True, False, 0)
- self.assertEqual(result.status, 0)
+ self.assertEqual(result.status, 0, result)
bridge = qmf.getObjects(_class="bridge")[0]
@@ -209,9 +209,9 @@ class FederationTests(TestBase010):
except Empty: None
result = bridge.close()
- self.assertEqual(result.status, 0)
+ self.assertEqual(result.status, 0, result)
result = link.close()
- self.assertEqual(result.status, 0)
+ self.assertEqual(result.status, 0, result)
self.verify_cleanup()
@@ -236,11 +236,11 @@ class FederationTests(TestBase010):
qmf = self.qmf
broker = qmf.getObjects(_class="broker")[0]
result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp")
- self.assertEqual(result.status, 0)
+ self.assertEqual(result.status, 0, result)
link = qmf.getObjects(_class="link")[0]
result = link.bridge(False, "my-bridge-queue", "amq.fanout", "my-key", "", "", True, False, False, 1)
- self.assertEqual(result.status, 0)
+ self.assertEqual(result.status, 0, result)
bridge = qmf.getObjects(_class="bridge")[0]
sleep(3)
@@ -262,6 +262,63 @@ class FederationTests(TestBase010):
except Empty: None
result = bridge.close()
+ self.assertEqual(result.status, 0, result)
+ result = link.close()
+ self.assertEqual(result.status, 0, result)
+
+ self.verify_cleanup()
+
+ def test_pull_from_queue_recovery(self):
+ session = self.session
+
+ #setup queue on remote broker and add some messages
+ r_conn = self.connect(host=self.remote_host(), port=self.remote_port())
+ r_session = r_conn.session("test_pull_from_queue_recovery")
+ r_session.queue_declare(queue="my-bridge-queue", auto_delete=True)
+ for i in range(1, 6):
+ dp = r_session.delivery_properties(routing_key="my-bridge-queue")
+ r_session.message_transfer(message=Message(dp, "Message %d" % i))
+
+ #setup queue to receive messages from local broker
+ session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="fed1", exchange="amq.fanout")
+ self.subscribe(queue="fed1", destination="f1")
+ queue = session.incoming("f1")
+
+ self.startQmf()
+ qmf = self.qmf
+ broker = qmf.getObjects(_class="broker")[0]
+ result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp")
+ self.assertEqual(result.status, 0, result)
+
+ link = qmf.getObjects(_class="link")[0]
+ result = link.bridge(False, "my-bridge-queue", "amq.fanout", "my-key", "", "", True, False, False, 1)
+ self.assertEqual(result.status, 0, result)
+
+ bridge = qmf.getObjects(_class="bridge")[0]
+ sleep(5)
+
+ #recreate the remote bridge queue to invalidate the bridge session
+ r_session.queue_delete (queue="my-bridge-queue", if_empty=False, if_unused=False)
+ r_session.queue_declare(queue="my-bridge-queue", auto_delete=True)
+
+ #add some more messages (i.e. after bridge was created)
+ for i in range(6, 11):
+ dp = r_session.delivery_properties(routing_key="my-bridge-queue")
+ r_session.message_transfer(message=Message(dp, "Message %d" % i))
+
+ for i in range(1, 11):
+ try:
+ msg = queue.get(timeout=5)
+ self.assertEqual("Message %d" % i, msg.body)
+ except Empty:
+ self.fail("Failed to find expected message containing 'Message %d'" % i)
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected message in queue: " + extra.body)
+ except Empty: None
+
+ result = bridge.close()
self.assertEqual(result.status, 0)
result = link.close()
self.assertEqual(result.status, 0)
@@ -649,10 +706,17 @@ class FederationTests(TestBase010):
self.verify_cleanup()
- def test_dynamic_headers(self):
+ def test_dynamic_headers_any(self):
+ self.do_test_dynamic_headers('any')
+
+ def test_dynamic_headers_all(self):
+ self.do_test_dynamic_headers('all')
+
+
+ def do_test_dynamic_headers(self, match_mode):
session = self.session
r_conn = self.connect(host=self.remote_host(), port=self.remote_port())
- r_session = r_conn.session("test_dynamic_headers")
+ r_session = r_conn.session("test_dynamic_headers_%s" % match_mode)
session.exchange_declare(exchange="fed.headers", type="headers")
r_session.exchange_declare(exchange="fed.headers", type="headers")
@@ -671,7 +735,7 @@ class FederationTests(TestBase010):
sleep(5)
session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
- session.exchange_bind(queue="fed1", exchange="fed.headers", binding_key="key1", arguments={'x-match':'any', 'class':'first'})
+ session.exchange_bind(queue="fed1", exchange="fed.headers", binding_key="key1", arguments={'x-match':match_mode, 'class':'first'})
self.subscribe(queue="fed1", destination="f1")
queue = session.incoming("f1")
@@ -1791,3 +1855,301 @@ class FederationTests(TestBase010):
if headers:
return headers[name]
return None
+
+ def test_dynamic_topic_bounce(self):
+ """ Bounce the connection between federated Topic Exchanges.
+ """
+ class Params:
+ def exchange_type(self): return "topic"
+ def bind_queue(self, ssn, qname, ename):
+ ssn.exchange_bind(queue=qname, exchange=ename,
+ binding_key="spud.*")
+ def unbind_queue(self, ssn, qname, ename):
+ ssn.exchange_unbind(queue=qname, exchange=ename, binding_key="spud.*")
+ def delivery_properties(self, ssn):
+ return ssn.delivery_properties(routing_key="spud.boy")
+
+ self.generic_dynamic_bounce_test(Params())
+
+ def test_dynamic_direct_bounce(self):
+ """ Bounce the connection between federated Direct Exchanges.
+ """
+ class Params:
+ def exchange_type(self): return "direct"
+ def bind_queue(self, ssn, qname, ename):
+ ssn.exchange_bind(queue=qname, exchange=ename, binding_key="spud")
+ def unbind_queue(self, ssn, qname, ename):
+ ssn.exchange_unbind(queue=qname, exchange=ename, binding_key="spud")
+ def delivery_properties(self, ssn):
+ return ssn.delivery_properties(routing_key="spud")
+ self.generic_dynamic_bounce_test(Params())
+
+ def test_dynamic_fanout_bounce(self):
+ """ Bounce the connection between federated Fanout Exchanges.
+ """
+ class Params:
+ def exchange_type(self): return "fanout"
+ def bind_queue(self, ssn, qname, ename):
+ ssn.exchange_bind(queue=qname, exchange=ename)
+ def unbind_queue(self, ssn, qname, ename):
+ ssn.exchange_unbind(queue=qname, exchange=ename)
+ def delivery_properties(self, ssn):
+ return ssn.delivery_properties(routing_key="spud")
+ self.generic_dynamic_bounce_test(Params())
+
+ def test_dynamic_headers_bounce(self):
+ """ Bounce the connection between federated Headers Exchanges.
+ """
+ class Params:
+ def exchange_type(self): return "headers"
+ def bind_queue(self, ssn, qname, ename):
+ ssn.exchange_bind(queue=qname, exchange=ename,
+ binding_key="spud", arguments={'x-match':'any', 'class':'first'})
+ def unbind_queue(self, ssn, qname, ename):
+ ssn.exchange_unbind(queue=qname, exchange=ename, binding_key="spud")
+ def delivery_properties(self, ssn):
+ return ssn.message_properties(application_headers={'class':'first'})
+ ## @todo KAG - re-enable once federation bugs with headers exchanges
+ ## are fixed.
+ #self.generic_dynamic_bounce_test(Params())
+ return
+
+
+ def generic_dynamic_bounce_test(self, params):
+ """ Verify that a federated broker can maintain a binding to a local
+ queue using the same key as a remote binding. Destroy and reconnect
+ the federation link, and verify routes are restored correctly.
+ See QPID-3170.
+ Topology:
+
+ Queue1 <---"Key"---B0<==[Federated Exchange]==>B1---"Key"--->Queue2
+ """
+ session = self.session
+
+ # create the federation
+
+ self.startQmf()
+ qmf = self.qmf
+
+ self._setup_brokers()
+
+ # create exchange on each broker, and retrieve the corresponding
+ # management object for that exchange
+
+ exchanges=[]
+ for _b in self._brokers[0:2]:
+ _b.client_session.exchange_declare(exchange="fedX", type=params.exchange_type())
+ self.assertEqual(_b.client_session.exchange_query(name="fedX").type,
+ params.exchange_type(), "exchange_declare failed!")
+ # pull the exchange out of qmf...
+ retries = 0
+ my_exchange = None
+ timeout = time() + 10
+ while my_exchange is None and time() <= timeout:
+ objs = qmf.getObjects(_broker=_b.qmf_broker, _class="exchange")
+ for ooo in objs:
+ if ooo.name == "fedX":
+ my_exchange = ooo
+ break
+ if my_exchange is None:
+ self.fail("QMF failed to find new exchange!")
+ exchanges.append(my_exchange)
+
+ #
+ # on each broker, create a local queue bound to the exchange with the
+ # same key value.
+ #
+
+ self._brokers[0].client_session.queue_declare(queue="fedX1", exclusive=True, auto_delete=True)
+ params.bind_queue(self._brokers[0].client_session, "fedX1", "fedX")
+ self.subscribe(self._brokers[0].client_session, queue="fedX1", destination="f1")
+ queue_0 = self._brokers[0].client_session.incoming("f1")
+
+ self._brokers[1].client_session.queue_declare(queue="fedX1", exclusive=True, auto_delete=True)
+ params.bind_queue(self._brokers[1].client_session, "fedX1", "fedX")
+ self.subscribe(self._brokers[1].client_session, queue="fedX1", destination="f1")
+ queue_1 = self._brokers[1].client_session.incoming("f1")
+
+ # now federate the two brokers
+
+ # connect B0 --> B1
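+        # QMF connect args: host, port, durable, authMechanism, username,
+        # password, transport.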
+ result = self._brokers[1].qmf_object.connect(self._brokers[0].host,
+ self._brokers[0].port,
+ False, "PLAIN", "guest", "guest", "tcp")
+ self.assertEqual(result.status, 0)
+
+ # connect B1 --> B0
+ result = self._brokers[0].qmf_object.connect(self._brokers[1].host,
+ self._brokers[1].port,
+ False, "PLAIN", "guest", "guest", "tcp")
+ self.assertEqual(result.status, 0)
+
+ # for each link, bridge the "fedX" exchanges:
+
+ for _l in qmf.getObjects(_class="link"):
+ # print("Link=%s:%s %s" % (_l.host, _l.port, str(_l.getBroker())))
+ result = _l.bridge(False, # durable
+ "fedX", # src
+ "fedX", # dst
+ "", # key
+ "", # tag
+ "", # excludes
+ False, # srcIsQueue
+ False, # srcIsLocal
+ True, # dynamic
+ 0) # sync
+ self.assertEqual(result.status, 0)
+
+ # wait for all the inter-broker links to become operational
+ operational = False
+ timeout = time() + 10
+ while not operational and time() <= timeout:
+ operational = True
+ for _l in qmf.getObjects(_class="link"):
+ #print("Link=%s:%s %s" % (_l.host, _l.port, str(_l.state)))
+ if _l.state != "Operational":
+ operational = False
+ self.failUnless(operational, "inter-broker links failed to become operational.")
+
+ # @todo - There is no way to determine when the bridge objects become
+ # active.
+
+ # wait until the binding key has propagated to each broker - each
+ # broker should see 2 bindings (1 local, 1 remote)
+
+ binding_counts = [2, 2]
+ self.assertEqual(len(binding_counts), len(exchanges), "Update Test!")
+ for i in range(2):
+ exchanges[i].update()
+ timeout = time() + 10
+ while exchanges[i].bindingCount < binding_counts[i] and time() <= timeout:
+ exchanges[i].update()
+ self.failUnless(exchanges[i].bindingCount == binding_counts[i])
+
+ # send 10 msgs to B0
+ for i in range(1, 11):
+ # dp = self._brokers[0].client_session.delivery_properties(routing_key=params.routing_key())
+ dp = params.delivery_properties(self._brokers[0].client_session)
+ self._brokers[0].client_session.message_transfer(destination="fedX", message=Message(dp, "Message_trp %d" % i))
+
+ # get exactly 10 msgs on B0's local queue and B1's queue
+ for i in range(1, 11):
+ try:
+ msg = queue_0.get(timeout=5)
+ self.assertEqual("Message_trp %d" % i, msg.body)
+ msg = queue_1.get(timeout=5)
+ self.assertEqual("Message_trp %d" % i, msg.body)
+ except Empty:
+ self.fail("Only got %d msgs - expected 10" % i)
+ try:
+ extra = queue_0.get(timeout=1)
+ self.fail("Got unexpected message in queue_0: " + extra.body)
+        except Empty: pass
+
+ try:
+ extra = queue_1.get(timeout=1)
+ self.fail("Got unexpected message in queue_1: " + extra.body)
+        except Empty: pass
+
+ #
+ # Tear down the bridges between the two exchanges, then wait
+ # for the bindings to be cleaned up
+ #
+
+ for _b in qmf.getObjects(_class="bridge"):
+ result = _b.close()
+ self.assertEqual(result.status, 0)
+
+ binding_counts = [1, 1]
+ self.assertEqual(len(binding_counts), len(exchanges), "Update Test!")
+ for i in range(2):
+ exchanges[i].update()
+ timeout = time() + 10
+ while exchanges[i].bindingCount != binding_counts[i] and time() <= timeout:
+ exchanges[i].update()
+ self.failUnless(exchanges[i].bindingCount == binding_counts[i])
+
+ #
+ # restore the bridges between the two exchanges, and wait for the
+ # bindings to propagate.
+ #
+
+ for _l in qmf.getObjects(_class="link"):
+ # print("Link=%s:%s %s" % (_l.host, _l.port, str(_l.getBroker())))
+ result = _l.bridge(False, # durable
+ "fedX", # src
+ "fedX", # dst
+ "", # key
+ "", # tag
+ "", # excludes
+ False, # srcIsQueue
+ False, # srcIsLocal
+ True, # dynamic
+ 0) # sync
+ self.assertEqual(result.status, 0)
+
+ binding_counts = [2, 2]
+ self.assertEqual(len(binding_counts), len(exchanges), "Update Test!")
+ for i in range(2):
+ exchanges[i].update()
+ timeout = time() + 10
+ while exchanges[i].bindingCount != binding_counts[i] and time() <= timeout:
+ exchanges[i].update()
+ self.failUnless(exchanges[i].bindingCount == binding_counts[i])
+
+ #
+ # verify traffic flows correctly
+ #
+
+ for i in range(1, 11):
+ #dp = self._brokers[1].client_session.delivery_properties(routing_key=params.routing_key())
+ dp = params.delivery_properties(self._brokers[1].client_session)
+ self._brokers[1].client_session.message_transfer(destination="fedX", message=Message(dp, "Message_trp %d" % i))
+
+ # get exactly 10 msgs on B0's queue and B1's queue
+ for i in range(1, 11):
+ try:
+ msg = queue_0.get(timeout=5)
+ self.assertEqual("Message_trp %d" % i, msg.body)
+ msg = queue_1.get(timeout=5)
+ self.assertEqual("Message_trp %d" % i, msg.body)
+ except Empty:
+ self.fail("Only got %d msgs - expected 10" % i)
+ try:
+ extra = queue_0.get(timeout=1)
+ self.fail("Got unexpected message in queue_0: " + extra.body)
+        except Empty: pass
+
+ try:
+ extra = queue_1.get(timeout=1)
+ self.fail("Got unexpected message in queue_1: " + extra.body)
+        except Empty: pass
+
+
+ #
+ # cleanup
+ #
+ params.unbind_queue(self._brokers[0].client_session, "fedX1", "fedX")
+ self._brokers[0].client_session.message_cancel(destination="f1")
+ self._brokers[0].client_session.queue_delete(queue="fedX1")
+
+ params.unbind_queue(self._brokers[1].client_session, "fedX1", "fedX")
+ self._brokers[1].client_session.message_cancel(destination="f1")
+ self._brokers[1].client_session.queue_delete(queue="fedX1")
+
+ for _b in qmf.getObjects(_class="bridge"):
+ result = _b.close()
+ self.assertEqual(result.status, 0)
+
+ for _l in qmf.getObjects(_class="link"):
+ result = _l.close()
+ self.assertEqual(result.status, 0)
+
+ for _b in self._brokers[0:2]:
+ _b.client_session.exchange_delete(exchange="fedX")
+
+ self._teardown_brokers()
+
+ self.verify_cleanup()
+
+
Modified: qpid/branches/QPID-2519/cpp/src/tests/python_tests
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/python_tests?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/python_tests (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/python_tests Fri Oct 21 14:42:12 2011
@@ -1,4 +1,4 @@
-#!/bin/sh
+#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one
Modified: qpid/branches/QPID-2519/cpp/src/tests/qpid-cluster-benchmark
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/qpid-cluster-benchmark?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/qpid-cluster-benchmark (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/qpid-cluster-benchmark Fri Oct 21 14:42:12 2011
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,21 +19,40 @@
#
# Benchmark script for comparing cluster performance.
-#PORT=":5555"
-BROKER=`echo $HOSTS | awk '{print $1}'` # Single broker
-BROKERS=`echo $HOSTS | sed "s/\>/$PORT/g;s/ /,/g"` # Broker URL list
-COUNT=100000
-RATE=20000 # Rate to throttle senders for latency results
-run_test() { echo $*; "$@"; echo; echo; echo; }
-# Thruput, unshared queue
-run_test qpid-cpp-benchmark --repeat 10 -b $BROKER --no-timestamp -m $COUNT
+# Default values
+PORT="5672"
+COUNT=10000
+FLOW=100 # Flow control limit on queue depth for latency.
+REPEAT=10
+QUEUES=4
+CLIENTS=3
-# Latency
-run_test qpid-cpp-benchmark --repeat 10 -b $BROKER --connection-options '{tcp-nodelay:true}' -m `expr $COUNT / 2` --send-rate $RATE
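+# Example usage (placeholder hosts): HOSTS="node1 node2" ./qpid-cluster-benchmark -c 50000 -q 8 -n 6
+
+# Parse command-line overrides of the defaults above.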
+while getopts "p:c:f:r:s:b:q:n:" opt; do
+    case $opt in
+        p) PORT=$OPTARG;;
+        c) COUNT=$OPTARG;;
+        f) FLOW=$OPTARG;;
+        r) REPEAT=$OPTARG;;
+        s) SCALE=$OPTARG;;
+        b) BROKERS=$OPTARG;;
+        q) QUEUES=$OPTARG;;
+        n) CLIENTS=$OPTARG;;
+ *) echo "Unknown option"; exit 1;;
+ esac
+done
+
+BROKERS=${BROKERS:-$(echo $HOSTS | sed "s/\>/:$PORT/g;s/ /,/g")} # Broker URL list
+BROKER=`echo $BROKERS | awk -F, '{print $1}'` # First broker
+
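+# run_test <label> <command...>: print the label and full command, then run
+# the command with the label stripped off.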
+run_test() { echo $*; shift; "$@"; echo; echo; echo; }
# Multiple pubs/subs connect via multiple brokers (active-active)
-run_test qpid-cpp-benchmark --repeat 10 -b $BROKERS --no-timestamp --summarize -s10 -r10 -m `expr $COUNT / 10`
+run_test "multi-host-thruput" qpid-cpp-benchmark --repeat $REPEAT -b $BROKERS --no-timestamp --summarize -q$QUEUES -s$CLIENTS -r$CLIENTS -m $COUNT
# Multiple pubs/subs connect via single broker (active-passive)
-run_test qpid-cpp-benchmark --repeat 10 -b $BROKER --no-timestamp --summarize -s10 -r10 -m `expr $COUNT / 10`
+run_test "single-host-thruput" qpid-cpp-benchmark --repeat $REPEAT -b $BROKER --no-timestamp --summarize -q$QUEUES -s$CLIENTS -r$CLIENTS -m $COUNT
+
+# Latency
+run_test "latency" qpid-cpp-benchmark --repeat $REPEAT -b $BROKER --connection-options '{tcp-nodelay:true}' -m $COUNT --flow-control $FLOW
+
Modified: qpid/branches/QPID-2519/cpp/src/tests/qpid-cpp-benchmark
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/qpid-cpp-benchmark?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/qpid-cpp-benchmark (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/qpid-cpp-benchmark Fri Oct 21 14:42:12 2011
@@ -77,6 +77,20 @@ def ssh_command(host, command):
"""Convert command into an ssh command on host with quoting"""
return ["ssh", host] + [posix_quote(arg) for arg in command]
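+# Registry of all spawned client processes so strays can be killed on
+# failure (see the finally clause in main()).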
+class Clients:
+ def __init__(self): self.clients=[]
+
+ def add(self, client):
+ self.clients.append(client)
+ return client
+
+ def kill(self):
+ for c in self.clients:
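+            # Ignore errors from clients that have already exited.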
+ try: c.kill()
+ except: pass
+
+clients = Clients()
+
def start_receive(queue, index, opts, ready_queue, broker, host):
address_opts=["create:receiver"] + opts.receive_option
if opts.durable: address_opts += ["node:{durable:true}"]
@@ -101,7 +115,7 @@ def start_receive(queue, index, opts, re
if opts.connection_options:
command += ["--connection-options",opts.connection_options]
if host: command = ssh_command(host, command)
- return Popen(command, stdout=PIPE)
+ return clients.add(Popen(command, stdout=PIPE))
def start_send(queue, opts, broker, host):
address="%s;{%s}"%(queue,",".join(opts.send_option))
@@ -122,7 +136,7 @@ def start_send(queue, opts, broker, host
if opts.connection_options:
command += ["--connection-options",opts.connection_options]
if host: command = ssh_command(host, command)
- return Popen(command, stdout=PIPE)
+ return clients.add(Popen(command, stdout=PIPE))
def first_line(p):
out,err=p.communicate()
@@ -133,7 +147,11 @@ def delete_queues(queues, broker):
c = qpid.messaging.Connection(broker)
c.open()
for q in queues:
- try: s = c.session().sender("%s;{delete:always}"%(q))
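+        # Closing the sender triggers the "delete:always" policy; sync so the
+        # deletion completes before the queue is redeclared on the next run.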
+ try:
+ s = c.session()
+ snd = s.sender("%s;{delete:always}"%(q))
+ snd.close()
+ s.sync()
except qpid.messaging.exceptions.NotFound: pass # Ignore "no such queue"
c.close()
@@ -145,7 +163,6 @@ def print_header(timestamp):
def parse(parser, lines): # Parse sender/receiver output
for l in lines:
fn_val = zip(parser, l)
-
return [map(lambda p: p[0](p[1]), zip(parser,line.split())) for line in lines]
def parse_senders(senders):
@@ -156,11 +173,12 @@ def parse_receivers(receivers):
def print_data(send_stats, recv_stats):
for send,recv in map(None, send_stats, recv_stats):
- if send: print send[0],
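+        # Build each row as a single string and emit it with one print.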
+ line=""
+ if send: line += "%d"%send[0]
if recv:
- print "\t\t%d"%recv[0],
- if len(recv) == 4: print "\t%.2f\t%.2f\t%.2f"%tuple(recv[1:]),
- print
+ line += "\t\t%d"%recv[0]
+ if len(recv) == 4: line += "\t%.2f\t%.2f\t%.2f"%tuple(recv[1:])
+ print line
def print_summary(send_stats, recv_stats):
    def avg(s): return sum(s) / len(s)
@@ -184,11 +202,11 @@ class ReadyReceiver:
self.receiver = self.connection.session().receiver(
"%s;{create:receiver,delete:receiver,node:{durable:false}}"%(queue))
self.receiver.session.sync()
- self.timeout=2
+ self.timeout=10
def wait(self, receivers):
try:
- for i in xrange(len(receivers)): self.receiver.fetch(self.timeout)
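+            # Expect one "ready" message from each receiver.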
+ for i in receivers: self.receiver.fetch(self.timeout)
self.connection.close()
except qpid.messaging.Empty:
for r in receivers:
@@ -197,7 +215,8 @@ class ReadyReceiver:
raise Exception("Receiver error: %s"%(out))
raise Exception("Timed out waiting for receivers to be ready")
-def flatten(l): return sum(map(lambda s: s.split(","), l),[])
+def flatten(l):
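+    # Items may be separated by commas, whitespace, or both.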
+    return sum(map(lambda s: re.split(re.compile(r"\s*,\s*|\s+"), s), l), [])
class RoundRobin:
def __init__(self,items):
@@ -221,20 +240,22 @@ def main():
receive_out = ""
ready_queue="%s-ready"%(opts.queue_name)
queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)]
- for i in xrange(opts.repeat):
- delete_queues(queues, opts.broker[0])
- ready_receiver = ReadyReceiver(ready_queue, opts.broker[0])
- receivers = [start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next())
- for q in queues for j in xrange(opts.receivers)]
- ready_receiver.wait(filter(None, receivers)) # Wait for receivers to be ready.
- senders = [start_send(q, opts,brokers.next(), client_hosts.next())
- for q in queues for j in xrange(opts.senders)]
- if opts.report_header and i == 0: print_header(opts.timestamp)
- send_stats=parse_senders(senders)
- recv_stats=parse_receivers(receivers)
- if opts.summarize: print_summary(send_stats, recv_stats)
- else: print_data(send_stats, recv_stats)
- delete_queues(queues, opts.broker[0])
+ try:
+ for i in xrange(opts.repeat):
+ delete_queues(queues, opts.broker[0])
+ ready_receiver = ReadyReceiver(ready_queue, opts.broker[0])
+ receivers = [start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next())
+ for q in queues for j in xrange(opts.receivers)]
+ ready_receiver.wait(filter(None, receivers)) # Wait for receivers to be ready.
+            senders = [start_send(q, opts, brokers.next(), client_hosts.next())
+ for q in queues for j in xrange(opts.senders)]
+ if opts.report_header and i == 0: print_header(opts.timestamp)
+ send_stats=parse_senders(senders)
+ recv_stats=parse_receivers(receivers)
+ if opts.summarize: print_summary(send_stats, recv_stats)
+ else: print_data(send_stats, recv_stats)
+ delete_queues(queues, opts.broker[0])
+ finally: clients.kill() # No strays
if __name__ == "__main__": main()
Modified: qpid/branches/QPID-2519/cpp/src/tests/qpid-ctrl
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/qpid-ctrl?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/qpid-ctrl (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/qpid-ctrl Fri Oct 21 14:42:12 2011
@@ -92,7 +92,10 @@ try:
arguments = {}
for a in args:
name, val = nameval(a)
- arguments[name] = val
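+        # Values beginning with '{' or '[' are evaluated as Python
+        # map/list literals.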
+ if val[0] == '{' or val[0] == '[':
+ arguments[name] = eval(val)
+ else:
+ arguments[name] = val
content = {
"_object_id": {"_object_name": object_name},
"_method_name": method_name,