You are viewing a plain-text version of this content. The canonical link was not preserved in this plain-text copy.
Posted to commits@qpid.apache.org by rh...@apache.org on 2009/10/11 07:42:24 UTC
svn commit: r824028 - in /qpid/trunk/qpid/python/qpid: messaging.py tests/messaging.py
Author: rhs
Date: Sun Oct 11 05:42:24 2009
New Revision: 824028
URL: http://svn.apache.org/viewvc?rev=824028&view=rev
Log:
added durable and reconnect options to the tests
Modified:
qpid/trunk/qpid/python/qpid/messaging.py
qpid/trunk/qpid/python/qpid/tests/messaging.py
Modified: qpid/trunk/qpid/python/qpid/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging.py?rev=824028&r1=824027&r2=824028&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging.py Sun Oct 11 05:42:24 2009
@@ -78,7 +78,7 @@
@static
def open(host, port=None, username="guest", password="guest",
- mechanism="PLAIN", heartbeat=None):
+ mechanism="PLAIN", heartbeat=None, **options):
"""
Creates an AMQP connection and connects it to the given host and port.
@@ -89,12 +89,12 @@
@rtype: Connection
@return: a connected Connection
"""
- conn = Connection(host, port, username, password, mechanism, heartbeat)
+ conn = Connection(host, port, username, password, mechanism, heartbeat, **options)
conn.connect()
return conn
def __init__(self, host, port=None, username="guest", password="guest",
- mechanism="PLAIN", heartbeat=None):
+ mechanism="PLAIN", heartbeat=None, **options):
"""
Creates a connection. A newly created connection must be connected
with the Connection.connect() method before it can be started.
@@ -117,7 +117,7 @@
self.id = str(uuid4())
self.session_counter = 0
self.sessions = {}
- self.reconnect = False
+ self.reconnect = options.get("reconnect", False)
self._connected = False
self._lock = RLock()
self._condition = Condition(self._lock)
@@ -524,6 +524,7 @@
self.target = target
self.options = options
self.capacity = options.get("capacity", UNLIMITED)
+ self.durable = options.get("durable")
self.queued = Serial(0)
self.acked = Serial(0)
self.error = None
@@ -586,6 +587,9 @@
else:
message = Message(object)
+ if message.durable is None:
+ message.durable = self.durable
+
if self.capacity is not UNLIMITED:
if self.capacity <= 0:
raise InsufficientCapacity("capacity = %s" % self.capacity)
@@ -851,7 +855,7 @@
self.to = None
self.reply_to = None
self.correlation_id = None
- self.durable = False
+ self.durable = None
self.properties = {}
self.content_type = get_type(content)
self.content = content
Modified: qpid/trunk/qpid/python/qpid/tests/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging.py?rev=824028&r1=824027&r2=824028&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging.py Sun Oct 11 05:42:24 2009
@@ -50,6 +50,8 @@
raise Skipped(e)
self.ssn = self.setup_session()
self.snd = self.setup_sender()
+ if self.snd is not None:
+ self.snd.durable = self.durable()
self.rcv = self.setup_receiver()
def teardown(self):
@@ -65,7 +67,7 @@
def ping(self, ssn):
PING_Q = 'ping-queue {create: always}'
# send a message
- sender = ssn.sender(PING_Q)
+ sender = ssn.sender(PING_Q, durable=self.durable())
content = self.content("ping")
sender.send(content)
receiver = ssn.receiver(PING_Q)
@@ -98,16 +100,27 @@
def delay(self):
return float(self.config.defines.get("delay", "2"))
+ def get_bool(self, name):
+ return self.config.defines.get(name, "false").lower() in ("true", "yes", "1")
+
+ def durable(self):
+ return self.get_bool("durable")
+
+ def reconnect(self):
+ return self.get_bool("reconnect")
+
class SetupTests(Base):
def testOpen(self):
# XXX: need to flesh out URL support/syntax
- self.conn = Connection.open(self.broker.host, self.broker.port)
+ self.conn = Connection.open(self.broker.host, self.broker.port,
+ reconnect=self.reconnect())
self.ping(self.conn.session())
def testConnect(self):
# XXX: need to flesh out URL support/syntax
- self.conn = Connection(self.broker.host, self.broker.port)
+ self.conn = Connection(self.broker.host, self.broker.port,
+ reconnect=self.reconnect())
self.conn.connect()
self.ping(self.conn.session())
@@ -122,7 +135,8 @@
class ConnectionTests(Base):
def setup_connection(self):
- return Connection.open(self.broker.host, self.broker.port)
+ return Connection.open(self.broker.host, self.broker.port,
+ reconnect=self.reconnect())
def testSessionAnon(self):
ssn1 = self.conn.session()
@@ -180,14 +194,16 @@
class SessionTests(Base):
def setup_connection(self):
- return Connection.open(self.broker.host, self.broker.port)
+ return Connection.open(self.broker.host, self.broker.port,
+ reconnect=self.reconnect())
def setup_session(self):
return self.conn.session()
def testSender(self):
- snd = self.ssn.sender('test-snd-queue {create: always}')
- snd2 = self.ssn.sender(snd.target)
+ snd = self.ssn.sender('test-snd-queue {create: always}',
+ durable=self.durable())
+ snd2 = self.ssn.sender(snd.target, durable=self.durable())
assert snd is not snd2
snd2.close()
@@ -205,7 +221,7 @@
rcv2.close()
content = self.content("testReceiver")
- snd = self.ssn.sender(rcv.source)
+ snd = self.ssn.sender(rcv.source, durable=self.durable())
snd.send(content)
msg = rcv.fetch(0)
assert msg.content == content
@@ -234,7 +250,7 @@
# empty on setup, and possibly also to drain queues on teardown
def ackTest(self, acker, ack_capacity=None):
# send a bunch of messages
- snd = self.ssn.sender(ACK_Q)
+ snd = self.ssn.sender(ACK_Q, durable=self.durable())
contents = [self.content("ackTest", i) for i in range(15)]
for c in contents:
snd.send(c)
@@ -289,7 +305,7 @@
self.ackTest(lambda ssn: ssn.acknowledge(sync=False), UNLIMITED)
def send(self, ssn, queue, base, count=1):
- snd = ssn.sender(queue)
+ snd = ssn.sender(queue, durable=self.durable())
contents = []
for i in range(count):
c = self.content(base, i)
@@ -304,7 +320,7 @@
txssn = self.conn.session(transactional=True)
contents = self.send(self.ssn, TX_Q, "txTest", 3)
txrcv = txssn.receiver(TX_Q)
- txsnd = txssn.sender(TX_Q_COPY)
+ txsnd = txssn.sender(TX_Q_COPY, durable=self.durable())
rcv = self.ssn.receiver(txrcv.source)
copy_rcv = self.ssn.receiver(txsnd.target)
self.assertEmpty(copy_rcv)
@@ -403,7 +419,8 @@
class ReceiverTests(Base):
def setup_connection(self):
- return Connection.open(self.broker.host, self.broker.port)
+ return Connection.open(self.broker.host, self.broker.port,
+ reconnect=self.reconnect())
def setup_session(self):
return self.conn.session()
@@ -527,7 +544,7 @@
self.assertPending(self.rcv, 5)
drained = self.drain(self.rcv)
- assert len(drained) == 10
+ assert len(drained) == 10, "%s, %s" % (len(drained), drained)
self.assertPending(self.rcv, 0)
self.ssn.acknowledge()
@@ -556,13 +573,14 @@
class AddressErrorTests(Base):
def setup_connection(self):
- return Connection.open(self.broker.host, self.broker.port)
+ return Connection.open(self.broker.host, self.broker.port,
+ reconnect=self.reconnect())
def setup_session(self):
return self.conn.session()
def sendErrorTest(self, addr, exc, check=lambda e: True):
- snd = self.ssn.sender(addr)
+ snd = self.ssn.sender(addr, durable=self.durable())
try:
snd.send("hello")
assert False, "send succeeded"
@@ -612,7 +630,8 @@
class SenderTests(Base):
def setup_connection(self):
- return Connection.open(self.broker.host, self.broker.port)
+ return Connection.open(self.broker.host, self.broker.port,
+ reconnect=self.reconnect())
def setup_session(self):
return self.conn.session()
@@ -720,7 +739,8 @@
class MessageEchoTests(Base):
def setup_connection(self):
- return Connection.open(self.broker.host, self.broker.port)
+ return Connection.open(self.broker.host, self.broker.port,
+ reconnect=self.reconnect())
def setup_session(self):
return self.conn.session()
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org