You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2010/04/01 22:39:31 UTC

svn commit: r930084 - in /qpid/trunk/qpid/python: examples/api/drain examples/api/server examples/api/spout qpid/messaging/driver.py qpid/messaging/endpoints.py qpid/tests/messaging/endpoints.py

Author: rhs
Date: Thu Apr  1 20:39:31 2010
New Revision: 930084

URL: http://svn.apache.org/viewvc?rev=930084&view=rev
Log:
updated reconnect option names to match C++ API

Modified:
    qpid/trunk/qpid/python/examples/api/drain
    qpid/trunk/qpid/python/examples/api/server
    qpid/trunk/qpid/python/examples/api/spout
    qpid/trunk/qpid/python/qpid/messaging/driver.py
    qpid/trunk/qpid/python/qpid/messaging/endpoints.py
    qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py

Modified: qpid/trunk/qpid/python/examples/api/drain
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/examples/api/drain?rev=930084&r1=930083&r2=930084&view=diff
==============================================================================
--- qpid/trunk/qpid/python/examples/api/drain (original)
+++ qpid/trunk/qpid/python/examples/api/drain Thu Apr  1 20:39:31 2010
@@ -33,8 +33,8 @@ parser.add_option("-f", "--forever", act
                   help="ignore timeout and wait forever")
 parser.add_option("-r", "--reconnect", action="store_true",
                   help="enable auto reconnect")
-parser.add_option("-d", "--reconnect-delay", type=float, default=3,
-                  help="delay between reconnect attempts")
+parser.add_option("-i", "--reconnect-interval", type=float, default=3,
+                  help="interval between reconnect attempts")
 parser.add_option("-l", "--reconnect-limit", type=int,
                   help="maximum number of reconnect attempts")
 parser.add_option("-t", "--timeout", type=float, default=0,
@@ -77,7 +77,7 @@ conn = Connection(url.host, url.port,
                   username=url.user,
                   password=url.password,
                   reconnect=opts.reconnect,
-                  reconnect_delay=opts.reconnect_delay,
+                  reconnect_interval=opts.reconnect_interval,
                   reconnect_limit=opts.reconnect_limit)
 try:
   conn.connect()

Modified: qpid/trunk/qpid/python/examples/api/server
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/examples/api/server?rev=930084&r1=930083&r2=930084&view=diff
==============================================================================
--- qpid/trunk/qpid/python/examples/api/server (original)
+++ qpid/trunk/qpid/python/examples/api/server Thu Apr  1 20:39:31 2010
@@ -30,8 +30,8 @@ parser.add_option("-b", "--broker", defa
                   help="connect to specified BROKER (default %default)")
 parser.add_option("-r", "--reconnect", action="store_true",
                   help="enable auto reconnect")
-parser.add_option("-d", "--reconnect-delay", type=float, default=3,
-                  help="delay between reconnect attempts")
+parser.add_option("-i", "--reconnect-interval", type=float, default=3,
+                  help="interval between reconnect attempts")
 parser.add_option("-l", "--reconnect-limit", type=int,
                   help="maximum number of reconnect attempts")
 parser.add_option("-v", dest="verbose", action="store_true",
@@ -55,7 +55,7 @@ conn = Connection(url.host, url.port,
                   username=url.user,
                   password=url.password,
                   reconnect=opts.reconnect,
-                  reconnect_delay=opts.reconnect_delay,
+                  reconnect_interval=opts.reconnect_interval,
                   reconnect_limit=opts.reconnect_limit)
 def dispatch(msg):
   msg_type = msg.properties.get("type")

Modified: qpid/trunk/qpid/python/examples/api/spout
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/examples/api/spout?rev=930084&r1=930083&r2=930084&view=diff
==============================================================================
--- qpid/trunk/qpid/python/examples/api/spout (original)
+++ qpid/trunk/qpid/python/examples/api/spout Thu Apr  1 20:39:31 2010
@@ -39,15 +39,15 @@ parser.add_option("-b", "--broker", defa
                   help="connect to specified BROKER (default %default)")
 parser.add_option("-r", "--reconnect", action="store_true",
                   help="enable auto reconnect")
-parser.add_option("-d", "--reconnect-delay", type=float, default=3,
-                  help="delay between reconnect attempts")
+parser.add_option("-i", "--reconnect-interval", type=float, default=3,
+                  help="interval between reconnect attempts")
 parser.add_option("-l", "--reconnect-limit", type=int,
                   help="maximum number of reconnect attempts")
 parser.add_option("-c", "--count", type=int, default=1,
                   help="stop after count messages have been sent, zero disables (default %default)")
 parser.add_option("-t", "--timeout", type=float, default=None,
                   help="exit after the specified time")
-parser.add_option("-i", "--id", help="use the supplied id instead of generating one")
+parser.add_option("-I", "--id", help="use the supplied id instead of generating one")
 parser.add_option("-S", "--subject", help="specify a subject")
 parser.add_option("-R", "--reply-to", help="specify reply-to address")
 parser.add_option("-P", "--property", dest="properties", action="append", default=[],
@@ -97,7 +97,7 @@ conn = Connection(url.host, url.port,
                   username=url.user,
                   password=url.password,
                   reconnect=opts.reconnect,
-                  reconnect_delay=opts.reconnect_delay,
+                  reconnect_interval=opts.reconnect_interval,
                   reconnect_limit=opts.reconnect_limit)
 try:
   conn.connect()

Modified: qpid/trunk/qpid/python/qpid/messaging/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/driver.py?rev=930084&r1=930083&r2=930084&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/driver.py Thu Apr  1 20:39:31 2010
@@ -323,6 +323,7 @@ class Driver:
 
     self._selector = Selector.default()
     self._attempts = 0
+    self._delay = self.connection.reconnect_interval_min
     self._hosts = [(self.connection.host, self.connection.port)] + \
         self.connection.backups
     self._host = 0
@@ -390,7 +391,9 @@ class Driver:
       if self._host > 0:
         delay = 0
       else:
-        delay = self.connection.reconnect_delay
+        delay = self._delay
+        self._delay = min(2*self._delay,
+                          self.connection.reconnect_interval_max)
       self._timeout = time.time() + delay
       log.warn("recoverable error[attempt %s]: %s" % (self._attempts, e))
       if delay > 0:
@@ -951,6 +954,7 @@ class Engine:
           # XXX: we're ignoring acks that get lost when disconnected,
           # could we deal this via some message-id based purge?
           if m._transfer_id is None:
+            ssn.acked.remove(m)
             continue
           ids.add(m._transfer_id)
           disp = m._disposition or DEFAULT_DISPOSITION

Modified: qpid/trunk/qpid/python/qpid/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/endpoints.py?rev=930084&r1=930083&r2=930084&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/endpoints.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/endpoints.py Thu Apr  1 20:39:31 2010
@@ -84,7 +84,15 @@ class Connection:
     self.mechanisms = options.get("mechanisms")
     self.heartbeat = options.get("heartbeat")
     self.reconnect = options.get("reconnect", False)
-    self.reconnect_delay = options.get("reconnect_delay", 3)
+    self.reconnect_timeout = options.get("reconnect_timeout")
+    if "reconnect_interval_min" in options:
+      self.reconnect_interval_min = options["reconnect_interval_min"]
+    else:
+      self.reconnect_interval_min = options.get("reconnect_interval", 1)
+    if "reconnect_interval_max" in options:
+      self.reconnect_interval_max = options["reconnect_interval_max"]
+    else:
+      self.reconnect_interval_max = options.get("reconnect_interval", 2*60)
     self.reconnect_limit = options.get("reconnect_limit")
     self.transport = options.get("transport", "plain")
     self.backups = options.get("backups", [])

Modified: qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py?rev=930084&r1=930083&r2=930084&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py Thu Apr  1 20:39:31 2010
@@ -72,6 +72,81 @@ class SetupTests(Base):
       while fds:
         os.close(fds.pop())
 
+  def testReconnect(self):
+    options = self.connection_options()
+    import socket
+    from qpid.messaging import transports
+    transport = options.get("transport", "plain")
+    real = getattr(transports, transport)
+
+    class flaky:
+
+      def __init__(self, host, port):
+        self.real = real(host, port)
+        self.sent_count = 0
+        self.recv_count = 0
+
+      def fileno(self):
+        return self.real.fileno()
+
+      def reading(self, reading):
+        return self.real.reading(reading)
+
+      def writing(self, writing):
+        return self.real.writing(writing)
+
+      def send(self, bytes):
+        if self.sent_count > 1024:
+          raise socket.error("fake error")
+        n = self.real.send(bytes)
+        self.sent_count += n
+        return n
+
+      def recv(self, n):
+        if self.recv_count > 1024:
+          return ""
+        bytes = self.real.recv(n)
+        self.recv_count += len(bytes)
+        return bytes
+
+      def close(self):
+        self.real.close()
+
+    transports.flaky = flaky
+
+    options["reconnect"] = True
+    options["reconnect_interval"] = 0
+    options["transport"] = "flaky"
+
+    self.conn = Connection.open(self.broker.host, self.broker.port, **options)
+    ssn = self.conn.session()
+    snd = ssn.sender("test-reconnect-queue; {create: always, delete: always}")
+    rcv = ssn.receiver(snd.target)
+
+    msgs = [self.message("testReconnect", i) for i in range(10)]
+    for m in msgs:
+      snd.send(m)
+
+    content = set()
+    drained = []
+    duplicates = []
+    try:
+      while True:
+        m = rcv.fetch(timeout=0)
+        if m.content not in content:
+          content.add(m.content)
+          drained.append(m)
+        else:
+          duplicates.append(m)
+        ssn.acknowledge(m)
+    except Empty:
+      pass
+    assert duplicates, "no duplicates"
+    redelivered = 3*[False] + 3*[True, False] + [True]
+    assert len(drained) == len(msgs) == len(redelivered)
+    for m, d, r in zip(msgs, drained, redelivered):
+      self.assertEcho(m, d, r)
+
 class ConnectionTests(Base):
 
   def setup_connection(self):



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org