You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2010/04/01 22:39:31 UTC
svn commit: r930084 - in /qpid/trunk/qpid/python: examples/api/drain
examples/api/server examples/api/spout qpid/messaging/driver.py
qpid/messaging/endpoints.py qpid/tests/messaging/endpoints.py
Author: rhs
Date: Thu Apr 1 20:39:31 2010
New Revision: 930084
URL: http://svn.apache.org/viewvc?rev=930084&view=rev
Log:
Updated reconnect option names to match the C++ API (reconnect_delay is now reconnect_interval).
Modified:
qpid/trunk/qpid/python/examples/api/drain
qpid/trunk/qpid/python/examples/api/server
qpid/trunk/qpid/python/examples/api/spout
qpid/trunk/qpid/python/qpid/messaging/driver.py
qpid/trunk/qpid/python/qpid/messaging/endpoints.py
qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py
Modified: qpid/trunk/qpid/python/examples/api/drain
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/examples/api/drain?rev=930084&r1=930083&r2=930084&view=diff
==============================================================================
--- qpid/trunk/qpid/python/examples/api/drain (original)
+++ qpid/trunk/qpid/python/examples/api/drain Thu Apr 1 20:39:31 2010
@@ -33,8 +33,8 @@ parser.add_option("-f", "--forever", act
help="ignore timeout and wait forever")
parser.add_option("-r", "--reconnect", action="store_true",
help="enable auto reconnect")
-parser.add_option("-d", "--reconnect-delay", type=float, default=3,
- help="delay between reconnect attempts")
+parser.add_option("-i", "--reconnect-interval", type=float, default=3,
+ help="interval between reconnect attempts")
parser.add_option("-l", "--reconnect-limit", type=int,
help="maximum number of reconnect attempts")
parser.add_option("-t", "--timeout", type=float, default=0,
@@ -77,7 +77,7 @@ conn = Connection(url.host, url.port,
username=url.user,
password=url.password,
reconnect=opts.reconnect,
- reconnect_delay=opts.reconnect_delay,
+ reconnect_interval=opts.reconnect_interval,
reconnect_limit=opts.reconnect_limit)
try:
conn.connect()
Modified: qpid/trunk/qpid/python/examples/api/server
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/examples/api/server?rev=930084&r1=930083&r2=930084&view=diff
==============================================================================
--- qpid/trunk/qpid/python/examples/api/server (original)
+++ qpid/trunk/qpid/python/examples/api/server Thu Apr 1 20:39:31 2010
@@ -30,8 +30,8 @@ parser.add_option("-b", "--broker", defa
help="connect to specified BROKER (default %default)")
parser.add_option("-r", "--reconnect", action="store_true",
help="enable auto reconnect")
-parser.add_option("-d", "--reconnect-delay", type=float, default=3,
- help="delay between reconnect attempts")
+parser.add_option("-i", "--reconnect-interval", type=float, default=3,
+ help="interval between reconnect attempts")
parser.add_option("-l", "--reconnect-limit", type=int,
help="maximum number of reconnect attempts")
parser.add_option("-v", dest="verbose", action="store_true",
@@ -55,7 +55,7 @@ conn = Connection(url.host, url.port,
username=url.user,
password=url.password,
reconnect=opts.reconnect,
- reconnect_delay=opts.reconnect_delay,
+ reconnect_interval=opts.reconnect_interval,
reconnect_limit=opts.reconnect_limit)
def dispatch(msg):
msg_type = msg.properties.get("type")
Modified: qpid/trunk/qpid/python/examples/api/spout
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/examples/api/spout?rev=930084&r1=930083&r2=930084&view=diff
==============================================================================
--- qpid/trunk/qpid/python/examples/api/spout (original)
+++ qpid/trunk/qpid/python/examples/api/spout Thu Apr 1 20:39:31 2010
@@ -39,15 +39,15 @@ parser.add_option("-b", "--broker", defa
help="connect to specified BROKER (default %default)")
parser.add_option("-r", "--reconnect", action="store_true",
help="enable auto reconnect")
-parser.add_option("-d", "--reconnect-delay", type=float, default=3,
- help="delay between reconnect attempts")
+parser.add_option("-i", "--reconnect-interval", type=float, default=3,
+ help="interval between reconnect attempts")
parser.add_option("-l", "--reconnect-limit", type=int,
help="maximum number of reconnect attempts")
parser.add_option("-c", "--count", type=int, default=1,
help="stop after count messages have been sent, zero disables (default %default)")
parser.add_option("-t", "--timeout", type=float, default=None,
help="exit after the specified time")
-parser.add_option("-i", "--id", help="use the supplied id instead of generating one")
+parser.add_option("-I", "--id", help="use the supplied id instead of generating one")
parser.add_option("-S", "--subject", help="specify a subject")
parser.add_option("-R", "--reply-to", help="specify reply-to address")
parser.add_option("-P", "--property", dest="properties", action="append", default=[],
@@ -97,7 +97,7 @@ conn = Connection(url.host, url.port,
username=url.user,
password=url.password,
reconnect=opts.reconnect,
- reconnect_delay=opts.reconnect_delay,
+ reconnect_interval=opts.reconnect_interval,
reconnect_limit=opts.reconnect_limit)
try:
conn.connect()
Modified: qpid/trunk/qpid/python/qpid/messaging/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/driver.py?rev=930084&r1=930083&r2=930084&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/driver.py Thu Apr 1 20:39:31 2010
@@ -323,6 +323,7 @@ class Driver:
self._selector = Selector.default()
self._attempts = 0
+ self._delay = self.connection.reconnect_interval_min
self._hosts = [(self.connection.host, self.connection.port)] + \
self.connection.backups
self._host = 0
@@ -390,7 +391,9 @@ class Driver:
if self._host > 0:
delay = 0
else:
- delay = self.connection.reconnect_delay
+ delay = self._delay
+ self._delay = min(2*self._delay,
+ self.connection.reconnect_interval_max)
self._timeout = time.time() + delay
log.warn("recoverable error[attempt %s]: %s" % (self._attempts, e))
if delay > 0:
@@ -951,6 +954,7 @@ class Engine:
# XXX: we're ignoring acks that get lost when disconnected,
# could we deal this via some message-id based purge?
if m._transfer_id is None:
+ ssn.acked.remove(m)
continue
ids.add(m._transfer_id)
disp = m._disposition or DEFAULT_DISPOSITION
Modified: qpid/trunk/qpid/python/qpid/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/endpoints.py?rev=930084&r1=930083&r2=930084&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/endpoints.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/endpoints.py Thu Apr 1 20:39:31 2010
@@ -84,7 +84,15 @@ class Connection:
self.mechanisms = options.get("mechanisms")
self.heartbeat = options.get("heartbeat")
self.reconnect = options.get("reconnect", False)
- self.reconnect_delay = options.get("reconnect_delay", 3)
+ self.reconnect_timeout = options.get("reconnect_timeout")
+ if "reconnect_interval_min" in options:
+ self.reconnect_interval_min = options["reconnect_interval_min"]
+ else:
+ self.reconnect_interval_min = options.get("reconnect_interval", 1)
+ if "reconnect_interval_max" in options:
+ self.reconnect_interval_max = options["reconnect_interval_max"]
+ else:
+ self.reconnect_interval_max = options.get("reconnect_interval", 2*60)
self.reconnect_limit = options.get("reconnect_limit")
self.transport = options.get("transport", "plain")
self.backups = options.get("backups", [])
Modified: qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py?rev=930084&r1=930083&r2=930084&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py Thu Apr 1 20:39:31 2010
@@ -72,6 +72,81 @@ class SetupTests(Base):
while fds:
os.close(fds.pop())
+ def testReconnect(self):
+ options = self.connection_options()
+ import socket
+ from qpid.messaging import transports
+ transport = options.get("transport", "plain")
+ real = getattr(transports, transport)
+
+ class flaky:
+
+ def __init__(self, host, port):
+ self.real = real(host, port)
+ self.sent_count = 0
+ self.recv_count = 0
+
+ def fileno(self):
+ return self.real.fileno()
+
+ def reading(self, reading):
+ return self.real.reading(reading)
+
+ def writing(self, writing):
+ return self.real.writing(writing)
+
+ def send(self, bytes):
+ if self.sent_count > 1024:
+ raise socket.error("fake error")
+ n = self.real.send(bytes)
+ self.sent_count += n
+ return n
+
+ def recv(self, n):
+ if self.recv_count > 1024:
+ return ""
+ bytes = self.real.recv(n)
+ self.recv_count += len(bytes)
+ return bytes
+
+ def close(self):
+ self.real.close()
+
+ transports.flaky = flaky
+
+ options["reconnect"] = True
+ options["reconnect_interval"] = 0
+ options["transport"] = "flaky"
+
+ self.conn = Connection.open(self.broker.host, self.broker.port, **options)
+ ssn = self.conn.session()
+ snd = ssn.sender("test-reconnect-queue; {create: always, delete: always}")
+ rcv = ssn.receiver(snd.target)
+
+ msgs = [self.message("testReconnect", i) for i in range(10)]
+ for m in msgs:
+ snd.send(m)
+
+ content = set()
+ drained = []
+ duplicates = []
+ try:
+ while True:
+ m = rcv.fetch(timeout=0)
+ if m.content not in content:
+ content.add(m.content)
+ drained.append(m)
+ else:
+ duplicates.append(m)
+ ssn.acknowledge(m)
+ except Empty:
+ pass
+ assert duplicates, "no duplicates"
+ redelivered = 3*[False] + 3*[True, False] + [True]
+ assert len(drained) == len(msgs) == len(redelivered)
+ for m, d, r in zip(msgs, drained, redelivered):
+ self.assertEcho(m, d, r)
+
class ConnectionTests(Base):
def setup_connection(self):
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org