You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/06/17 19:15:13 UTC

svn commit: r955676 - /qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py

Author: kgiusti
Date: Thu Jun 17 17:15:12 2010
New Revision: 955676

URL: http://svn.apache.org/viewvc?rev=955676&view=rev
Log:
QPID-2663: rate limit heartbeat processing, set minimum heartbeat interval

Modified:
    qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py

Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py?rev=955676&r1=955675&r2=955676&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py Thu Jun 17 17:15:12 2010
@@ -580,6 +580,8 @@ class Session:
     self.v1BindingKeyList, self.v2BindingKeyList = self._bindingKeys()
     self.manageConnections = manageConnections
     self.agent_filter = []  # (vendor, product, instance)
+    self.agent_heartbeat_min = 10 # minimum agent heartbeat timeout interval, in seconds
+    self.agent_heartbeat_miss = 3 # number of heartbeats to miss before deleting agent
 
     if self.userBindings and not self.rcvObjects:
       raise Exception("userBindings can't be set unless rcvObjects is set and a console is provided")
@@ -1036,7 +1038,6 @@ class Session:
     timestamp = codec.read_uint64()
     if self.console != None and agent != None:
       self.console.heartbeat(agent, timestamp)
-    broker._ageAgents()
 
 
   def _handleSchemaResp(self, broker, codec, seq, agent_addr):
@@ -1074,7 +1075,6 @@ class Session:
     ##
     if '_vendor' in values and values['_vendor'] == 'apache.org' and \
           '_product' in values and values['_product'] == 'qpidd':
-      broker._ageAgents()
       return
 
     agent = broker.getAgent(1, agentName)
@@ -1086,7 +1086,6 @@ class Session:
       agent.touch()
     if self.console and agent:
       self.console.heartbeat(agent, timestamp)
-    broker._ageAgents()
 
 
   def _v2HandleAgentLocateRsp(self, broker, mp, ah, content):
@@ -1928,6 +1927,7 @@ class Broker(Thread):
     self.rcv_queue = Queue() # for msg received on session
     self.amqpSessionId = "%s.%d.%d" % (platform.uname()[1], os.getpid(), Broker.nextSeq)
     Broker.nextSeq += 1
+    self.last_age_check = time()
 
     # thread control
     self.setDaemon(True)
@@ -2199,8 +2199,11 @@ class Broker(Thread):
       self.session.console.newAgent(agent)
 
   def _ageAgents(self):
+    if (time() - self.last_age_check) < self.session.agent_heartbeat_min:
+      # don't age if it's too soon
+      return
+    self.cv.acquire()
     try:
-      self.cv.acquire()
       to_delete = []
       to_notify = []
       for key in self.agents:
@@ -2210,6 +2213,7 @@ class Broker(Thread):
         agent = self.agents.pop(key)
         agent.close()
         to_notify.append(agent)
+      self.last_age_check = time()
     finally:
       self.cv.release()
     if self.session.console:
@@ -2414,6 +2418,10 @@ class Broker(Thread):
     except:
       pass
 
+    # mark agent as being alive
+    if agent:
+      agent.touch()
+
   def _v2Cb(self, msg):
     """ Callback from session receive thread for V2 messages
     """
@@ -2466,6 +2474,7 @@ class Broker(Thread):
           if agent_addr in self.agents:
             agent = self.agents[agent_addr]
             agent._handleQmfV2Message(opcode, mp, ah, content)
+            agent.touch()
 
     # ignore failures as the session may be shutting down...
     try:
@@ -2523,65 +2532,75 @@ class Broker(Thread):
 
     while not self.canceled:
 
-      item = self.rcv_queue.get()
-
-      if self.canceled:
-        return
+      try:
+        item = self.rcv_queue.get(timeout=self.session.agent_heartbeat_min)
+      except Empty:
+        item = None
+
+      while not self.canceled and item is not None:
+
+        if not self.connected:
+          # connection failure
+          while item:
+            # drain the queue
+            try:
+              item = self.rcv_queue.get(block=False)
+            except Empty:
+              item = None
+              break
 
-      if not self.connected:
-        # connection failure
-        while item:
-          # drain the queue
+          # notify any waiters, and callback
+          self.cv.acquire()
           try:
-            item = self.rcv_queue.get(block=False)
-          except Empty:
-            break
-
-        # notify any waiters, and callback
-        self.cv.acquire()
-        try:
-          edata = self.error;
-          if self.syncInFlight:
-            self.cv.notify()
-        finally:
-          self.cv.release()
-        self.session._handleError(edata)
-        self.session._handleBrokerDisconnect(self)
-
-        if not self.session.manageConnections:
-          return  # do not attempt recovery
+            edata = self.error;
+            if self.syncInFlight:
+              self.cv.notify()
+          finally:
+            self.cv.release()
+          self.session._handleError(edata)
+          self.session._handleBrokerDisconnect(self)
+
+          if not self.session.manageConnections:
+            return  # do not attempt recovery
+
+          # retry connection setup
+          delay = self.DELAY_MIN
+          while not self.canceled:
+            if self._tryToConnect():
+              break
+            # managed connection - try again
+            count = 0
+            while not self.canceled and count < delay:
+              sleep(1)
+              count += 1
+            if delay < self.DELAY_MAX:
+              delay *= self.DELAY_FACTOR
 
-        # retry connection setup
-        delay = self.DELAY_MIN
-        while not self.canceled:
-          if self._tryToConnect():
-            break
-          # managed connection - try again
-          count = 0
-          while not self.canceled and count < delay:
-            sleep(1)
-            count += 1
-          if delay < self.DELAY_MAX:
-            delay *= self.DELAY_FACTOR
+          if self.canceled:
+            return
 
-        if self.canceled:
-          return
+          # connection successful!
+          self.cv.acquire()
+          try:
+            self.connected = True
+          finally:
+            self.cv.release()
+
+          self.session._handleBrokerConnect(self)
+
+        elif item.typecode == Broker._q_item.type_v1msg:
+          self._v1Dispatch(item.data)
+        elif item.typecode == Broker._q_item.type_v2msg:
+          self._v2Dispatch(item.data)
 
-        # connection successful!
-        self.cv.acquire()
         try:
-          self.connected = True
-        finally:
-          self.cv.release()
-
-        self.session._handleBrokerConnect(self)
-
-      elif item.typecode == Broker._q_item.type_v1msg:
-        self._v1Dispatch(item.data)
-      elif item.typecode == Broker._q_item.type_v2msg:
-        self._v2Dispatch(item.data)
-
-
+          item = self.rcv_queue.get(block=False)
+        except Empty:
+          item = None
+
+      # queue drained, age the agents...
+      if not self.canceled:
+        self._ageAgents()
 
 #===================================================================================================
 # Agent
@@ -2598,7 +2617,12 @@ class Agent:
     self.agentBank = str(agentBank)
     self.label = label
     self.isV2 = isV2
-    self.heartbeatInterval = interval
+    self.heartbeatInterval = 0
+    if interval:
+      if interval < self.session.agent_heartbeat_min:
+        self.heartbeatInterval = self.session.agent_heartbeat_min
+      else:
+        self.heartbeatInterval = interval
     self.lock = Lock()
     self.seqMgr = self.session.seqMgr
     self.contextMap = {}
@@ -2629,7 +2653,8 @@ class Agent:
 
 
   def touch(self):
-    self.lastSeenTime = time()
+    if self.heartbeatInterval:
+      self.lastSeenTime = time()
 
 
   def setEpoch(self, epoch):
@@ -2647,7 +2672,7 @@ class Agent:
   def isOld(self):
     if self.heartbeatInterval == 0:
       return None
-    if time() - self.lastSeenTime > (2.0 * self.heartbeatInterval):
+    if time() - self.lastSeenTime > (self.session.agent_heartbeat_miss * self.heartbeatInterval):
       return True
     return None
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org