You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2014/01/09 23:30:27 UTC

svn commit: r1556971 - in /qpid/trunk/qpid: cpp/src/tests/ha_tests.py python/qpid/messaging/driver.py

Author: aconway
Date: Thu Jan  9 22:30:26 2014
New Revision: 1556971

URL: http://svn.apache.org/r1556971
Log:
QPID-5428: Heartbeats not in use when attempting to connect with python client.

Heartbeats were ignored when opening a connection, so the client could hang indefinitely.
Three cases need to be covered (tests included):
- Connect successfully, but then the broker stalls.
- Connect to a stalled broker that never responds.
- Fail over to a stalled broker that never responds.

All cases are handled by the following fixes to driver.py:
- Check for heartbeats even before engine._connected is set, since we may time out
  before receiving open-ok if the peer is stalled and never sends data.
- Set _last_in and _last_out so that we time heartbeats from the start of the
  connection if no data is ever sent or received.
- Call self.update_status in Driver.timeout to detect a connection closed by a
  heartbeat timeout (rather than by a readable or writable event).
  Make update_status a no-op if the engine or transport is not yet set up.
- Don't consider a reconnect complete in connect(); wait until we receive open-ok.
  See the comment on Driver._check_retry_ok().

Modified:
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py
    qpid/trunk/qpid/python/qpid/messaging/driver.py

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1556971&r1=1556970&r2=1556971&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Thu Jan  9 22:30:26 2014
@@ -233,6 +233,33 @@ class ReplicationTests(HaBrokerTest):
             c.close()
         finally: l.restore()
 
+
+    def test_heartbeat_python(self):
+        """Verify that a python client with a heartbeat specified disconnects
+        from a stalled broker and does not hang indefinitely."""
+
+        broker = Broker(self)
+        broker_addr = broker.host_port()
+
+        # Case 1: Connect before stalling the broker, use the connection after stalling.
+        c = Connection(broker_addr, heartbeat=1)
+        c.open()
+        os.kill(broker.pid, signal.SIGSTOP) # Stall the broker
+        self.assertRaises(ConnectionError, c.session().sender, "foo")
+
+        # Case 2: Connect to a stalled broker
+        c = Connection(broker_addr, heartbeat=1)
+        self.assertRaises(ConnectionError, c.open)
+
+        # Case 3: Re-connect to a stalled broker.
+        broker2 = Broker(self)
+        c = Connection(broker2.host_port(), heartbeat=1, reconnect_limit=1,
+                       reconnect=True, reconnect_urls=[broker_addr],
+                       reconnect_log=False) # Hide expected warnings
+        c.open()
+        broker2.kill()          # Cause re-connection to broker
+        self.assertRaises(ConnectionError, c.session().sender, "foo")
+
     def test_failover_cpp(self):
         """Verify that failover works in the C++ client."""
         cluster = HaCluster(self, 2)

Modified: qpid/trunk/qpid/python/qpid/messaging/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/driver.py?rev=1556971&r1=1556970&r2=1556971&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/driver.py Thu Jan  9 22:30:26 2014
@@ -362,11 +362,11 @@ class Driver:
         [(u.host, default(u.port, 5672)) for u in urls]
     if self._host >= len(hosts):
       self._host = 0
-    result = hosts[self._host]
+    self._last_host = hosts[self._host]
     if self._host == 0:
       self._attempts += 1
     self._host = self._host + 1
-    return result
+    return self._last_host
 
   def _num_hosts(self):
     return len(self.connection.reconnect_urls) + 1
@@ -401,6 +401,24 @@ class Driver:
   def timing(self):
     return self._timeout
 
+  def _check_retry_ok(self):
+    """We consider a reconnect to have suceeded only when we have received
+    open-ok from the peer.
+
+    If we declared success as soon as the transport connected, then we could get
+    into an infinite heartbeat loop if the remote process is hung and never
+    sends us any data. We would fail the connection after 2 missed heartbeats,
+    reconnect the transport, declare the reconnect ok, then fail again after 2
+    missed heartbeats and so on.
+    """
+    if self._retrying and self.engine._connected: # Means we have received open-ok.
+      if self._reconnect_log:
+        log.warn("reconnect succeeded: %s:%s", *self._last_host)
+      self._next_retry = None
+      self._attempts = 0
+      self._delay = self.connection.reconnect_interval_min
+      self._retrying = False
+
   @synchronized
   def readable(self):
     try:
@@ -410,6 +428,7 @@ class Driver:
       elif data:
         rawlog.debug("READ[%s]: %r", self.log_id, data)
         self.engine.write(data)
+        self._check_retry_ok()
       else:
         self.close_engine()
     except socket.error, e:
@@ -451,13 +470,14 @@ class Driver:
     self.schedule()
 
   def update_status(self):
+    if not self.engine: return False
     status = self.engine.status()
     return getattr(self, "st_%s" % status.lower())()
 
   def st_closed(self):
     # XXX: this log statement seems to sometimes hit when the socket is not connected
     # XXX: rawlog.debug("CLOSE[%s]: %s", self.log_id, self._socket.getpeername())
-    self._transport.close()
+    if self._transport: self._transport.close()
     self._transport = None
     self.engine = None
     return True
@@ -483,6 +503,7 @@ class Driver:
   @synchronized
   def timeout(self):
     self.dispatch()
+    self.update_status()
     self._notify()
     self.schedule()
 
@@ -531,12 +552,6 @@ class Driver:
         self._transport = trans(self.connection, host, port)
       else:
         raise ConnectError("no such transport: %s" % self.connection.transport)
-      if self._retrying and self._reconnect_log:
-        log.warn("reconnect succeeded: %s:%s", host, port)
-      self._next_retry = None
-      self._attempts = 0
-      self._delay = self.connection.reconnect_interval_min
-      self._retrying = False
       self.schedule()
     except socket.error, e:
       self.close_engine(ConnectError(text=str(e)))
@@ -589,8 +604,10 @@ class Engine:
     self._status = CLOSED
     self._buf = ""
     self._hdr = ""
-    self._last_in = None
-    self._last_out = None
+    # Set _last_in and _last_out here so heartbeats will be timed from the
+    # beginning of connection if no data is sent/received.
+    self._last_in = time.time()
+    self._last_out = time.time()
     self._op_enc = OpEncoder()
     self._seg_enc = SegmentEncoder()
     self._frame_enc = FrameEncoder()
@@ -813,13 +830,15 @@ class Engine:
         self.attach(ssn)
         self.process(ssn)
 
-      if self.connection.heartbeat and self._status != CLOSED:
-        now = time.time()
-        if self._last_in is not None and \
-              now - self._last_in > 2*self.connection.heartbeat:
-          raise HeartbeatTimeout(text="heartbeat timeout")
-        if self._last_out is None or now - self._last_out >= self.connection.heartbeat/2.0:
-          self.write_op(ConnectionHeartbeat())
+    # We need to check heartbeat even if not self._connected since we may have
+    # heartbeat timeout before receiving an open-ok
+    if self.connection.heartbeat and self._status != CLOSED and not self._closing:
+      now = time.time()
+      if now - self._last_in > 2*self.connection.heartbeat:
+        raise HeartbeatTimeout(text="heartbeat timeout")
+      # Only send heartbeats if we are connected.
+      if self._connected and now - self._last_out >= self.connection.heartbeat/2.0:
+        self.write_op(ConnectionHeartbeat())
 
   def open(self):
     self._reset()



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org