You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2010/01/11 21:46:07 UTC
svn commit: r898057 [3/3] - in /qpid/branches/qmfv2/qpid/python/qmf:
qmfAgent.py qmfCommon.py qmfConsole.py test/agent_test.py
test/console_test.py
Modified: qpid/branches/qmfv2/qpid/python/qmf/qmfConsole.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmfv2/qpid/python/qmf/qmfConsole.py?rev=898057&r1=898056&r2=898057&view=diff
==============================================================================
--- qpid/branches/qmfv2/qpid/python/qmf/qmfConsole.py (original)
+++ qpid/branches/qmfv2/qpid/python/qmf/qmfConsole.py Mon Jan 11 20:46:07 2010
@@ -30,10 +30,9 @@
from qpid.messaging import *
-from qmfCommon import (AMQP_QMF_DIRECT, AMQP_QMF_NAME_SEPARATOR, AMQP_QMF_AGENT_INDICATION,
- AMQP_QMF_AGENT_LOCATE, AgentId, makeSubject, parseSubject, OpCode,
- QmfQuery, AgentIdFactory, Notifier, QmfQueryPredicate, MsgKey,
- QmfData)
+from qmfCommon import (makeSubject, parseSubject, OpCode, QmfQuery, Notifier,
+ QmfQueryPredicate, MsgKey, QmfData, QmfAddress,
+ AMQP_QMF_AGENT_LOCATE, AMQP_QMF_AGENT_INDICATION)
@@ -186,69 +185,106 @@
return False
+##==============================================================================
+## DATA MODEL
+##==============================================================================
+
-#class ObjectProxy(QmfObject):
-class ObjectProxy(object):
+class QmfConsoleData(QmfData):
"""
- A local representation of a QmfObject that is managed by a remote agent.
+ Console's representation of a managed QmfData instance.
"""
- def __init__(self, agent, cls, kwargs={}):
+ def __init__(self, map_, agent, _schema=None):
+ super(QmfConsoleData, self).__init__(_map=map_,
+ _schema=_schema,
+ _const=True)
+ self._agent = agent
+
+ def get_timestamps(self):
"""
- @type agent: qmfConsole.AgentProxy
- @param agent: Agent that manages this object.
- @type cls: qmfCommon.SchemaObjectClass
- @param cls: Schema that describes the class.
- @type kwargs: dict
- @param kwargs: ??? supported keys ???
+ Returns a list of timestamps describing the lifecycle of
+ the object. All timestamps are represented by the AMQP
+ timestamp type. [0] = time of last update from Agent,
+ [1] = creation timestamp
+ [2] = deletion timestamp, or zero if not
+ deleted.
"""
- # QmfObject.__init__(self, cls, kwargs)
- self._agent = agent
+ return [self._utime, self._ctime, self._dtime]
+
+ def get_create_time(self):
+ """
+ returns the creation timestamp
+ """
+ return self._ctime
- # def update(self):
- def refresh(self, timeout = None):
+ def get_update_time(self):
+ """
+ returns the update timestamp
"""
- Called to re-fetch the current state of the object from the agent. This updates
- the contents of the object to their most current values.
+ return self._utime
- @rtype: bool
- @return: True if refresh succeeded. Refresh may fail if agent does not respond.
+ def get_delete_time(self):
+ """
+ returns the deletion timestamp, or zero if not yet deleted.
"""
- if not self._agent:
- raise Exception("No Agent associated with this object")
- newer = self._agent.get_object(QmfQuery({"object_id":None}), timeout)
- if newer == None:
- logging.error("Failed to retrieve object %s from agent %s" % (str(self), str(self._agent)))
- raise Exception("Failed to retrieve object %s from agent %s" % (str(self), str(self._agent)))
- #self.mergeUpdate(newer) ### ??? in Rafi's console.py::Object Class
+ return self._dtime
- ### def _merge_update(self, newerObject):
- ### ??? in Rafi's console.py::Object Class
+ def is_deleted(self):
+ """
+ True if deletion timestamp not zero.
+ """
+ return self._dtime != long(0)
+ def refresh(self, _reply_handle=None, _timeout=None):
+ """
+ request that the Agent update the value of this object's
+ contents.
+ """
+ logging.error(" TBD!!!")
+ return None
- ### def is_deleted(self):
- ### ??? in Rafi's console.py::Object Class
+ def invoke_method(self, name, _in_args=None, _reply_handle=None,
+ _timeout=None):
+ """
+ invoke the named method.
+ """
+ logging.error(" TBD!!!")
+ return None
- def key(self): pass
+class QmfLocalData(QmfData):
+ """
+ Console's representation of an unmanaged QmfData instance. There
+ is no remote agent associated with this instance. The Console has
+ full control over this instance.
+ """
+ def __init__(self, values, _subtypes={}, _tag=None, _object_id=None,
+ _schema=None):
+ # timestamp in millisec since epoch UTC
+ ctime = long(time.time() * 1000)
+ super(QmfLocalData, self).__init__(_values=values,
+ _subtypes=_subtypes, _tag=_tag,
+ _object_id=_object_id,
+ _schema=_schema, _ctime=ctime,
+ _utime=ctime, _const=False)
class Agent(object):
"""
A local representation of a remote agent managed by this console.
"""
- def __init__(self, agent_id, console):
+ def __init__(self, name, console):
"""
@type name: AgentId
@param name: uniquely identifies this agent in the AMQP domain.
"""
- if not isinstance(agent_id, AgentId):
- raise TypeError("parameter must be an instance of class AgentId")
+
if not isinstance(console, Console):
raise TypeError("parameter must be an instance of class Console")
- self._id = agent_id
- self._address = AMQP_QMF_DIRECT + AMQP_QMF_NAME_SEPARATOR + str(agent_id)
+ self._name = name
+ self._address = QmfAddress.direct(name, console._domain)
self._console = console
self._sender = None
self._packages = {} # map of {package-name:[list of class-names], } for this agent
@@ -257,8 +293,8 @@
logging.debug( "Created Agent with address: [%s]" % self._address )
- def getAgentId(self):
- return self._id
+ def get_name(self):
+ return self._name
def isActive(self):
return self._announce_timestamp != None
@@ -267,7 +303,7 @@
"""
Low-level routine to asynchronously send a message to this agent.
"""
- msg.reply_to = self._console.address()
+ msg.reply_to = str(self._console._address)
# handle = self._console._req_correlation.allocate()
# if handle == 0:
# raise Exception("Can not allocate a correlation id!")
@@ -329,7 +365,7 @@
pass
def __repr__(self):
- return self._address
+ return str(self._address)
def __str__(self):
return self.__repr__()
@@ -339,7 +375,7 @@
"""
msg = Message(subject=makeSubject(OpCode.get_query),
properties={"method":"request"},
- content={MsgKey.query: query.mapEncode()})
+ content={MsgKey.query: query.map_encode()})
self._sendMsg( msg, correlation_id )
@@ -389,7 +425,7 @@
"""
A Console manages communications to a collection of agents on behalf of an application.
"""
- def __init__(self, name=None, notifier=None,
+ def __init__(self, name=None, _domain=None, notifier=None,
reply_timeout = 60,
# agent_timeout = 120,
agent_timeout = 60,
@@ -403,10 +439,12 @@
@param kwargs: ??? Unused
"""
Thread.__init__(self)
- self._name = name
- if not self._name:
+ if not name:
self._name = "qmfc-%s.%d" % (platform.node(), os.getpid())
- self._address = AMQP_QMF_DIRECT + AMQP_QMF_NAME_SEPARATOR + self._name
+ else:
+ self._name = str(name)
+ self._domain = _domain
+ self._address = QmfAddress.direct(self._name, self._domain)
self._notifier = notifier
self._lock = Lock()
self._conn = None
@@ -467,10 +505,24 @@
raise Exception( "Multiple connections per Console not supported." );
self._conn = conn
self._session = conn.session(name=self._name)
- self._direct_recvr = self._session.receiver(self._address, capacity=1)
- self._announce_recvr = self._session.receiver(AMQP_QMF_AGENT_INDICATION,
+ self._direct_recvr = self._session.receiver(str(self._address) +
+ ";{create:always,"
+ " node-properties:"
+ " {type:topic,"
+ " x-properties:"
+ " {type:direct}}}",
+ capacity=1)
+ ind_addr = QmfAddress.topic(AMQP_QMF_AGENT_INDICATION, self._domain)
+ logging.debug("agent.ind addr=%s" % ind_addr)
+ self._announce_recvr = self._session.receiver(str(ind_addr) +
+ ";{create:always,"
+ " node-properties:{type:topic}}",
capacity=1)
- self._locate_sender = self._session.sender(AMQP_QMF_AGENT_LOCATE)
+ locate_addr = QmfAddress.topic(AMQP_QMF_AGENT_LOCATE, self._domain)
+ logging.debug("agent.locate addr=%s" % locate_addr)
+ self._locate_sender = self._session.sender(str(locate_addr) +
+ ";{create:always,"
+ " node-properties:{type:topic}}")
#
# Now that receivers are created, fire off the receive thread...
#
@@ -495,7 +547,7 @@
if self.isAlive():
# kick my thread to wake it up
logging.debug("Making temp sender for [%s]" % self._address)
- tmp_sender = self._session.sender(self._address)
+ tmp_sender = self._session.sender(str(self._address))
try:
msg = Message(subject=makeSubject(OpCode.noop))
tmp_sender.send( msg, sync=True )
@@ -535,21 +587,17 @@
finally:
self._lock.release()
-
-
-
- def findAgent(self, agent_id, timeout=None ):
+ def findAgent(self, name, timeout=None ):
"""
Given the id of a particular agent, return an instance of class Agent
representing that agent. Return None if the agent does not exist.
"""
- if not isinstance(agent_id, AgentId):
- raise TypeError("parameter must be an instance of class AgentId")
self._lock.acquire()
try:
- if agent_id in self._agent_map:
- return self._agent_map[agent_id]
+ agent = self._agent_map.get(name)
+ if agent:
+ return agent
finally:
self._lock.release()
@@ -559,17 +607,21 @@
if handle == 0:
raise Exception("Can not allocate a correlation id!")
try:
- tmp_sender = self._session.sender(AMQP_QMF_DIRECT + AMQP_QMF_NAME_SEPARATOR + str(agent_id))
+ tmp_sender = self._session.sender(str(QmfAddress.direct(name,
+ self._domain))
+ + ";{create:always,"
+ " node-properties:"
+ " {type:topic,"
+ " x-properties:"
+ " {type:direct}}}")
+
query = QmfQuery({QmfQuery._TARGET: {QmfQuery._TARGET_AGENT:None},
QmfQuery._PREDICATE:
- {QmfQuery._LOGIC_AND:
- [{QmfQuery._CMP_EQ: ["vendor", agent_id.vendor()]},
- {QmfQuery._CMP_EQ: ["product", agent_id.product()]},
- {QmfQuery._CMP_EQ: ["name", agent_id.name()]}]}})
+ {QmfQuery._CMP_EQ: ["_name", name]}})
msg = Message(subject=makeSubject(OpCode.agent_locate),
properties={"method":"request"},
- content={MsgKey.query: query.mapEncode()})
- msg.reply_to = self._address
+ content={MsgKey.query: query.map_encode()})
+ msg.reply_to = str(self._address)
msg.correlation_id = str(handle)
logging.debug("Sending Agent Locate (%s)" % time.time())
tmp_sender.send( msg )
@@ -588,13 +640,43 @@
logging.debug("Agent Locate wait ended (%s)" % time.time())
self._lock.acquire()
try:
- if agent_id in self._agent_map:
- new_agent = self._agent_map[agent_id]
+ new_agent = self._agent_map.get(name)
finally:
self._lock.release()
return new_agent
+ def doQuery(self, agent, query, timeout=None ):
+ """
+ """
+
+ handle = self._req_correlation.allocate()
+ if handle == 0:
+ raise Exception("Can not allocate a correlation id!")
+ try:
+ logging.debug("Sending Query to Agent (%s)" % time.time())
+ agent._sendQuery(query, handle)
+ except SendError, e:
+ logging.error(str(e))
+ self._req_correlation.release(handle)
+ return None
+
+ if not timeout:
+ timeout = self._reply_timeout
+
+ logging.debug("Waiting for response to Query (%s)" % timeout)
+ reply = self._req_correlation.get_data(handle, timeout)
+ self._req_correlation.release(handle)
+ logging.debug("Agent Query wait ended (%s)" % time.time())
+ if reply:
+ print("Agent Query Reply='%s'" % reply)
+ return reply.content
+ else:
+ print("Agent Query FAILED!!!")
+ return None
+
+
+
def run(self):
global _callback_thread
#
@@ -607,7 +689,6 @@
try:
msg = self._announce_recvr.fetch(timeout = 0)
if msg:
- logging.error( "Announce Msg Rcvd@%s: [%s]" % (time.time(), msg) )
self._dispatch(msg, _direct=False)
except Empty:
pass
@@ -615,7 +696,6 @@
try:
msg = self._direct_recvr.fetch(timeout = 0)
if msg:
- logging.error( "Direct Msg Rcvd@%s: [%s]" % (time.time(), msg) )
self._dispatch(msg, _direct=True)
except Empty:
pass
@@ -654,6 +734,9 @@
"""
PRIVATE: Process a message received from an Agent
"""
+
+ logging.error( "Message received from Agent! [%s]" % msg )
+
try:
version,opcode = parseSubject(msg.subject)
# @todo: deal with version mismatch!!!
@@ -670,7 +753,7 @@
if opcode == OpCode.agent_ind:
self._handleAgentIndMsg( msg, cmap, version, _direct )
elif opcode == OpCode.data_ind:
- logging.warning("!!! data_ind TBD !!!")
+ self._handleDataIndMsg(msg, cmap, version, _direct)
elif opcode == OpCode.event_ind:
logging.warning("!!! event_ind TBD !!!")
elif opcode == OpCode.managed_object:
@@ -696,7 +779,8 @@
if MsgKey.agent_info in cmap:
try:
- agent_id = AgentIdFactory(cmap[MsgKey.agent_info])
+ # TODO: fix
+ name = cmap[MsgKey.agent_info]["_name"]
except:
logging.warning("Bad agent-ind message received: '%s'" % msg)
return
@@ -709,21 +793,22 @@
if direct and correlated:
ignore = False
elif self._agent_discovery_filter:
- matched = self._agent_discovery_filter.evaluate(QmfData(agent_id.mapEncode()))
- ignore = not matched
+ logging.error("FIXME: agent discovery filter - new agent name style")
+ # matched = self._agent_discovery_filter.evaluate(QmfData(agent_id.mapEncode()))
+ # ignore = not matched
+ matched = True; ignore = False # for now
if not ignore:
agent = None
self._lock.acquire()
try:
- if agent_id in self._agent_map:
- agent = self._agent_map[agent_id]
+ agent = self._agent_map.get(name)
finally:
self._lock.release()
if not agent:
# need to create and add a new agent
- agent = self._createAgent(agent_id)
+ agent = self._createAgent(name)
# lock out expiration scanning code
self._lock.acquire()
@@ -746,6 +831,22 @@
+
+ def _handleDataIndMsg(self, msg, cmap, version, direct):
+ """
+ Process a received data-ind message.
+ """
+ logging.debug("_handleDataIndMsg '%s' (%s)" % (msg, time.time()))
+
+ if not self._req_correlation.isValid(msg.correlation_id):
+ logging.error("FIXME: uncorrelated data indicate??? msg='%s'" % str(msg))
+ return
+
+ # wake up all waiters
+ logging.error("waking waiters for correlation id %s" % msg.correlation_id)
+ self._req_correlation.put_data(msg.correlation_id, msg)
+
+
def _expireAgents(self):
"""
Check for expired agents and issue notifications when they expire.
@@ -777,21 +878,26 @@
- def _createAgent( self, agent_id ):
+ def _createAgent( self, name ):
"""
Factory to create/retrieve an agent for this console
"""
- if not isinstance(agent_id, AgentId):
- raise TypeError("parameter must be an instance of class AgentId")
self._lock.acquire()
try:
- if agent_id in self._agent_map:
- return self._agent_map[agent_id]
+ agent = self._agent_map.get(name)
+ if agent:
+ return agent
+
+ agent = Agent(name, self)
+ agent._sender = self._session.sender(str(agent._address) +
+ ";{create:always,"
+ " node-properties:"
+ " {type:topic,"
+ " x-properties:"
+ " {type:direct}}}")
- agent = Agent(agent_id, self)
- agent._sender = self._session.sender(agent._address)
- self._agent_map[agent_id] = agent
+ self._agent_map[name] = agent
finally:
self._lock.release()
@@ -1235,33 +1341,13 @@
if __name__ == '__main__':
# temp test code
- from qmfCommon import (SchemaEventClassFactory, qmfTypes, SchemaPropertyFactory,
- SchemaObjectClassFactory, ObjectIdFactory, QmfDescribed,
- QmfDescribedFactory, QmfManaged, QmfManagedFactory, QmfDataFactory,
- QmfEvent)
- logging.getLogger().setLevel(logging.INFO)
-
- logging.info( "Starting Connection" )
- _c = Connection("localhost")
- _c.connect()
- #c.start()
-
- logging.info( "Starting Console" )
- _myConsole = Console()
- _myConsole.addConnection( _c )
-
- logging.info( "Finding Agent" )
- _myAgent = _myConsole.findAgent( AgentId( "redhat.com", "agent", "tross" ), 5 )
+ from qmfCommon import (qmfTypes, QmfData,
+ QmfEvent, SchemaClassId, SchemaEventClass,
+ SchemaProperty, SchemaObjectClass)
- logging.info( "Agent Found: %s" % _myAgent )
-
- logging.info( "Removing connection" )
- _myConsole.removeConnection( _c, 10 )
-
- logging.info( "Destroying console:" )
- _myConsole.destroy( 10 )
+ logging.getLogger().setLevel(logging.INFO)
- logging.info( "************* Starting Async Console **************" )
+ logging.info( "************* Creating Async Console **************" )
class MyNotifier(Notifier):
def __init__(self, context):
@@ -1275,239 +1361,179 @@
_noteMe = MyNotifier( 666 )
_myConsole = Console(notifier=_noteMe)
- _myConsole.addConnection( _c )
_myConsole.enableAgentDiscovery()
logging.info("Waiting...")
- while not _noteMe.WorkAvailable:
- try:
- print("No work yet...sleeping!")
- time.sleep(1)
- except KeyboardInterrupt:
- break
-
-
- print("Work available = %d items!" % _myConsole.getWorkItemCount())
- _wi = _myConsole.getNextWorkItem(timeout=0)
- while _wi:
- print("work item %d:%s" % (_wi.getType(), str(_wi.getParams())))
- _wi = _myConsole.getNextWorkItem(timeout=0)
-
-
- logging.info( "Removing connection" )
- _myConsole.removeConnection( _c, 10 )
-
logging.info( "Destroying console:" )
_myConsole.destroy( 10 )
logging.info( "******** Messing around with Schema ********" )
- _sec = SchemaEventClassFactory( { "schema_id": # SchemaClassId map
- {"package_name": "myPackage",
- "class_name": "myClass",
- "type": "event"},
- "desc": "A typical event schema",
- "properties": {"Argument-1":
- {"amqp_type": qmfTypes.TYPE_UINT8,
- "min": 0,
- "max": 100,
- "unit": "seconds",
- "desc": "sleep value"},
- "Argument-2":
- {"amqp_type": qmfTypes.TYPE_LSTR,
- "maxlen": 100,
- "desc": "a string argument"}}} )
- print("_sec=%s" % _sec.getClassId())
- print("_sec.gePropertyCount()=%d" % _sec.getPropertyCount() )
- print("_sec.getProperty('Argument-1`)=%s" % _sec.getProperty('Argument-1') )
- print("_sec.getProperty('Argument-2`)=%s" % _sec.getProperty('Argument-2') )
+ _sec = SchemaEventClass( _classId=SchemaClassId("myPackage", "myClass",
+ stype=SchemaClassId.TYPE_EVENT),
+ _desc="A typical event schema",
+ _props={"Argument-1": SchemaProperty(_type_code=qmfTypes.TYPE_UINT8,
+ kwargs = {"min":0,
+ "max":100,
+ "unit":"seconds",
+ "desc":"sleep value"}),
+ "Argument-2": SchemaProperty(_type_code=qmfTypes.TYPE_LSTR,
+ kwargs={"maxlen":100,
+ "desc":"a string argument"})})
+ print("_sec=%s" % _sec.get_class_id())
+ print("_sec.gePropertyCount()=%d" % _sec.get_property_count() )
+ print("_sec.getProperty('Argument-1`)=%s" % _sec.get_property('Argument-1') )
+ print("_sec.getProperty('Argument-2`)=%s" % _sec.get_property('Argument-2') )
try:
- print("_sec.getProperty('not-found')=%s" % _sec.getProperty('not-found') )
+ print("_sec.getProperty('not-found')=%s" % _sec.get_property('not-found') )
except:
pass
- print("_sec.getProperties()='%s'" % _sec.getProperties())
+ print("_sec.getProperties()='%s'" % _sec.get_properties())
print("Adding another argument")
- _arg3 = SchemaPropertyFactory( { "amqp_type": qmfTypes.TYPE_BOOL,
- "dir": "IO",
- "desc": "a boolean argument"} )
- _sec.addProperty('Argument-3', _arg3)
- print("_sec=%s" % _sec.getClassId())
- print("_sec.getPropertyCount()=%d" % _sec.getPropertyCount() )
- print("_sec.getProperty('Argument-1')=%s" % _sec.getProperty('Argument-1') )
- print("_sec.getProperty('Argument-2')=%s" % _sec.getProperty('Argument-2') )
- print("_sec.getProperty('Argument-3')=%s" % _sec.getProperty('Argument-3') )
+ _arg3 = SchemaProperty( _type_code=qmfTypes.TYPE_BOOL,
+ kwargs={"dir":"IO",
+ "desc":"a boolean argument"})
+ _sec.add_property('Argument-3', _arg3)
+ print("_sec=%s" % _sec.get_class_id())
+ print("_sec.getPropertyCount()=%d" % _sec.get_property_count() )
+ print("_sec.getProperty('Argument-1')=%s" % _sec.get_property('Argument-1') )
+ print("_sec.getProperty('Argument-2')=%s" % _sec.get_property('Argument-2') )
+ print("_sec.getProperty('Argument-3')=%s" % _sec.get_property('Argument-3') )
- print("_arg3.mapEncode()='%s'" % _arg3.mapEncode() )
+ print("_arg3.mapEncode()='%s'" % _arg3.map_encode() )
- _secmap = _sec.mapEncode()
+ _secmap = _sec.map_encode()
print("_sec.mapEncode()='%s'" % _secmap )
- _sec2 = SchemaEventClassFactory( _secmap )
+ _sec2 = SchemaEventClass( _map=_secmap )
- print("_sec=%s" % _sec.getClassId())
- print("_sec2=%s" % _sec2.getClassId())
+ print("_sec=%s" % _sec.get_class_id())
+ print("_sec2=%s" % _sec2.get_class_id())
-
-
-
- _soc = SchemaObjectClassFactory( {"schema_id": {"package_name": "myOtherPackage",
- "class_name": "myOtherClass",
- "type": "data"},
- "desc": "A test data object",
- "properties":
- {"prop1": {"amqp_type": qmfTypes.TYPE_UINT8,
- "access": "RO",
- "index": True,
- "unit": "degrees"},
- "prop2": {"amqp_type": qmfTypes.TYPE_UINT8,
- "access": "RW",
- "index": True,
- "desc": "The Second Property(tm)",
- "unit": "radians"},
- "statistics": { "amqp_type": qmfTypes.TYPE_DELTATIME,
- "unit": "seconds",
- "desc": "time until I retire"}},
- "methods":
- {"meth1": {"desc": "A test method",
- "arguments":
- {"arg1": {"amqp_type": qmfTypes.TYPE_UINT32,
- "desc": "an argument 1",
- "dir": "I"},
- "arg2": {"amqp_type": qmfTypes.TYPE_BOOL,
- "dir": "IO",
- "desc": "some weird boolean"}}},
- "meth2": {"desc": "A test method",
- "arguments":
- {"m2arg1": {"amqp_type": qmfTypes.TYPE_UINT32,
- "desc": "an 'nuther argument",
- "dir": "I"}}}},
- "primary_key": ["prop2", "prop1"]})
+ _soc = SchemaObjectClass( _map = {"_schema_id": {"_package_name": "myOtherPackage",
+ "_class_name": "myOtherClass",
+ "_type": "_data"},
+ "_desc": "A test data object",
+ "_values":
+ {"prop1": {"amqp_type": qmfTypes.TYPE_UINT8,
+ "access": "RO",
+ "index": True,
+ "unit": "degrees"},
+ "prop2": {"amqp_type": qmfTypes.TYPE_UINT8,
+ "access": "RW",
+ "index": True,
+ "desc": "The Second Property(tm)",
+ "unit": "radians"},
+ "statistics": { "amqp_type": qmfTypes.TYPE_DELTATIME,
+ "unit": "seconds",
+ "desc": "time until I retire"},
+ "meth1": {"desc": "A test method",
+ "arguments":
+ {"arg1": {"amqp_type": qmfTypes.TYPE_UINT32,
+ "desc": "an argument 1",
+ "dir": "I"},
+ "arg2": {"amqp_type": qmfTypes.TYPE_BOOL,
+ "dir": "IO",
+ "desc": "some weird boolean"}}},
+ "meth2": {"desc": "A test method",
+ "arguments":
+ {"m2arg1": {"amqp_type": qmfTypes.TYPE_UINT32,
+ "desc": "an 'nuther argument",
+ "dir":
+ "I"}}}},
+ "_subtypes":
+ {"prop1":"qmfProperty",
+ "prop2":"qmfProperty",
+ "statistics":"qmfProperty",
+ "meth1":"qmfMethod",
+ "meth2":"qmfMethod"},
+ "_primary_key_names": ["prop2", "prop1"]})
print("_soc='%s'" % _soc)
- print("_soc.getPrimaryKeyList='%s'" % _soc.getPrimaryKeyList())
+ print("_soc.getPrimaryKeyList='%s'" % _soc.get_id_names())
- print("_soc.getPropertyCount='%d'" % _soc.getPropertyCount())
- print("_soc.getProperties='%s'" % _soc.getProperties())
- print("_soc.getProperty('prop2')='%s'" % _soc.getProperty('prop2'))
+ print("_soc.getPropertyCount='%d'" % _soc.get_property_count())
+ print("_soc.getProperties='%s'" % _soc.get_properties())
+ print("_soc.getProperty('prop2')='%s'" % _soc.get_property('prop2'))
- print("_soc.getMethodCount='%d'" % _soc.getMethodCount())
- print("_soc.getMethods='%s'" % _soc.getMethods())
- print("_soc.getMethod('meth2')='%s'" % _soc.getMethod('meth2'))
+ print("_soc.getMethodCount='%d'" % _soc.get_method_count())
+ print("_soc.getMethods='%s'" % _soc.get_methods())
+ print("_soc.getMethod('meth2')='%s'" % _soc.get_method('meth2'))
- _socmap = _soc.mapEncode()
+ _socmap = _soc.map_encode()
print("_socmap='%s'" % _socmap)
- _soc2 = SchemaObjectClassFactory( _socmap )
+ _soc2 = SchemaObjectClass( _map=_socmap )
print("_soc='%s'" % _soc)
print("_soc2='%s'" % _soc2)
- if _soc2.getClassId() == _soc.getClassId():
+ if _soc2.get_class_id() == _soc.get_class_id():
print("soc and soc2 are the same schema")
logging.info( "******** Messing around with ObjectIds ********" )
- oid = ObjectIdFactory( {"agent_id": {"vendor": "redhat.com",
- "product": "mgmt-tool",
- "name": "myAgent1"},
- "primary_key": "key1:key2" })
-
- print("oid = %s" % oid)
-
- oid2 = ObjectIdFactory( oid.mapEncode() )
-
- print("oid2 = %s" % oid2)
- if oid == oid2:
- print("oid1 == oid2")
- else:
- print("oid1 != oid2")
-
- hashme = {oid: "myoid"}
- print("oid hash = %s" % hashme[oid2] )
-
-
- qd = QmfData( {"prop1":1, "prop2":True, "prop3": {"a":"map"}, "prop4": "astring"} )
+ qd = QmfData( _values={"prop1":1, "prop2":True, "prop3": {"a":"map"}, "prop4": "astring"} )
print("qd='%s':" % qd)
print("prop1=%d prop2=%s prop3=%s prop4=%s" % (qd.prop1, qd.prop2, qd.prop3, qd.prop4))
- print("qd map='%s'" % qd.mapEncode())
- print("qd getProperty('prop4')='%s'" % qd.getProperty("prop4"))
- qd.setProperty("prop4", 4)
- print("qd setProperty('prop4', 4)='%s'" % qd.getProperty("prop4"))
+ print("qd map='%s'" % qd.map_encode())
+ print("qd getProperty('prop4')='%s'" % qd.get_value("prop4"))
+ qd.set_value("prop4", 4, "A test property called 4")
+ print("qd setProperty('prop4', 4)='%s'" % qd.get_value("prop4"))
qd.prop4 = 9
print("qd.prop4 = 9 ='%s'" % qd.prop4)
qd["prop4"] = 11
print("qd[prop4] = 11 ='%s'" % qd["prop4"])
- print("qd.mapEncode()='%s'" % qd.mapEncode())
- _qd2 = QmfDataFactory( qd.mapEncode() )
- print("_qd2.mapEncode()='%s'" % _qd2.mapEncode())
+ print("qd.mapEncode()='%s'" % qd.map_encode())
+ _qd2 = QmfData( _map = qd.map_encode() )
+ print("_qd2.mapEncode()='%s'" % _qd2.map_encode())
- _qmfDesc1 = QmfDescribed( _schemaId = _soc.getClassId(),
- _props = {"prop1": 1, "statistics": 666, "prop2": 0})
+ _qmfDesc1 = QmfConsoleData( {"_values" : {"prop1": 1, "statistics": 666,
+ "prop2": 0}},
+ agent="some agent name?",
+ _schema = _soc)
- print("_qmfDesc1 map='%s'" % _qmfDesc1.mapEncode())
+ print("_qmfDesc1 map='%s'" % _qmfDesc1.map_encode())
- _qmfDesc1.setSchema( _soc )
+ _qmfDesc1._set_schema( _soc )
- print("_qmfDesc1 props{} = '%s'" % _qmfDesc1.getProperties())
- print("_qmfDesc1 primarykey = '%s'" % _qmfDesc1.getPrimaryKey())
- print("_qmfDesc1 classid = '%s'" % _qmfDesc1.getSchemaClassId())
+ print("_qmfDesc1 prop2 = '%s'" % _qmfDesc1.get_value("prop2"))
+ print("_qmfDesc1 primarykey = '%s'" % _qmfDesc1.get_object_id())
+ print("_qmfDesc1 classid = '%s'" % _qmfDesc1.get_schema_class_id())
- _qmfDescMap = _qmfDesc1.mapEncode()
+ _qmfDescMap = _qmfDesc1.map_encode()
print("_qmfDescMap='%s'" % _qmfDescMap)
- _qmfDesc2 = QmfDescribedFactory( _qmfDescMap, _schema=_soc )
-
- print("_qmfDesc2 map='%s'" % _qmfDesc2.mapEncode())
- print("_qmfDesc2 props = '%s'" % _qmfDesc2.getProperties())
- print("_qmfDesc2 primary key = '%s'" % _qmfDesc2.getPrimaryKey())
-
-
- _qmfMgd1 = QmfManaged( _agentId=AgentId("redhat.com", "anAgent", "tross"),
- _schema = _soc,
- _schemaId = _soc.getClassId(),
- _props = {"prop1": 11, "prop2": 10, "statistics":999})
-
-
- print("_qmfMgd1 map='%s'" % _qmfMgd1.mapEncode())
-
- print("_qmfMgd1.getObjectId()='%s'" % _qmfMgd1.getObjectId())
- print("_qmfMgd1 props = '%s'" % _qmfMgd1.getProperties())
-
- _qmfMgd1Map = _qmfMgd1.mapEncode()
- print("_qmfMgd1Map='%s'" % _qmfMgd1Map)
-
- _qmfMgd2 = QmfManagedFactory( param=_qmfMgd1.mapEncode(), _schema=_soc )
+ _qmfDesc2 = QmfData( _map=_qmfDescMap, _schema=_soc )
- print("_qmfMgd2 map='%s'" % _qmfMgd2.mapEncode())
- print("_qmfMgd2 getObjectId() = '%s'" % _qmfMgd2.getObjectId())
- print("_qmfMgd2 props = '%s'" % _qmfMgd2.getProperties())
+ print("_qmfDesc2 map='%s'" % _qmfDesc2.map_encode())
+ print("_qmfDesc2 prop2 = '%s'" % _qmfDesc2.get_value("prop2"))
+ print("_qmfDesc2 primary key = '%s'" % _qmfDesc2.get_object_id())
logging.info( "******** Messing around with QmfEvents ********" )
_qmfevent1 = QmfEvent( _timestamp = 1111,
- _agentId = AgentId("redhat.com", "whizzbang2000", "ted"),
- _schema = _sec,
- _props = {"Argument-1": 77,
- "Argument-3": True,
- "Argument-2": "a string"})
- print("_qmfevent1.mapEncode()='%s'" % _qmfevent1.mapEncode())
- print("_qmfevent1.getTimestamp()='%s'" % _qmfevent1.getTimestamp())
- print("_qmfevent1.getAgentId()='%s'" % _qmfevent1.getAgentId())
+ _schema = _sec,
+ _values = {"Argument-1": 77,
+ "Argument-3": True,
+ "Argument-2": "a string"})
+ print("_qmfevent1.mapEncode()='%s'" % _qmfevent1.map_encode())
+ print("_qmfevent1.getTimestamp()='%s'" % _qmfevent1.get_timestamp())
- _qmfevent1Map = _qmfevent1.mapEncode()
+ _qmfevent1Map = _qmfevent1.map_encode()
- _qmfevent2 = QmfEvent(_map=_qmfevent1Map)
- print("_qmfevent2.mapEncode()='%s'" % _qmfevent2.mapEncode())
+ _qmfevent2 = QmfEvent(_map=_qmfevent1Map, _schema=_sec)
+ print("_qmfevent2.mapEncode()='%s'" % _qmfevent2.map_encode())
logging.info( "******** Messing around with Queries ********" )
Modified: qpid/branches/qmfv2/qpid/python/qmf/test/agent_test.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmfv2/qpid/python/qmf/test/agent_test.py?rev=898057&r1=898056&r2=898057&view=diff
==============================================================================
--- qpid/branches/qmfv2/qpid/python/qmf/test/agent_test.py (original)
+++ qpid/branches/qmfv2/qpid/python/qmf/test/agent_test.py Mon Jan 11 20:46:07 2010
@@ -4,10 +4,8 @@
from qpid.messaging import *
-from qmfCommon import (AgentId, SchemaEventClassFactory, qmfTypes, SchemaProperty,
- SchemaObjectClass, ObjectIdFactory, QmfData, QmfDescribed,
- QmfDescribedFactory, QmfManaged, QmfManagedFactory, QmfDataFactory,
- QmfEvent, SchemaMethod, Notifier)
+from qmfCommon import (qmfTypes, SchemaProperty, SchemaObjectClass, QmfData,
+ QmfEvent, SchemaMethod, Notifier, SchemaClassId)
from qmfAgent import (Agent, QmfAgentData)
@@ -20,9 +18,9 @@
self._sema4.release()
def waitForWork(self):
- logging.error("Waiting for event...")
+ print("Waiting for event...")
self._sema4.acquire()
- logging.error("...event present")
+ print("...event present")
@@ -31,58 +29,54 @@
#
_notifier = ExampleNotifier()
-_agent = Agent( "redhat.com", "qmf", "testAgent", _notifier )
+_agent = Agent( "qmf.testAgent", _notifier=_notifier )
# Dynamically construct a class schema
-_schema = SchemaObjectClass( "MyPackage", "MyClass",
- desc="A test data schema",
- _pkey=["index1", "index2"] )
+_schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage", "MyClass"),
+ _desc="A test data schema",
+ _object_id_names=["index1", "index2"] )
# add properties
-_schema.addProperty( "index1",
- SchemaProperty(qmfTypes.TYPE_UINT8))
-_schema.addProperty( "index2",
- SchemaProperty(qmfTypes.TYPE_LSTR))
+_schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8))
+_schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR))
+
# these two properties are statistics
-_schema.addProperty( "query_count",
- SchemaProperty(qmfTypes.TYPE_UINT32))
-_schema.addProperty( "method_call_count",
- SchemaProperty(qmfTypes.TYPE_UINT32))
+_schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32))
+_schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32))
+
# These two properties can be set via the method call
-_schema.addProperty( "set_string",
- SchemaProperty(qmfTypes.TYPE_LSTR))
-_schema.addProperty( "set_int",
- SchemaProperty(qmfTypes.TYPE_UINT32))
+_schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR))
+_schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32))
# add method
_meth = SchemaMethod( _desc="Method to set string and int in object." )
-_meth.addArgument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) )
-_meth.addArgument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) )
-_schema.addMethod( "set_meth", _meth )
+_meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) )
+_meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) )
+_schema.add_method( "set_meth", _meth )
# Add schema to Agent
-_agent.registerObjectClass(_schema)
+_agent.register_object_class(_schema)
# instantiate managed data objects matching the schema
-_obj = QmfAgentData( _agent, _schema )
-_obj.setProperty("index1", 100)
-_obj.setProperty("index2", "a name" )
-_obj.setProperty("set_string", "UNSET")
-_obj.setProperty("set_int", 0)
-_obj.setProperty("query_count", 0)
-_obj.setProperty("method_call_count", 0)
-_agent.addObject( _obj )
-
-_agent.addObject( QmfAgentData( _agent, _schema,
- _props={"index1":99,
- "index2": "another name",
- "set_string": "UNSET",
- "set_int": 0,
- "query_count": 0,
- "method_call_count": 0} ))
+_obj = QmfAgentData( _agent, _schema=_schema )
+_obj.set_value("index1", 100)
+_obj.set_value("index2", "a name" )
+_obj.set_value("set_string", "UNSET")
+_obj.set_value("set_int", 0)
+_obj.set_value("query_count", 0)
+_obj.set_value("method_call_count", 0)
+_agent.add_object( _obj )
+
+_agent.add_object( QmfAgentData( _agent, _schema=_schema,
+ _values={"index1":99,
+ "index2": "another name",
+ "set_string": "UNSET",
+ "set_int": 0,
+ "query_count": 0,
+ "method_call_count": 0} ))
## Now connect to the broker
@@ -100,18 +94,18 @@
while _wi:
print("work item %d:%s" % (_wi.getType(), str(_wi.getParams())))
_agent.releaseWorkItem(_wi)
- _wi = _agent.getNextWorkitem(timeout=0)
+ _wi = _agent.getNextWorkItem(timeout=0)
except:
- logging.info( "shutting down..." )
+ print( "shutting down..." )
_done = True
-logging.info( "Removing connection... TBD!!!" )
+print( "Removing connection... TBD!!!" )
#_myConsole.remove_connection( _c, 10 )
-logging.info( "Destroying agent... TBD!!!" )
+print( "Destroying agent... TBD!!!" )
#_myConsole.destroy( 10 )
-logging.info( "******** agent test done ********" )
+print( "******** agent test done ********" )
Modified: qpid/branches/qmfv2/qpid/python/qmf/test/console_test.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmfv2/qpid/python/qmf/test/console_test.py?rev=898057&r1=898056&r2=898057&view=diff
==============================================================================
--- qpid/branches/qmfv2/qpid/python/qmf/test/console_test.py (original)
+++ qpid/branches/qmfv2/qpid/python/qmf/test/console_test.py Mon Jan 11 20:46:07 2010
@@ -4,7 +4,7 @@
from qpid.messaging import *
-from qmfCommon import (Notifier, QmfQuery)
+from qmfCommon import (Notifier, QmfQuery, MsgKey, SchemaClassId, SchemaClass)
from qmfConsole import Console
@@ -16,18 +16,18 @@
self._sema4.release()
def waitForWork(self):
- logging.error("Waiting for event...")
+ print("Waiting for event...")
self._sema4.acquire()
- logging.error("...event present")
+ print("...event present")
logging.getLogger().setLevel(logging.INFO)
-logging.info( "Starting Connection" )
+print( "Starting Connection" )
_c = Connection("localhost")
_c.connect()
-logging.info( "Starting Console" )
+print( "Starting Console" )
_notifier = ExampleNotifier()
_myConsole = Console(notifier=_notifier)
@@ -40,30 +40,65 @@
_query = {QmfQuery._TARGET:
{QmfQuery._TARGET_AGENT:None},
QmfQuery._PREDICATE:
- {QmfQuery._LOGIC_AND:
- [{QmfQuery._CMP_EQ: ["vendor", "redhat.com"]},
- {QmfQuery._CMP_EQ: ["product", "qmf"]}]}}
+ {QmfQuery._CMP_EQ: ["_name", "qmf.testAgent"]}}
_query = QmfQuery(_query)
_myConsole.enableAgentDiscovery(_query)
_done = False
while not _done:
- try:
- _notifier.waitForWork()
+# try:
+ _notifier.waitForWork()
- _wi = _myConsole.get_next_workitem(timeout=0)
- while _wi:
- print("!!! work item received %d:%s" % (_wi.getType(), str(_wi.getParams())))
- _wi = _myConsole.get_next_workitem(timeout=0)
- except:
- logging.info( "shutting down..." )
- _done = True
+ _wi = _myConsole.getNextWorkItem(timeout=0)
+ while _wi:
+ print("!!! work item received %d:%s" % (_wi.getType(),
+ str(_wi.getParams())))
-logging.info( "Removing connection" )
+
+ if _wi.getType() == _wi.AGENT_ADDED:
+ _agent = _wi.getParams().get("agent")
+ if not _agent:
+ print("!!!! AGENT IN REPLY IS NULL !!! ")
+
+ _query = QmfQuery( {QmfQuery._TARGET:
+ {QmfQuery._TARGET_PACKAGES:None}} )
+
+ _reply = _myConsole.doQuery(_agent, _query)
+
+ package_list = _reply.get(MsgKey.package_info)
+ for pname in package_list:
+ print("!!! Querying for schema from package: %s" % pname)
+ _query = QmfQuery({QmfQuery._TARGET:
+ {QmfQuery._TARGET_SCHEMA_ID:None},
+ QmfQuery._PREDICATE:
+ {QmfQuery._CMP_EQ:
+ [SchemaClassId.KEY_PACKAGE, pname]}})
+
+ _reply = _myConsole.doQuery(_agent, _query)
+
+ schema_id_list = _reply.get(MsgKey.schema_id)
+ for sid_map in schema_id_list:
+ _query = QmfQuery({QmfQuery._TARGET:
+ {QmfQuery._TARGET_SCHEMA:None},
+ QmfQuery._PREDICATE:
+ {QmfQuery._CMP_EQ:
+ [SchemaClass.KEY_SCHEMA_ID, sid_map]}})
+
+ _reply = _myConsole.doQuery(_agent, _query)
+
+
+
+ _myConsole.releaseWorkItem(_wi)
+ _wi = _myConsole.getNextWorkItem(timeout=0)
+# except:
+# logging.info( "shutting down..." )
+# _done = True
+
+print( "Removing connection" )
_myConsole.removeConnection( _c, 10 )
-logging.info( "Destroying console:" )
+print( "Destroying console:" )
_myConsole.destroy( 10 )
-logging.info( "******** console test done ********" )
+print( "******** console test done ********" )
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org