You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2010/08/09 13:53:25 UTC
svn commit: r983597 - in /qpid/trunk/qpid/python/qpid/messaging: driver.py exceptions.py
Author: rhs
Date: Mon Aug 9 11:53:25 2010
New Revision: 983597
URL: http://svn.apache.org/viewvc?rev=983597&view=rev
Log:
fixed heartbeating
Modified:
qpid/trunk/qpid/python/qpid/messaging/driver.py
qpid/trunk/qpid/python/qpid/messaging/exceptions.py
Modified: qpid/trunk/qpid/python/qpid/messaging/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/driver.py?rev=983597&r1=983596&r2=983597&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/driver.py Mon Aug 9 11:53:25 2010
@@ -339,6 +339,7 @@ class Driver:
self._reconnect_log = self.connection.reconnect_log
self._host = 0
self._retrying = False
+ self._next_retry = None
self._transport = None
self._timeout = None
@@ -427,7 +428,7 @@ class Driver:
delay = self._delay
self._delay = min(2*self._delay,
self.connection.reconnect_interval_max)
- self._timeout = time.time() + delay
+ self._next_retry = time.time() + delay
if self._reconnect_log:
log.warn("recoverable error[attempt %s]: %s" % (self._attempts, e))
if delay > 0:
@@ -437,6 +438,8 @@ class Driver:
else:
self.engine.close(e)
+ self.schedule()
+
def update_status(self):
status = self.engine.status()
return getattr(self, "st_%s" % status.lower())()
@@ -471,6 +474,18 @@ class Driver:
def timeout(self):
self.dispatch()
self._notify()
+ self.schedule()
+
+ def schedule(self):
+ times = []
+ if self.connection.heartbeat:
+ times.append(time.time() + self.connection.heartbeat)
+ if self._next_retry:
+ times.append(self._next_retry)
+ if times:
+ self._timeout = min(times)
+ else:
+ self._timeout = None
def dispatch(self):
try:
@@ -479,12 +494,17 @@ class Driver:
self.connect()
else:
self.engine.dispatch()
+ except HeartbeatTimeout, e:
+ self.close_engine(e)
except:
# XXX: Does socket get leaked if this occurs?
msg = compat.format_exc()
self.connection.error = InternalError(text=msg)
def connect(self):
+ if self._retrying and time.time() < self._next_retry:
+ return
+
try:
# XXX: should make this non blocking
host, port = self._next_host()
@@ -500,11 +520,12 @@ class Driver:
raise ConnectError("no such transport: %s" % self.connection.transport)
if self._retrying and self._reconnect_log:
log.warn("reconnect succeeded: %s:%s", host, port)
- self._timeout = None
+ self._next_retry = None
self._attempts = 0
self._host = 0
self._delay = self.connection.reconnect_interval_min
self._retrying = False
+ self.schedule()
except socket.error, e:
self.close_engine(ConnectError(text=str(e)))
@@ -556,6 +577,8 @@ class Engine:
self._status = CLOSED
self._buf = ""
self._hdr = ""
+ self._last_in = None
+ self._last_out = None
self._op_enc = OpEncoder()
self._seg_enc = SegmentEncoder()
self._frame_enc = FrameEncoder()
@@ -595,6 +618,7 @@ class Engine:
return self._status
def write(self, data):
+ self._last_in = time.time()
try:
if self._sasl_decode:
data = self._sasl.decode(data)
@@ -652,6 +676,7 @@ class Engine:
if self._sasl_encode:
bytes = self._sasl.encode(bytes)
self._buf += bytes
+ self._last_out = time.time()
def do_header(self, hdr):
cli_major = 0; cli_minor = 10
@@ -689,8 +714,8 @@ class Engine:
self._sasl_decode = True
self.connection._transport_connected = True
- def connection_heartbeat(self, hrt):
- self.write_op(ConnectionHeartbeat())
+ def do_connection_heartbeat(self, hrt):
+ pass
def do_connection_close(self, close):
self.write_op(ConnectionCloseOk())
@@ -766,6 +791,14 @@ class Engine:
self.attach(ssn)
self.process(ssn)
+ if self.connection.heartbeat and self._status != CLOSED:
+ now = time.time()
+ if self._last_in is not None and \
+ now - self._last_in > 2*self.connection.heartbeat:
+ raise HeartbeatTimeout(text="heartbeat timeout")
+ if self._last_out is None or now - self._last_out >= self.connection.heartbeat/2.0:
+ self.write_op(ConnectionHeartbeat())
+
def open(self):
self._reset()
self._status = OPEN
Modified: qpid/trunk/qpid/python/qpid/messaging/exceptions.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/exceptions.py?rev=983597&r1=983596&r2=983597&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/exceptions.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/exceptions.py Mon Aug 9 11:53:25 2010
@@ -63,6 +63,9 @@ class AuthenticationFailure(ConnectError
class ConnectionClosed(ConnectionError):
pass
+class HeartbeatTimeout(ConnectionError):
+ pass
+
## Session Errors
class SessionError(MessagingError):
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org