You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/06/17 19:15:13 UTC
svn commit: r955676 - /qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py
Author: kgiusti
Date: Thu Jun 17 17:15:12 2010
New Revision: 955676
URL: http://svn.apache.org/viewvc?rev=955676&view=rev
Log:
QPID-2663: rate limit heartbeat processing, set minimum heartbeat interval
Modified:
qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py
Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py?rev=955676&r1=955675&r2=955676&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py Thu Jun 17 17:15:12 2010
@@ -580,6 +580,8 @@ class Session:
self.v1BindingKeyList, self.v2BindingKeyList = self._bindingKeys()
self.manageConnections = manageConnections
self.agent_filter = [] # (vendor, product, instance)
+ self.agent_heartbeat_min = 10 # minimum agent heartbeat timeout interval
+ self.agent_heartbeat_miss = 3 # number of heartbeats to miss before deleting agent
if self.userBindings and not self.rcvObjects:
raise Exception("userBindings can't be set unless rcvObjects is set and a console is provided")
@@ -1036,7 +1038,6 @@ class Session:
timestamp = codec.read_uint64()
if self.console != None and agent != None:
self.console.heartbeat(agent, timestamp)
- broker._ageAgents()
def _handleSchemaResp(self, broker, codec, seq, agent_addr):
@@ -1074,7 +1075,6 @@ class Session:
##
if '_vendor' in values and values['_vendor'] == 'apache.org' and \
'_product' in values and values['_product'] == 'qpidd':
- broker._ageAgents()
return
agent = broker.getAgent(1, agentName)
@@ -1086,7 +1086,6 @@ class Session:
agent.touch()
if self.console and agent:
self.console.heartbeat(agent, timestamp)
- broker._ageAgents()
def _v2HandleAgentLocateRsp(self, broker, mp, ah, content):
@@ -1928,6 +1927,7 @@ class Broker(Thread):
self.rcv_queue = Queue() # for msg received on session
self.amqpSessionId = "%s.%d.%d" % (platform.uname()[1], os.getpid(), Broker.nextSeq)
Broker.nextSeq += 1
+ self.last_age_check = time()
# thread control
self.setDaemon(True)
@@ -2199,8 +2199,11 @@ class Broker(Thread):
self.session.console.newAgent(agent)
def _ageAgents(self):
+ if (time() - self.last_age_check) < self.session.agent_heartbeat_min:
+ # don't age if it's too soon
+ return
+ self.cv.acquire()
try:
- self.cv.acquire()
to_delete = []
to_notify = []
for key in self.agents:
@@ -2210,6 +2213,7 @@ class Broker(Thread):
agent = self.agents.pop(key)
agent.close()
to_notify.append(agent)
+ self.last_age_check = time()
finally:
self.cv.release()
if self.session.console:
@@ -2414,6 +2418,10 @@ class Broker(Thread):
except:
pass
+ # mark agent as being alive
+ if agent:
+ agent.touch()
+
def _v2Cb(self, msg):
""" Callback from session receive thread for V2 messages
"""
@@ -2466,6 +2474,7 @@ class Broker(Thread):
if agent_addr in self.agents:
agent = self.agents[agent_addr]
agent._handleQmfV2Message(opcode, mp, ah, content)
+ agent.touch()
# ignore failures as the session may be shutting down...
try:
@@ -2523,65 +2532,75 @@ class Broker(Thread):
while not self.canceled:
- item = self.rcv_queue.get()
-
- if self.canceled:
- return
+ try:
+ item = self.rcv_queue.get(timeout=self.session.agent_heartbeat_min)
+ except Empty:
+ item = None
+
+ while not self.canceled and item is not None:
+
+ if not self.connected:
+ # connection failure
+ while item:
+ # drain the queue
+ try:
+ item = self.rcv_queue.get(block=False)
+ except Empty:
+ item = None
+ break
- if not self.connected:
- # connection failure
- while item:
- # drain the queue
+ # notify any waiters, and callback
+ self.cv.acquire()
try:
- item = self.rcv_queue.get(block=False)
- except Empty:
- break
-
- # notify any waiters, and callback
- self.cv.acquire()
- try:
- edata = self.error;
- if self.syncInFlight:
- self.cv.notify()
- finally:
- self.cv.release()
- self.session._handleError(edata)
- self.session._handleBrokerDisconnect(self)
-
- if not self.session.manageConnections:
- return # do not attempt recovery
+ edata = self.error;
+ if self.syncInFlight:
+ self.cv.notify()
+ finally:
+ self.cv.release()
+ self.session._handleError(edata)
+ self.session._handleBrokerDisconnect(self)
+
+ if not self.session.manageConnections:
+ return # do not attempt recovery
+
+ # retry connection setup
+ delay = self.DELAY_MIN
+ while not self.canceled:
+ if self._tryToConnect():
+ break
+ # managed connection - try again
+ count = 0
+ while not self.canceled and count < delay:
+ sleep(1)
+ count += 1
+ if delay < self.DELAY_MAX:
+ delay *= self.DELAY_FACTOR
- # retry connection setup
- delay = self.DELAY_MIN
- while not self.canceled:
- if self._tryToConnect():
- break
- # managed connection - try again
- count = 0
- while not self.canceled and count < delay:
- sleep(1)
- count += 1
- if delay < self.DELAY_MAX:
- delay *= self.DELAY_FACTOR
+ if self.canceled:
+ return
- if self.canceled:
- return
+ # connection successful!
+ self.cv.acquire()
+ try:
+ self.connected = True
+ finally:
+ self.cv.release()
+
+ self.session._handleBrokerConnect(self)
+
+ elif item.typecode == Broker._q_item.type_v1msg:
+ self._v1Dispatch(item.data)
+ elif item.typecode == Broker._q_item.type_v2msg:
+ self._v2Dispatch(item.data)
- # connection successful!
- self.cv.acquire()
try:
- self.connected = True
- finally:
- self.cv.release()
-
- self.session._handleBrokerConnect(self)
-
- elif item.typecode == Broker._q_item.type_v1msg:
- self._v1Dispatch(item.data)
- elif item.typecode == Broker._q_item.type_v2msg:
- self._v2Dispatch(item.data)
-
-
+ item = self.rcv_queue.get(block=False)
+ except Empty:
+ item = None
+
+ # queue drained, age the agents...
+ if not self.canceled:
+ self._ageAgents()
#===================================================================================================
# Agent
@@ -2598,7 +2617,12 @@ class Agent:
self.agentBank = str(agentBank)
self.label = label
self.isV2 = isV2
- self.heartbeatInterval = interval
+ self.heartbeatInterval = 0
+ if interval:
+ if interval < self.session.agent_heartbeat_min:
+ self.heartbeatInterval = self.session.agent_heartbeat_min
+ else:
+ self.heartbeatInterval = interval
self.lock = Lock()
self.seqMgr = self.session.seqMgr
self.contextMap = {}
@@ -2629,7 +2653,8 @@ class Agent:
def touch(self):
- self.lastSeenTime = time()
+ if self.heartbeatInterval:
+ self.lastSeenTime = time()
def setEpoch(self, epoch):
@@ -2647,7 +2672,7 @@ class Agent:
def isOld(self):
if self.heartbeatInterval == 0:
return None
- if time() - self.lastSeenTime > (2.0 * self.heartbeatInterval):
+ if time() - self.lastSeenTime > (self.session.agent_heartbeat_miss * self.heartbeatInterval):
return True
return None
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org