You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2010/08/09 13:53:25 UTC

svn commit: r983597 - in /qpid/trunk/qpid/python/qpid/messaging: driver.py exceptions.py

Author: rhs
Date: Mon Aug  9 11:53:25 2010
New Revision: 983597

URL: http://svn.apache.org/viewvc?rev=983597&view=rev
Log:
fixed heartbeating

Modified:
    qpid/trunk/qpid/python/qpid/messaging/driver.py
    qpid/trunk/qpid/python/qpid/messaging/exceptions.py

Modified: qpid/trunk/qpid/python/qpid/messaging/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/driver.py?rev=983597&r1=983596&r2=983597&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/driver.py Mon Aug  9 11:53:25 2010
@@ -339,6 +339,7 @@ class Driver:
     self._reconnect_log = self.connection.reconnect_log
     self._host = 0
     self._retrying = False
+    self._next_retry = None
     self._transport = None
 
     self._timeout = None
@@ -427,7 +428,7 @@ class Driver:
         delay = self._delay
         self._delay = min(2*self._delay,
                           self.connection.reconnect_interval_max)
-      self._timeout = time.time() + delay
+      self._next_retry = time.time() + delay
       if self._reconnect_log:
         log.warn("recoverable error[attempt %s]: %s" % (self._attempts, e))
         if delay > 0:
@@ -437,6 +438,8 @@ class Driver:
     else:
       self.engine.close(e)
 
+    self.schedule()
+
   def update_status(self):
     status = self.engine.status()
     return getattr(self, "st_%s" % status.lower())()
@@ -471,6 +474,18 @@ class Driver:
   def timeout(self):
     self.dispatch()
     self._notify()
+    self.schedule()
+
+  def schedule(self):
+    times = []
+    if self.connection.heartbeat:
+      times.append(time.time() + self.connection.heartbeat)
+    if self._next_retry:
+      times.append(self._next_retry)
+    if times:
+      self._timeout = min(times)
+    else:
+      self._timeout = None
 
   def dispatch(self):
     try:
@@ -479,12 +494,17 @@ class Driver:
           self.connect()
       else:
         self.engine.dispatch()
+    except HeartbeatTimeout, e:
+      self.close_engine(e)
     except:
       # XXX: Does socket get leaked if this occurs?
       msg = compat.format_exc()
       self.connection.error = InternalError(text=msg)
 
   def connect(self):
+    if self._retrying and time.time() < self._next_retry:
+      return
+
     try:
       # XXX: should make this non blocking
       host, port = self._next_host()
@@ -500,11 +520,12 @@ class Driver:
         raise ConnectError("no such transport: %s" % self.connection.transport)
       if self._retrying and self._reconnect_log:
         log.warn("reconnect succeeded: %s:%s", host, port)
-      self._timeout = None
+      self._next_retry = None
       self._attempts = 0
       self._host = 0
       self._delay = self.connection.reconnect_interval_min
       self._retrying = False
+      self.schedule()
     except socket.error, e:
       self.close_engine(ConnectError(text=str(e)))
 
@@ -556,6 +577,8 @@ class Engine:
     self._status = CLOSED
     self._buf = ""
     self._hdr = ""
+    self._last_in = None
+    self._last_out = None
     self._op_enc = OpEncoder()
     self._seg_enc = SegmentEncoder()
     self._frame_enc = FrameEncoder()
@@ -595,6 +618,7 @@ class Engine:
     return self._status
 
   def write(self, data):
+    self._last_in = time.time()
     try:
       if self._sasl_decode:
         data = self._sasl.decode(data)
@@ -652,6 +676,7 @@ class Engine:
     if self._sasl_encode:
       bytes = self._sasl.encode(bytes)
     self._buf += bytes
+    self._last_out = time.time()
 
   def do_header(self, hdr):
     cli_major = 0; cli_minor = 10
@@ -689,8 +714,8 @@ class Engine:
     self._sasl_decode = True
     self.connection._transport_connected = True
 
-  def connection_heartbeat(self, hrt):
-    self.write_op(ConnectionHeartbeat())
+  def do_connection_heartbeat(self, hrt):
+    pass
 
   def do_connection_close(self, close):
     self.write_op(ConnectionCloseOk())
@@ -766,6 +791,14 @@ class Engine:
         self.attach(ssn)
         self.process(ssn)
 
+      if self.connection.heartbeat and self._status != CLOSED:
+        now = time.time()
+        if self._last_in is not None and \
+              now - self._last_in > 2*self.connection.heartbeat:
+          raise HeartbeatTimeout(text="heartbeat timeout")
+        if self._last_out is None or now - self._last_out >= self.connection.heartbeat/2.0:
+          self.write_op(ConnectionHeartbeat())
+
   def open(self):
     self._reset()
     self._status = OPEN

Modified: qpid/trunk/qpid/python/qpid/messaging/exceptions.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/exceptions.py?rev=983597&r1=983596&r2=983597&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/exceptions.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/exceptions.py Mon Aug  9 11:53:25 2010
@@ -63,6 +63,9 @@ class AuthenticationFailure(ConnectError
 class ConnectionClosed(ConnectionError):
   pass
 
+class HeartbeatTimeout(ConnectionError):
+  pass
+
 ## Session Errors
 
 class SessionError(MessagingError):



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org