Posted to commits@qpid.apache.org by ac...@apache.org on 2014/01/09 23:30:27 UTC
svn commit: r1556971 - in /qpid/trunk/qpid: cpp/src/tests/ha_tests.py python/qpid/messaging/driver.py
Author: aconway
Date: Thu Jan 9 22:30:26 2014
New Revision: 1556971
URL: http://svn.apache.org/r1556971
Log:
QPID-5428: Heartbeats not in use when attempting to connect with python client.
Heartbeats were ignored while opening a connection, so a connect could hang indefinitely.
Three cases need to be covered (test included):
- Connect succeeds but then the broker stalls.
- Connect to a stalled broker that never responds.
- Fail over to a stalled broker that never responds.
All cases are handled by the following fixes to driver.py:
- Check for heartbeats even before engine._connected since we may time out
before receiving open-ok if the peer is stalled and never sends data.
- Set _last_in and _last_out so that we time heartbeats from the start of the
connection if no data is ever sent or received.
- Call self.update_status in Driver.timeout to detect a connection closed due to
heartbeat timeout (rather than by a readable or writable event).
Make update_status a no-op if the engine or transport is not yet set up.
- Don't consider a reconnect complete in connect(); wait until we get the open-ok.
See the comment on Driver._check_retry_ok() and the sketch below.
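
The reasoning above can be illustrated on its own: arm the heartbeat
deadline when the engine is created, and only send heartbeats once the
peer has answered. A minimal sketch in Python, with names simplified and
tick() standing in for the driver's dispatch loop (this is not the real
driver API):

    import time

    class HeartbeatTimeout(Exception):
        pass

    class Engine:
        def __init__(self, heartbeat):
            self.heartbeat = heartbeat
            self._connected = False  # set True when open-ok arrives
            # Timed from engine creation, so a peer that never sends
            # open-ok still trips the timeout.
            self._last_in = time.time()
            self._last_out = time.time()

        def tick(self):
            now = time.time()
            # Check the timeout even before open-ok arrives.
            if now - self._last_in > 2 * self.heartbeat:
                raise HeartbeatTimeout("heartbeat timeout")
            # Only send heartbeats once the connection is established.
            if self._connected and now - self._last_out >= self.heartbeat / 2.0:
                self._last_out = now
                # the real driver sends ConnectionHeartbeat() here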
Modified:
qpid/trunk/qpid/cpp/src/tests/ha_tests.py
qpid/trunk/qpid/python/qpid/messaging/driver.py
Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1556971&r1=1556970&r2=1556971&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Thu Jan 9 22:30:26 2014
@@ -233,6 +233,33 @@ class ReplicationTests(HaBrokerTest):
c.close()
finally: l.restore()
+
+ def test_heartbeat_python(self):
+ """Verify that a python client with a heartbeat specified disconnects
+ from a stalled broker and does not hang indefinitely."""
+
+ broker = Broker(self)
+ broker_addr = broker.host_port()
+
+ # Case 1: Connect before stalling the broker, use the connection after stalling.
+ c = Connection(broker_addr, heartbeat=1)
+ c.open()
+ os.kill(broker.pid, signal.SIGSTOP) # Stall the broker
+ self.assertRaises(ConnectionError, c.session().sender, "foo")
+
+ # Case 2: Connect to a stalled broker
+ c = Connection(broker_addr, heartbeat=1)
+ self.assertRaises(ConnectionError, c.open)
+
+ # Case 3: Re-connect to a stalled broker.
+ broker2 = Broker(self)
+ c = Connection(broker2.host_port(), heartbeat=1, reconnect_limit=1,
+ reconnect=True, reconnect_urls=[broker_addr],
+ reconnect_log=False) # Hide expected warnings
+ c.open()
+ broker2.kill() # Cause re-connection to broker
+ self.assertRaises(ConnectionError, c.session().sender, "foo")
+
def test_failover_cpp(self):
"""Verify that failover works in the C++ client."""
cluster = HaCluster(self, 2)
Modified: qpid/trunk/qpid/python/qpid/messaging/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/driver.py?rev=1556971&r1=1556970&r2=1556971&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/driver.py Thu Jan 9 22:30:26 2014
@@ -362,11 +362,11 @@ class Driver:
[(u.host, default(u.port, 5672)) for u in urls]
if self._host >= len(hosts):
self._host = 0
- result = hosts[self._host]
+ self._last_host = hosts[self._host]
if self._host == 0:
self._attempts += 1
self._host = self._host + 1
- return result
+ return self._last_host
def _num_hosts(self):
return len(self.connection.reconnect_urls) + 1
@@ -401,6 +401,24 @@ class Driver:
def timing(self):
return self._timeout
+ def _check_retry_ok(self):
+ """We consider a reconnect to have suceeded only when we have received
+ open-ok from the peer.
+
+ If we declared success as soon as the transport connected, then we could get
+ into an infinite heartbeat loop if the remote process is hung and never
+ sends us any data. We would fail the connection after 2 missed heartbeats,
+ reconnect the transport, declare the reconnect ok, then fail again after 2
+ missed heartbeats and so on.
+ """
+ if self._retrying and self.engine._connected: # Means we have received open-ok.
+ if self._reconnect_log:
+ log.warn("reconnect succeeded: %s:%s", *self._last_host)
+ self._next_retry = None
+ self._attempts = 0
+ self._delay = self.connection.reconnect_interval_min
+ self._retrying = False
+
@synchronized
def readable(self):
try:
@@ -410,6 +428,7 @@ class Driver:
elif data:
rawlog.debug("READ[%s]: %r", self.log_id, data)
self.engine.write(data)
+ self._check_retry_ok()
else:
self.close_engine()
except socket.error, e:
@@ -451,13 +470,14 @@ class Driver:
self.schedule()
def update_status(self):
+ if not self.engine: return False
status = self.engine.status()
return getattr(self, "st_%s" % status.lower())()
def st_closed(self):
# XXX: this log statement seems to sometimes hit when the socket is not connected
# XXX: rawlog.debug("CLOSE[%s]: %s", self.log_id, self._socket.getpeername())
- self._transport.close()
+ if self._transport: self._transport.close()
self._transport = None
self.engine = None
return True
@@ -483,6 +503,7 @@ class Driver:
@synchronized
def timeout(self):
self.dispatch()
+ self.update_status()
self._notify()
self.schedule()
@@ -531,12 +552,6 @@ class Driver:
self._transport = trans(self.connection, host, port)
else:
raise ConnectError("no such transport: %s" % self.connection.transport)
- if self._retrying and self._reconnect_log:
- log.warn("reconnect succeeded: %s:%s", host, port)
- self._next_retry = None
- self._attempts = 0
- self._delay = self.connection.reconnect_interval_min
- self._retrying = False
self.schedule()
except socket.error, e:
self.close_engine(ConnectError(text=str(e)))
@@ -589,8 +604,10 @@ class Engine:
self._status = CLOSED
self._buf = ""
self._hdr = ""
- self._last_in = None
- self._last_out = None
+ # Set _last_in and _last_out here so heartbeats will be timed from the
+ # beginning of connection if no data is sent/received.
+ self._last_in = time.time()
+ self._last_out = time.time()
self._op_enc = OpEncoder()
self._seg_enc = SegmentEncoder()
self._frame_enc = FrameEncoder()
@@ -813,13 +830,15 @@ class Engine:
self.attach(ssn)
self.process(ssn)
- if self.connection.heartbeat and self._status != CLOSED:
- now = time.time()
- if self._last_in is not None and \
- now - self._last_in > 2*self.connection.heartbeat:
- raise HeartbeatTimeout(text="heartbeat timeout")
- if self._last_out is None or now - self._last_out >= self.connection.heartbeat/2.0:
- self.write_op(ConnectionHeartbeat())
+ # We need to check heartbeat even if not self._connected since we may hit a
+ # heartbeat timeout before receiving an open-ok.
+ if self.connection.heartbeat and self._status != CLOSED and not self._closing:
+ now = time.time()
+ if now - self._last_in > 2*self.connection.heartbeat:
+ raise HeartbeatTimeout(text="heartbeat timeout")
+ # Only send heartbeats if we are connected.
+ if self._connected and now - self._last_out >= self.connection.heartbeat/2.0:
+ self.write_op(ConnectionHeartbeat())
def open(self):
self._reset()
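
From the client's point of view, the net effect is that a connection
with a heartbeat now fails promptly instead of hanging. A hedged usage
sketch using only the qpid.messaging calls exercised by the test above
(broker address is a placeholder):

    from qpid.messaging import Connection, ConnectionError

    conn = Connection("localhost:5672", heartbeat=1)
    try:
        # Raises ConnectionError after ~2 missed heartbeats if the
        # broker is stalled and never sends open-ok.
        conn.open()
        ssn = conn.session()
    except ConnectionError, e:  # Python 2 syntax, as in driver.py
        print "connection failed: %s" % e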