You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2009/10/11 07:42:24 UTC

svn commit: r824028 - in /qpid/trunk/qpid/python/qpid: messaging.py tests/messaging.py

Author: rhs
Date: Sun Oct 11 05:42:24 2009
New Revision: 824028

URL: http://svn.apache.org/viewvc?rev=824028&view=rev
Log:
Added a "reconnect" connection option and a "durable" sender option to messaging.py, and updated the tests to pass both options.

Modified:
    qpid/trunk/qpid/python/qpid/messaging.py
    qpid/trunk/qpid/python/qpid/tests/messaging.py

Modified: qpid/trunk/qpid/python/qpid/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging.py?rev=824028&r1=824027&r2=824028&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging.py Sun Oct 11 05:42:24 2009
@@ -78,7 +78,7 @@
 
   @static
   def open(host, port=None, username="guest", password="guest",
-           mechanism="PLAIN", heartbeat=None):
+           mechanism="PLAIN", heartbeat=None, **options):
     """
     Creates an AMQP connection and connects it to the given host and port.
 
@@ -89,12 +89,12 @@
     @rtype: Connection
     @return: a connected Connection
     """
-    conn = Connection(host, port, username, password, mechanism, heartbeat)
+    conn = Connection(host, port, username, password, mechanism, heartbeat, **options)
     conn.connect()
     return conn
 
   def __init__(self, host, port=None, username="guest", password="guest",
-               mechanism="PLAIN", heartbeat=None):
+               mechanism="PLAIN", heartbeat=None, **options):
     """
     Creates a connection. A newly created connection must be connected
     with the Connection.connect() method before it can be started.
@@ -117,7 +117,7 @@
     self.id = str(uuid4())
     self.session_counter = 0
     self.sessions = {}
-    self.reconnect = False
+    self.reconnect = options.get("reconnect", False)
     self._connected = False
     self._lock = RLock()
     self._condition = Condition(self._lock)
@@ -524,6 +524,7 @@
     self.target = target
     self.options = options
     self.capacity = options.get("capacity", UNLIMITED)
+    self.durable = options.get("durable")
     self.queued = Serial(0)
     self.acked = Serial(0)
     self.error = None
@@ -586,6 +587,9 @@
     else:
       message = Message(object)
 
+    if message.durable is None:
+      message.durable = self.durable
+
     if self.capacity is not UNLIMITED:
       if self.capacity <= 0:
         raise InsufficientCapacity("capacity = %s" % self.capacity)
@@ -851,7 +855,7 @@
     self.to = None
     self.reply_to = None
     self.correlation_id = None
-    self.durable = False
+    self.durable = None
     self.properties = {}
     self.content_type = get_type(content)
     self.content = content

Modified: qpid/trunk/qpid/python/qpid/tests/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging.py?rev=824028&r1=824027&r2=824028&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging.py Sun Oct 11 05:42:24 2009
@@ -50,6 +50,8 @@
       raise Skipped(e)
     self.ssn = self.setup_session()
     self.snd = self.setup_sender()
+    if self.snd is not None:
+      self.snd.durable = self.durable()
     self.rcv = self.setup_receiver()
 
   def teardown(self):
@@ -65,7 +67,7 @@
   def ping(self, ssn):
     PING_Q = 'ping-queue {create: always}'
     # send a message
-    sender = ssn.sender(PING_Q)
+    sender = ssn.sender(PING_Q, durable=self.durable())
     content = self.content("ping")
     sender.send(content)
     receiver = ssn.receiver(PING_Q)
@@ -98,16 +100,27 @@
   def delay(self):
     return float(self.config.defines.get("delay", "2"))
 
+  def get_bool(self, name):
+    return self.config.defines.get(name, "false").lower() in ("true", "yes", "1")
+
+  def durable(self):
+    return self.get_bool("durable")
+
+  def reconnect(self):
+    return self.get_bool("reconnect")
+
 class SetupTests(Base):
 
   def testOpen(self):
     # XXX: need to flesh out URL support/syntax
-    self.conn = Connection.open(self.broker.host, self.broker.port)
+    self.conn = Connection.open(self.broker.host, self.broker.port,
+                                reconnect=self.reconnect())
     self.ping(self.conn.session())
 
   def testConnect(self):
     # XXX: need to flesh out URL support/syntax
-    self.conn = Connection(self.broker.host, self.broker.port)
+    self.conn = Connection(self.broker.host, self.broker.port,
+                           reconnect=self.reconnect())
     self.conn.connect()
     self.ping(self.conn.session())
 
@@ -122,7 +135,8 @@
 class ConnectionTests(Base):
 
   def setup_connection(self):
-    return Connection.open(self.broker.host, self.broker.port)
+    return Connection.open(self.broker.host, self.broker.port,
+                           reconnect=self.reconnect())
 
   def testSessionAnon(self):
     ssn1 = self.conn.session()
@@ -180,14 +194,16 @@
 class SessionTests(Base):
 
   def setup_connection(self):
-    return Connection.open(self.broker.host, self.broker.port)
+    return Connection.open(self.broker.host, self.broker.port,
+                           reconnect=self.reconnect())
 
   def setup_session(self):
     return self.conn.session()
 
   def testSender(self):
-    snd = self.ssn.sender('test-snd-queue {create: always}')
-    snd2 = self.ssn.sender(snd.target)
+    snd = self.ssn.sender('test-snd-queue {create: always}',
+                          durable=self.durable())
+    snd2 = self.ssn.sender(snd.target, durable=self.durable())
     assert snd is not snd2
     snd2.close()
 
@@ -205,7 +221,7 @@
     rcv2.close()
 
     content = self.content("testReceiver")
-    snd = self.ssn.sender(rcv.source)
+    snd = self.ssn.sender(rcv.source, durable=self.durable())
     snd.send(content)
     msg = rcv.fetch(0)
     assert msg.content == content
@@ -234,7 +250,7 @@
   # empty on setup, and possibly also to drain queues on teardown
   def ackTest(self, acker, ack_capacity=None):
     # send a bunch of messages
-    snd = self.ssn.sender(ACK_Q)
+    snd = self.ssn.sender(ACK_Q, durable=self.durable())
     contents = [self.content("ackTest", i) for i in range(15)]
     for c in contents:
       snd.send(c)
@@ -289,7 +305,7 @@
     self.ackTest(lambda ssn: ssn.acknowledge(sync=False), UNLIMITED)
 
   def send(self, ssn, queue, base, count=1):
-    snd = ssn.sender(queue)
+    snd = ssn.sender(queue, durable=self.durable())
     contents = []
     for i in range(count):
       c = self.content(base, i)
@@ -304,7 +320,7 @@
     txssn = self.conn.session(transactional=True)
     contents = self.send(self.ssn, TX_Q, "txTest", 3)
     txrcv = txssn.receiver(TX_Q)
-    txsnd = txssn.sender(TX_Q_COPY)
+    txsnd = txssn.sender(TX_Q_COPY, durable=self.durable())
     rcv = self.ssn.receiver(txrcv.source)
     copy_rcv = self.ssn.receiver(txsnd.target)
     self.assertEmpty(copy_rcv)
@@ -403,7 +419,8 @@
 class ReceiverTests(Base):
 
   def setup_connection(self):
-    return Connection.open(self.broker.host, self.broker.port)
+    return Connection.open(self.broker.host, self.broker.port,
+                           reconnect=self.reconnect())
 
   def setup_session(self):
     return self.conn.session()
@@ -527,7 +544,7 @@
     self.assertPending(self.rcv, 5)
 
     drained = self.drain(self.rcv)
-    assert len(drained) == 10
+    assert len(drained) == 10, "%s, %s" % (len(drained), drained)
     self.assertPending(self.rcv, 0)
 
     self.ssn.acknowledge()
@@ -556,13 +573,14 @@
 class AddressErrorTests(Base):
 
   def setup_connection(self):
-    return Connection.open(self.broker.host, self.broker.port)
+    return Connection.open(self.broker.host, self.broker.port,
+                           reconnect=self.reconnect())
 
   def setup_session(self):
     return self.conn.session()
 
   def sendErrorTest(self, addr, exc, check=lambda e: True):
-    snd = self.ssn.sender(addr)
+    snd = self.ssn.sender(addr, durable=self.durable())
     try:
       snd.send("hello")
       assert False, "send succeeded"
@@ -612,7 +630,8 @@
 class SenderTests(Base):
 
   def setup_connection(self):
-    return Connection.open(self.broker.host, self.broker.port)
+    return Connection.open(self.broker.host, self.broker.port,
+                           reconnect=self.reconnect())
 
   def setup_session(self):
     return self.conn.session()
@@ -720,7 +739,8 @@
 class MessageEchoTests(Base):
 
   def setup_connection(self):
-    return Connection.open(self.broker.host, self.broker.port)
+    return Connection.open(self.broker.host, self.broker.port,
+                           reconnect=self.reconnect())
 
   def setup_session(self):
     return self.conn.session()



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org