You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/10/29 18:07:38 UTC

svn commit: r1028819 - /qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py

Author: kgiusti
Date: Fri Oct 29 16:07:38 2010
New Revision: 1028819

URL: http://svn.apache.org/viewvc?rev=1028819&view=rev
Log:
QPID-2782: enhance the console's ability to selectively filter unsolicited events.

Modified:
    qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py

Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py?rev=1028819&r1=1028818&r2=1028819&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py Fri Oct 29 16:07:38 2010
@@ -210,10 +210,6 @@ class Object(object):
     """ Return the broker from which this object was sent """
     return self._broker
 
-  def getAgent(self):
-    """ Return the agent from which this object was sent """
-    return self._agent
-
   def getV2RoutingKey(self):
     """ Get the QMFv2 routing key to address this object """
     return self._agent.getV2RoutingKey()
@@ -579,12 +575,15 @@ class Session:
       self.rcvHeartbeats = False
     self.v1BindingKeyList, self.v2BindingKeyList = self._bindingKeys()
     self.manageConnections = manageConnections
-    self.agent_filter = []  # (vendor, product, instance)
+    # callback filters:
+    self.agent_filter = []  # (vendor, product, instance) || v1-agent-label-str
+    self.class_filter = []  # (pkg, class)
+    self.event_filter = []  # (pkg, event)
     self.agent_heartbeat_min = 10 # minimum agent heartbeat timeout interval
     self.agent_heartbeat_miss = 3 # # of heartbeats to miss before deleting agent
 
-    if self.userBindings and not self.rcvObjects:
-      raise Exception("userBindings can't be set unless rcvObjects is set and a console is provided")
+    if self.userBindings and not self.console:
+      raise Exception("userBindings can't be set unless a console is provided.")
 
   def close(self):
     """ Releases all resources held by the session.  Must be called by the
@@ -647,9 +646,6 @@ class Session:
     'broker' argument is the object returned from the addBroker call.  Errors
     are ignored.
     """
-    if self.console:
-      for agent in broker.getAgents():
-        self.console.delAgent(agent)
     broker._shutdown()
     self.brokers.remove(broker)
     del broker
@@ -677,13 +673,21 @@ class Session:
 
 
   def bindPackage(self, packageName):
-    """ Request object updates and events for all elements of a package. Only
-    valid if userBindings is True."""
-    if not self.userBindings or not self.rcvObjects:
-      raise Exception("userBindings option not set for Session")
+    """ Filter object and event callbacks to only those elements of the
+    specified package. Also filters newPackage and newClass callbacks to the
+    given package. Only valid if userBindings is True.
+    """
+    if not self.userBindings:
+      raise Exception("userBindings option must be set for this Session.")
+    if not self.rcvObjects and not self.rcvEvents:
+      raise Exception("Session needs to be configured to receive events or objects.")
     v1keys = ["console.obj.*.*.%s.#" % packageName, "console.event.*.*.%s.#" % packageName]
     v2keys = ["agent.ind.data.%s.#" % packageName.replace(".", "_"),
               "agent.ind.event.%s.#" % packageName.replace(".", "_"),]
+    if (packageName, None) not in self.class_filter:
+      self.class_filter.append((packageName, None))
+    if (packageName, None) not in self.event_filter:
+      self.event_filter.append((packageName, None))
     self.v1BindingKeyList.extend(v1keys)
     self.v2BindingKeyList.extend(v2keys)
     for broker in self.brokers:
@@ -697,12 +701,15 @@ class Session:
 
 
   def bindClass(self, pname, cname=None):
-    """ Request object updates for a particular class given package and class
-    name, or all classes of a particular package if cname=None.  Only valid if
-    userBindings is True.
+    """ Filter object callbacks to only those objects of the specified package
+    and optional class. Will also filter newPackage/newClass callbacks to the
+    specified package and class.  Only valid if userBindings is True and
+    rcvObjects is True.
     """
-    if not self.userBindings or not self.rcvObjects:
-      raise Exception("userBindings option not set for Session")
+    if not self.userBindings:
+      raise Exception("userBindings option must be set for this Session.")
+    if not self.rcvObjects:
+      raise Exception("Session needs to be configured with rcvObjects=True.")
     if cname is not None:
       v1key = "console.obj.*.*.%s.%s.#" % (pname, cname)
       v2key = "agent.ind.data.%s.%s.#" % (pname.replace(".", "_"), cname.replace(".", "_"))
@@ -711,6 +718,8 @@ class Session:
       v2key = "agent.ind.data.%s.#" % pname.replace(".", "_")
     self.v1BindingKeyList.append(v1key)
     self.v2BindingKeyList.append(v2key)
+    if (pname, cname) not in self.class_filter:
+      self.class_filter.append((pname, cname))
     for broker in self.brokers:
       if broker.isConnected():
         broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, binding_key=v1key)
@@ -720,19 +729,25 @@ class Session:
 
       
   def bindClassKey(self, classKey):
-    """ Request object updates for a particular table class by class key.  Only
-    valid if userBindings is True.
+    """ Filter object callbacks to only those objects of the specified
+    class. Will also filter newPackage/newClass callbacks to the specified
+    package and class.  Only valid if userBindings is True and rcvObjects is
+    True.
     """
     pname = classKey.getPackageName()
     cname = classKey.getClassName()
     self.bindClass(pname, cname)
 
   def bindEvent(self, pname, ename=None):
-    """ Request events from a particular class by package and event name, or
-    all events is ename=None.  Only valid if userBindings is True.
+    """ Filter event callbacks only from a particular class by package and
+    event name, or all events in a package if ename=None.  Will also filter
+    newPackage/newClass callbacks to the specified package and class. Only
+    valid if userBindings is True and rcvEvents is True.
     """
-    if not self.userBindings or not self.rcvEvents:
-      raise Exception("userBindings option not set for Session")
+    if not self.userBindings:
+      raise Exception("userBindings option must be set for this Session.")
+    if not self.rcvEvents:
+      raise Exception("Session needs to be configured with rcvEvents=True.")
     if ename is not None:
       v1key = "console.event.*.*.%s.%s.#" % (pname, ename)
       v2key = "agent.ind.event.%s.%s.#" % (pname.replace(".", "_"), ename.replace(".", "_"))
@@ -741,6 +756,8 @@ class Session:
       v2key = "agent.ind.event.%s.#" % pname.replace(".", "_")
     self.v1BindingKeyList.append(v1key)
     self.v2BindingKeyList.append(v2key)
+    if (pname, ename) not in self.event_filter:
+      self.event_filter.append((pname, ename))
     for broker in self.brokers:
       if broker.isConnected():
         broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, binding_key=v1key)
@@ -749,40 +766,58 @@ class Session:
           broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_ui, binding_key=v2key)
 
   def bindEventKey(self, eventKey):
-    """ Request events for a particular class by class key. Only valid if
-    userBindings is True. """
+    """ Filter event callbacks only from a particular class key. Will also
+    filter newPackage/newClass callbacks to the specified package and
+    class. Only valid if userBindings is True and rcvEvents is True.
+    """
     pname = eventKey.getPackageName()
     ename = eventKey.getClassName()
     self.bindEvent(pname, ename)
 
-  def bindAgent(self, vendor, product=None, instance=None):
-    """ Listen for heartbeats/events/data indications only for those agent(s)
-    that match the vendor and, optionally, the product strings.  Only valid if
-    userBindings is True.
+  def bindAgent(self, vendor=None, product=None, instance=None, label=None):
+    """ Receive heartbeats, newAgent and delAgent callbacks only for those
+    agent(s) that match the passed identification criteria:
+    V2 agents: vendor, optionally product and instance strings
+    V1 agents: the label string.
+    Only valid if userBindings is True.
     """
     if not self.userBindings:
       raise Exception("Session not configured for binding specific agents.")
-    if product is not None:
-      v2key = "agent.ind.heartbeat.%s.%s.#" % (vendor.replace(".", "_"), product.replace(".", "_"))
-    else:
-      v2key = "agent.ind.heartbeat.%s.#" % vendor.replace(".", "_")
-    self.v2BindingKeyList.append(v2key)
+    if vendor is None and label is None:
+      raise Exception("Must specify at least a vendor (V2 agents)"
+                      " or label (V1 agents).")
+
+    if vendor:   # V2 agent identification
+      if product is not None:
+        v2key = "agent.ind.heartbeat.%s.%s.#" % (vendor.replace(".", "_"), product.replace(".", "_"))
+      else:
+        v2key = "agent.ind.heartbeat.%s.#" % vendor.replace(".", "_")
+      self.v2BindingKeyList.append(v2key)
 
-    # allow wildcards - only add filter if a non-wildcarded component is given
-    if vendor == "*":
-      vendor = None
-    if product == "*":
-      product = None
-    if instance == "*":
-      instance = None
-    if vendor or product or instance:
-      self.agent_filter.append((vendor, product, instance))
+      # allow wildcards - only add filter if a non-wildcarded component is given
+      if vendor == "*":
+        vendor = None
+      if product == "*":
+        product = None
+      if instance == "*":
+        instance = None
+      if vendor or product or instance:
+        if (vendor, product, instance) not in self.agent_filter:
+          self.agent_filter.append((vendor, product, instance))
+
+      for broker in self.brokers:
+        if broker.isConnected():
+          if broker.brokerSupportsV2:
+            # heartbeats should arrive on the heartbeat queue
+            broker.amqpSession.exchange_bind(exchange="qmf.default.topic",
+                                             queue=broker.v2_topic_queue_hb,
+                                             binding_key=v2key)
+    elif label != "*": # non-wildcard V1 agent label
+      # V1 format heartbeats do not have any agent identifier in the routing
+      # key, so we cannot filter them by bindings.
+      if label not in self.agent_filter:
+        self.agent_filter.append(label)
 
-    for broker in self.brokers:
-      if broker.isConnected():
-        if broker.brokerSupportsV2:
-          # heartbeats should arrive on the heartbeat queue
-          broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_hb, binding_key=v2key)
 
   def getAgents(self, broker=None):
     """ Get a list of currently known agents """
@@ -915,14 +950,14 @@ class Session:
     product = kwargs.get("product", "*")
     severity = kwargs.get("severity", "*")
 
-    if package is "*" and event is not "*":
+    if package == "*" and event != "*":
       raise Exception("'package' parameter required if 'event' parameter"
                       " supplied")
 
     # V1 key - can only filter on package (and event)
-    if package is not "*":
+    if package == "*":
       key = "console.event.*.*." + str(package)
-      if event is not "*":
+      if event != "*":
         key += "." + str(event)
       key += ".#"
 
@@ -951,6 +986,15 @@ class Session:
       except:
         pass
 
+    if package != "*":
+      if event != "*":
+        f = (package, event)
+      else:
+        f = (package, None)
+      if f not in self.event_filter:
+        self.event_filter.append(f)
+
+
   def addAgentFilter(self, vendor, product=None):
     """ Deprecate - use bindAgent() instead
     """
@@ -1030,14 +1074,14 @@ class Session:
   def _handleBrokerConnect(self, broker):
     if self.console:
       for agent in broker.getAgents():
-        self.console.newAgent(agent)
+        self._newAgentCallback(agent)
       self.console.brokerConnected(broker)
 
 
   def _handleBrokerDisconnect(self, broker):
     if self.console:
       for agent in broker.getAgents():
-        self.console.delAgent(agent)
+        self._delAgentCallback(agent)
       self.console.brokerDisconnected(broker)
 
 
@@ -1059,7 +1103,7 @@ class Session:
     pname = str(codec.read_str8())
     notify = self.schemaCache.declarePackage(pname)
     if notify and self.console != None:
-      self.console.newPackage(pname)
+      self._newPackageCallback(pname)
 
     # Send a class request
     broker._incOutstanding()
@@ -1132,9 +1176,9 @@ class Session:
         agentBank = 0
 
     agent = broker.getAgent(brokerBank, agentBank)
-    timestamp = codec.read_uint64()
     if self.rcvHeartbeats and self.console != None and agent != None:
-      self.console.heartbeat(agent, timestamp)
+      timestamp = codec.read_uint64()
+      self._heartbeatCallback(agent, timestamp)
 
 
   def _handleSchemaResp(self, broker, codec, seq, agent_addr):
@@ -1148,9 +1192,9 @@ class Session:
       broker._decOutstanding()
     if self.console != None:
       if new_pkg:
-        self.console.newPackage(classKey.getPackageName())
+        self._newPackageCallback(classKey.getPackageName())
       if new_cls:
-        self.console.newClass(kind, classKey)
+        self._newClassCallback(kind, classKey)
 
     if agent_addr and (agent_addr.__class__ == str or agent_addr.__class__ == unicode):
       agent = self._getAgentForAgentAddr(agent_addr)
@@ -1179,7 +1223,7 @@ class Session:
       return
 
     if self.agent_filter:
-      # only allow agents that satisfy the filter
+      # only allow V2 agents that satisfy the filter
       v = agentName.split(":", 2)
       if len(v) != 3 or ((v[0], None, None) not in self.agent_filter
                          and (v[0], v[1], None) not in self.agent_filter
@@ -1194,7 +1238,7 @@ class Session:
     else:
       agent.touch()
     if self.rcvHeartbeats and self.console and agent:
-      self.console.heartbeat(agent, timestamp)
+      self._heartbeatCallback(agent, timestamp)
     agent.update_schema_timestamp(values.get("schema_timestamp", 0))
 
 
@@ -1479,6 +1523,88 @@ class Session:
         return seq
     return None
 
+  def _newPackageCallback(self, pname):
+    """
+    Invokes the console.newPackage() callback if the callback is present and
+    the package is not filtered.
+    """
+    if self.console:
+      if len(self.class_filter) == 0 and len(self.event_filter) == 0:
+        self.console.newPackage(pname)
+      else:
+        for x in self.class_filter:
+          if x[0] == pname:
+            self.console.newPackage(pname)
+            return
+
+        for x in self.event_filter:
+          if x[0] == pname:
+            self.console.newPackage(pname)
+            return
+
+
+  def _newClassCallback(self, ctype, ckey):
+    """
+    Invokes the console.newClass() callback if the callback is present and the
+    class is not filtered.
+    """
+    if self.console:
+      if ctype == ClassKey.TYPE_DATA:
+        if (len(self.class_filter) == 0
+            or (ckey.getPackageName(), ckey.getClassName()) in self.class_filter):
+          self.console.newClass(ctype, ckey)
+      elif ctype == ClassKey.TYPE_EVENT:
+        if (len(self.event_filter) == 0
+            or (ckey.getPackageName(), ckey.getClassName()) in self.event_filter):
+          self.console.newClass(ctype, ckey)
+      else: # old class keys did not contain type info, check both filters
+        if ((len(self.class_filter) == 0 and len(self.event_filter) == 0)
+            or (ckey.getPackageName(), ckey.getClassName()) in self.class_filter
+            or (ckey.getPackageName(), ckey.getClassName()) in self.event_filter):
+          self.console.newClass(ctype, ckey)
+
+  def _agentAllowed(self, agentName, isV2):
+    """ True if the agent is NOT filtered.
+    """
+    if self.agent_filter:
+      if isV2:
+        v = agentName.split(":", 2)
+        return ((len(v) > 2 and (v[0], v[1], v[2]) in self.agent_filter)
+                or (len(v) > 1 and (v[0], v[1], None) in self.agent_filter)
+                or (v and (v[0], None, None) in self.agent_filter));
+      else:
+        return agentName in self.agent_filter
+    return True
+
+  def _heartbeatCallback(self, agent, timestamp):
+    """
+    Invokes the console.heartbeat() callback if the callback is present and the
+    agent is not filtered.
+    """
+    if self.console and self.rcvHeartbeats:
+      if ((agent.isV2 and self._agentAllowed(agent.agentBank, True))
+          or ((not agent.isV2) and self._agentAllowed(agent.label, False))):
+        self.console.heartbeat(agent, timestamp)
+
+  def _newAgentCallback(self, agent):
+    """
+    Invokes the console.newAgent() callback if the callback is present and the
+    agent is not filtered.
+    """
+    if self.console:
+      if ((agent.isV2 and self._agentAllowed(agent.agentBank, True))
+          or ((not agent.isV2) and self._agentAllowed(agent.label, False))):
+        self.console.newAgent(agent)
+
+  def _delAgentCallback(self, agent):
+    """
+    Invokes the console.delAgent() callback if the callback is present and the
+    agent is not filtered.
+    """
+    if self.console:
+      if ((agent.isV2 and self._agentAllowed(agent.agentBank, True))
+          or ((not agent.isV2) and self._agentAllowed(agent.label, False))):
+        self.console.delAgent(agent)
 
 #===================================================================================================
 # SessionGetRequest
@@ -2385,18 +2511,24 @@ class Broker(Thread):
       return False     # connection failed
 
   def _updateAgent(self, obj):
+    """
+    Just received an object of class "org.apache.qpid.broker:agent", which
+    represents a V1 agent.  Add or update the list of agent proxies.
+    """
     bankKey = str(obj.agentBank)
     agent = None
     if obj._deleteTime == 0:
       try:
         self.cv.acquire()
         if bankKey not in self.agents:
-          agent = Agent(self, obj.agentBank, obj.label)
-          self.agents[bankKey] = agent
+          # add new agent only if label is not filtered
+          if len(self.session.agent_filter) == 0 or obj.label in self.session.agent_filter:
+            agent = Agent(self, obj.agentBank, obj.label)
+            self.agents[bankKey] = agent
       finally:
         self.cv.release()
       if agent and self.session.console:
-        self.session.console.newAgent(agent)
+        self.session._newAgentCallback(agent)
     else:
       try:
         self.cv.acquire()
@@ -2406,7 +2538,7 @@ class Broker(Thread):
       finally:
         self.cv.release()
       if agent and self.session.console:
-        self.session.console.delAgent(agent)
+        self.session._delAgentCallback(agent)
 
   def _addAgent(self, name, agent):
     try:
@@ -2415,7 +2547,7 @@ class Broker(Thread):
     finally:
       self.cv.release()
     if self.session.console:
-      self.session.console.newAgent(agent)
+      self.session._newAgentCallback(agent)
 
   def _ageAgents(self):
     if (time() - self.last_age_check) < self.session.agent_heartbeat_min:
@@ -2437,7 +2569,7 @@ class Broker(Thread):
       self.cv.release()
     if self.session.console:
       for agent in to_notify:
-        self.session.console.delAgent(agent)
+        self.session._delAgentCallback(agent)
 
   def _v2SendAgentLocate(self, predicate={}):
     """
@@ -2517,8 +2649,8 @@ class Broker(Thread):
       self.cv.release()
 
     if self.session.console:
-      for agent in _agents:
-        self.session.console.delAgent(agent)
+      for agent in _agents.itervalues():
+        self.session._delAgentCallback(agent)
 
   def _shutdown(self, _timeout=10):
     """ Disconnect from a broker, and release its resources.   Errors are
@@ -2530,7 +2662,7 @@ class Broker(Thread):
       self.rcv_queue.put(Broker._q_item(Broker._q_item.type_wakeup, None))
       self.join(_timeout)
 
-    # abort any pending transactions
+    # abort any pending transactions and delete agents
     self._disconnect("broker shutdown")
 
     try:
@@ -2904,19 +3036,46 @@ class Agent:
     """
     if 'qmf_object' in kwargs:
       if self.session.console:
-        self.session.console.objectProps(self.broker, kwargs['qmf_object'])
+        obj = kwargs['qmf_object']
+        if len(self.session.class_filter) == 0:
+          self.session.console.objectProps(self.broker, obj)
+        elif obj.getClassKey():
+          # slow path: check classKey against event_filter
+          pname = obj.getClassKey().getPackageName()
+          cname = obj.getClassKey().getClassName()
+          if ((pname, cname) in self.session.class_filter
+              or (pname, None) in self.session.class_filter):
+            self.session.console.objectProps(self.broker, obj)
     elif 'qmf_object_stats' in kwargs:
       if self.session.console:
-        self.session.console.objectStats(self.broker, kwargs['qmf_object_stats'])
+        obj = kwargs['qmf_object_stats']
+        if len(self.session.class_filter) == 0:
+          self.session.console.objectStats(self.broker, obj)
+        elif obj.getClassKey():
+          # slow path: check classKey against event_filter
+          pname = obj.getClassKey().getPackageName()
+          cname = obj.getClassKey().getClassName()
+          if ((pname, cname) in self.session.class_filter
+              or (pname, None) in self.session.class_filter):
+            self.session.console.objectStats(self.broker, obj)
     elif 'qmf_event' in kwargs:
       if self.session.console:
-        self.session.console.event(self.broker, kwargs['qmf_event'])
+        event = kwargs['qmf_event']
+        if len(self.session.event_filter) == 0:
+          self.session.console.event(self.broker, event)
+        elif event.classKey:
+          # slow path: check classKey against event_filter
+          pname = event.classKey.getPackageName()
+          ename = event.classKey.getClassName()
+          if ((pname, ename) in self.session.event_filter
+              or (pname, None) in self.session.event_filter):
+            self.session.console.event(self.broker, event)
     elif 'qmf_schema_id' in kwargs:
       ckey = kwargs['qmf_schema_id']
       new_pkg, new_cls = self.session.schemaCache.declareClass(ckey)
       if self.session.console:
         if new_pkg:
-          self.session.console.newPackage(ckey.getPackageName())
+          self.session._newPackageCallback(ckey.getPackageName())
         if new_cls:
           # translate V2's string based type value to legacy
           # integer value for backward compatibility
@@ -2925,7 +3084,7 @@ class Agent:
             cls_type = 1
           elif str(cls_type) == ckey.TYPE_EVENT:
             cls_type = 2
-          self.session.console.newClass(cls_type, ckey)
+          self.session._newClassCallback(cls_type, ckey)
 
   def touch(self):
     if self.heartbeatInterval:



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org