You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/01/21 22:58:00 UTC

svn commit: r901879 - in /qpid/branches/qmfv2/qpid/python/qmf/test: __init__.py agent_discovery.py basic_method.py basic_query.py obj_gets.py

Author: kgiusti
Date: Thu Jan 21 21:57:59 2010
New Revision: 901879

URL: http://svn.apache.org/viewvc?rev=901879&view=rev
Log:
QPID-2261: check in the qmf python tests.

Added:
    qpid/branches/qmfv2/qpid/python/qmf/test/__init__.py
    qpid/branches/qmfv2/qpid/python/qmf/test/agent_discovery.py
    qpid/branches/qmfv2/qpid/python/qmf/test/basic_method.py
    qpid/branches/qmfv2/qpid/python/qmf/test/basic_query.py
    qpid/branches/qmfv2/qpid/python/qmf/test/obj_gets.py

Added: qpid/branches/qmfv2/qpid/python/qmf/test/__init__.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmfv2/qpid/python/qmf/test/__init__.py?rev=901879&view=auto
==============================================================================
--- qpid/branches/qmfv2/qpid/python/qmf/test/__init__.py (added)
+++ qpid/branches/qmfv2/qpid/python/qmf/test/__init__.py Thu Jan 21 21:57:59 2010
@@ -0,0 +1,22 @@
+# Do not delete - marks this directory as a python package.
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# NOTE: "events" is intentionally not imported here: this commit adds no
+# events.py to the package, so importing it would raise ImportError and
+# make the whole test package unloadable.  Re-add it to this list together
+# with the module itself.
+import agent_discovery, basic_query, basic_method, obj_gets

Added: qpid/branches/qmfv2/qpid/python/qmf/test/agent_discovery.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmfv2/qpid/python/qmf/test/agent_discovery.py?rev=901879&view=auto
==============================================================================
--- qpid/branches/qmfv2/qpid/python/qmf/test/agent_discovery.py (added)
+++ qpid/branches/qmfv2/qpid/python/qmf/test/agent_discovery.py Thu Jan 21 21:57:59 2010
@@ -0,0 +1,320 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import unittest
+import logging
+from threading import Thread, Event
+
+import qpid.messaging
+import qmf.qmfCommon
+import qmf.qmfConsole
+import qmf.qmfAgent
+
+
+class _testNotifier(qmf.qmfCommon.Notifier):
+    """Notifier implemented on top of a threading.Event.
+
+    indication() sets the event; wait_for_work() blocks on it and consumes
+    the signal.
+    """
+    def __init__(self):
+        self._event = Event()
+
+    def indication(self):
+        """Flag that work is pending.
+
+        note: called by qmf daemon thread
+        """
+        self._event.set()
+
+    def wait_for_work(self, timeout):
+        """Wait up to `timeout` seconds (None blocks indefinitely) for an
+        indication.
+
+        note: called by application thread to wait for qmf to generate work
+
+        Returns True, after clearing the event, when an indication was
+        seen; False when the wait ended with the event still unset.
+        """
+        self._event.wait(timeout)
+        if not self._event.isSet():
+            # wait() returned without an indication having been raised
+            return False
+        self._event.clear()
+        return True
+
+
+class _agentApp(Thread):
+    """Minimal agent application used by the discovery tests.
+
+    Owns a qmf Agent plus the notifier it signals, and drains the agent's
+    work queue on its own thread.  The thread is started by the
+    constructor and runs until stop() is called.  These tests expect the
+    agent to receive no work items, so any that arrive are logged as
+    errors and released.
+    """
+    def __init__(self, name, heartbeat):
+        """Create the agent called `name`, passing `heartbeat` as its
+        heartbeat interval, and start the worker thread."""
+        Thread.__init__(self)
+        # Defined up front so that disconnect_agent() can test it even if
+        # connect_agent() was never called (previously an AttributeError).
+        self.conn = None
+        self.notifier = _testNotifier()
+        self.agent = qmf.qmfAgent.Agent(name,
+                           _notifier=self.notifier,
+                           _heartbeat_interval=heartbeat)
+        # No database needed for this test
+        self.running = True
+        self.start()
+
+    def connect_agent(self, broker_url):
+        """Open a connection to the broker and hand it to the agent.
+
+        `broker_url` must expose host, port, user and password attributes.
+        """
+        # broker_url = "user/passwd@hostname:port"
+        self.conn = qpid.messaging.Connection(broker_url.host,
+                                         broker_url.port,
+                                         broker_url.user,
+                                         broker_url.password)
+        self.conn.connect()
+        self.agent.set_connection(self.conn)
+
+    def disconnect_agent(self, timeout):
+        """Detach the agent from its connection, if one was established."""
+        if self.conn:
+            self.agent.remove_connection(timeout)
+
+    def shutdown_agent(self, timeout):
+        """Destroy the underlying qmf agent."""
+        self.agent.destroy(timeout)
+
+    def stop(self):
+        """Ask the worker thread to exit and wait up to 10 seconds for it.
+
+        A thread that fails to exit is reported through the logging module
+        rather than by raising.
+        """
+        self.running = False
+        # Wake run() out of wait_for_work() so it re-checks self.running.
+        self.notifier.indication() # hmmm... collide with daemon???
+        self.join(10)
+        if self.isAlive():
+            logging.error("AGENT DID NOT TERMINATE AS EXPECTED!!!")
+
+    def run(self):
+        """Thread body: block until the notifier fires, then drain and
+        release every queued work item."""
+        while self.running:
+            self.notifier.wait_for_work(None)
+            wi = self.agent.get_next_workitem(timeout=0)
+            while wi is not None:
+                logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type())
+                self.agent.release_workitem(wi)
+                wi = self.agent.get_next_workitem(timeout=0)
+
+
+
+class BaseTest(unittest.TestCase):
+    """Agent discovery tests.
+
+    Every test runs against two agent applications, "agent1" and "agent2",
+    which setUp() creates and connects to the broker recorded by
+    configure().  Test methods deliberately carry no docstrings: unittest
+    would print them in place of the test name.
+    """
+    def configure(self, config):
+        # Presumably invoked by the test runner before the tests execute
+        # (the caller is not part of this file).  `config` must provide
+        # `broker` and `defines` attributes.
+        self.config = config
+        self.broker = config.broker
+        self.defines = self.config.defines
+
+    def setUp(self):
+        # one second agent indication interval
+        self.agent1 = _agentApp("agent1", 1)
+        self.agent1.connect_agent(self.broker)
+        self.agent2 = _agentApp("agent2", 1)
+        self.agent2.connect_agent(self.broker)
+
+    def tearDown(self):
+        # Each agent application owns a thread that blocks in
+        # wait_for_work(None) until stop() is called, so both agents must
+        # be stopped even when shutting one of them down raises.
+        try:
+            self._stop_agent_app(self.agent1)
+        finally:
+            self.agent1 = None
+            try:
+                self._stop_agent_app(self.agent2)
+            finally:
+                self.agent2 = None
+
+    def _stop_agent_app(self, app):
+        # Shut down one agent application; `app` may be None when the test
+        # itself already stopped it (see test_heartbeat).  stop() runs even
+        # if shutdown_agent() raises so the worker thread is always joined.
+        if app:
+            try:
+                app.shutdown_agent(10)
+            finally:
+                app.stop()
+
+    def test_discover_all(self):
+        # create console
+        # enable agent discovery
+        # wait
+        # expect agent add for agent1 and agent2
+        self.notifier = _testNotifier()
+        self.console = qmf.qmfConsole.Console(notifier=self.notifier,
+                                              agent_timeout=3)
+        self.conn = qpid.messaging.Connection(self.broker.host,
+                                              self.broker.port,
+                                              self.broker.user,
+                                              self.broker.password)
+        self.conn.connect()
+        self.console.addConnection(self.conn)
+        self.console.enable_agent_discovery()
+
+        agent1_found = agent2_found = False
+        wi = self.console.get_next_workitem(timeout=3)
+        while wi and not (agent1_found and agent2_found):
+            if wi.get_type() == wi.AGENT_ADDED:
+                agent = wi.get_params().get("agent")
+                if not agent or not isinstance(agent, qmf.qmfConsole.Agent):
+                    self.fail("Unexpected workitem from agent")
+                else:
+                    if agent.get_name() == "agent1":
+                        agent1_found = True
+                    elif agent.get_name() == "agent2":
+                        agent2_found = True
+                    else:
+                        self.fail("Unexpected agent name received: %s" %
+                                  agent.get_name())
+                    # Leave immediately instead of blocking in another
+                    # get_next_workitem() call before the loop test runs.
+                    if agent1_found and agent2_found:
+                        break
+
+            wi = self.console.get_next_workitem(timeout=3)
+
+        self.assertTrue(agent1_found and agent2_found, "All agents not discovered")
+
+        self.console.destroy(10)
+
+
+    def test_discover_one(self):
+        # create console
+        # enable agent discovery, filter for agent1 only
+        # wait until timeout
+        # expect agent add for agent1 only
+        self.notifier = _testNotifier()
+        self.console = qmf.qmfConsole.Console(notifier=self.notifier,
+                                              agent_timeout=3)
+        self.conn = qpid.messaging.Connection(self.broker.host,
+                                              self.broker.port,
+                                              self.broker.user,
+                                              self.broker.password)
+        self.conn.connect()
+        self.console.addConnection(self.conn)
+
+        query = qmf.qmfCommon.QmfQuery.create_predicate(
+                           qmf.qmfCommon.QmfQuery.TARGET_AGENT,
+                           qmf.qmfCommon.QmfQueryPredicate({qmf.qmfCommon.QmfQuery.CMP_EQ:
+                                 [qmf.qmfCommon.QmfQuery.KEY_AGENT_NAME, "agent1"]}))
+        self.console.enable_agent_discovery(query)
+
+        agent1_found = agent2_found = False
+        wi = self.console.get_next_workitem(timeout=3)
+        # Unlike test_discover_all, keep reading until the queue stays
+        # empty so that an unwanted agent2 announcement would be noticed.
+        while wi:
+            if wi.get_type() == wi.AGENT_ADDED:
+                agent = wi.get_params().get("agent")
+                if not agent or not isinstance(agent, qmf.qmfConsole.Agent):
+                    self.fail("Unexpected workitem from agent")
+                else:
+                    if agent.get_name() == "agent1":
+                        agent1_found = True
+                    elif agent.get_name() == "agent2":
+                        agent2_found = True
+                    else:
+                        self.fail("Unexpected agent name received: %s" %
+                                  agent.get_name())
+
+            wi = self.console.get_next_workitem(timeout=2)
+
+        self.assertTrue(agent1_found and not agent2_found, "Unexpected agent discovered")
+
+        self.console.destroy(10)
+
+
+    def test_heartbeat(self):
+        # create console with 2 sec agent timeout
+        # enable agent discovery, find all agents
+        # stop agent1, expect timeout notification
+        # stop agent2, expect timeout notification
+        self.notifier = _testNotifier()
+        self.console = qmf.qmfConsole.Console(notifier=self.notifier,
+                                              agent_timeout=2)
+        self.conn = qpid.messaging.Connection(self.broker.host,
+                                              self.broker.port,
+                                              self.broker.user,
+                                              self.broker.password)
+        self.conn.connect()
+        self.console.addConnection(self.conn)
+        self.console.enable_agent_discovery()
+
+        agent1_found = agent2_found = False
+        wi = self.console.get_next_workitem(timeout=4)
+        while wi and not (agent1_found and agent2_found):
+            if wi.get_type() == wi.AGENT_ADDED:
+                agent = wi.get_params().get("agent")
+                if not agent or not isinstance(agent, qmf.qmfConsole.Agent):
+                    self.fail("Unexpected workitem from agent")
+                else:
+                    if agent.get_name() == "agent1":
+                        agent1_found = True
+                    elif agent.get_name() == "agent2":
+                        agent2_found = True
+                    else:
+                        self.fail("Unexpected agent name received: %s" %
+                                  agent.get_name())
+                    if agent1_found and agent2_found:
+                        break
+
+            wi = self.console.get_next_workitem(timeout=4)
+
+        self.assertTrue(agent1_found and agent2_found, "All agents not discovered")
+
+        # now kill agent1 and wait for expiration
+
+        # Clear the attribute first so tearDown() does not shut the same
+        # agent down a second time.
+        agent1 = self.agent1
+        self.agent1 = None
+        agent1.shutdown_agent(10)
+        agent1.stop()
+
+        wi = self.console.get_next_workitem(timeout=4)
+        while wi is not None:
+            if wi.get_type() == wi.AGENT_DELETED:
+                agent = wi.get_params().get("agent")
+                if not agent or not isinstance(agent, qmf.qmfConsole.Agent):
+                    self.fail("Unexpected workitem from agent")
+                else:
+                    if agent.get_name() == "agent1":
+                        agent1_found = False
+                    else:
+                        self.fail("Unexpected agent_deleted received: %s" %
+                                  agent.get_name())
+                    if not agent1_found:
+                        break
+
+            wi = self.console.get_next_workitem(timeout=4)
+
+        self.assertFalse(agent1_found, "agent1 did not delete!")
+
+        # now kill agent2 and wait for expiration
+
+        agent2 = self.agent2
+        self.agent2 = None
+        agent2.shutdown_agent(10)
+        agent2.stop()
+
+        wi = self.console.get_next_workitem(timeout=4)
+        while wi is not None:
+            if wi.get_type() == wi.AGENT_DELETED:
+                agent = wi.get_params().get("agent")
+                if not agent or not isinstance(agent, qmf.qmfConsole.Agent):
+                    self.fail("Unexpected workitem from agent")
+                else:
+                    if agent.get_name() == "agent2":
+                        agent2_found = False
+                    else:
+                        self.fail("Unexpected agent_deleted received: %s" %
+                                  agent.get_name())
+                    if not agent2_found:
+                        break
+
+            wi = self.console.get_next_workitem(timeout=4)
+
+        self.assertFalse(agent2_found, "agent2 did not delete!")
+
+        self.console.destroy(10)
+
+
+    def test_find_agent(self):
+        # create console
+        # do not enable agent discovery
+        # find agent1, expect success
+        # find agent-none, expect failure
+        # find agent2, expect success
+        self.notifier = _testNotifier()
+        self.console = qmf.qmfConsole.Console(notifier=self.notifier)
+        self.conn = qpid.messaging.Connection(self.broker.host,
+                                              self.broker.port,
+                                              self.broker.user,
+                                              self.broker.password)
+        self.conn.connect()
+        self.console.addConnection(self.conn)
+
+        agent1 = self.console.find_agent("agent1", timeout=3)
+        self.assertTrue(agent1 and agent1.get_name() == "agent1")
+
+        no_agent = self.console.find_agent("agent-none", timeout=3)
+        # Identity test: "== None" could be answered by a custom __eq__.
+        self.assertTrue(no_agent is None)
+
+        agent2 = self.console.find_agent("agent2", timeout=3)
+        self.assertTrue(agent2 and agent2.get_name() == "agent2")
+
+        self.console.removeConnection(self.conn, 10)
+        self.console.destroy(10)
+
+

Added: qpid/branches/qmfv2/qpid/python/qmf/test/basic_method.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmfv2/qpid/python/qmf/test/basic_method.py?rev=901879&view=auto
==============================================================================
--- qpid/branches/qmfv2/qpid/python/qmf/test/basic_method.py (added)
+++ qpid/branches/qmfv2/qpid/python/qmf/test/basic_method.py Thu Jan 21 21:57:59 2010
@@ -0,0 +1,348 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import unittest
+import logging
+from threading import Thread, Event
+
+import qpid.messaging
+from qmf.qmfCommon import (Notifier, SchemaObjectClass, SchemaClassId,
+                           SchemaProperty, qmfTypes, SchemaMethod, QmfQuery,
+                           QmfData, QmfQueryPredicate, WorkItem) 
+import qmf.qmfConsole
+from qmf.qmfAgent import(QmfAgentData, Agent, MethodCallParams)
+
+
+class _testNotifier(Notifier):
+    """Notifier implemented on top of a threading.Event.
+
+    indication() sets the event; wait_for_work() blocks on it and consumes
+    the signal.
+    """
+    def __init__(self):
+        self._event = Event()
+
+    def indication(self):
+        """Flag that work is pending.
+
+        note: called by qmf daemon thread
+        """
+        self._event.set()
+
+    def wait_for_work(self, timeout):
+        """Wait up to `timeout` seconds (None blocks indefinitely) for an
+        indication.
+
+        note: called by application thread to wait for qmf to generate work
+
+        Returns True, after clearing the event, when an indication was
+        seen; False when the wait ended with the event still unset.
+        """
+        self._event.wait(timeout)
+        if not self._event.isSet():
+            # wait() returned without an indication having been raised
+            return False
+        self._event.clear()
+        return True
+
+
+class _agentApp(Thread):
+    """Agent application used by the method-call tests.
+
+    The constructor builds a qmf Agent, registers the schema
+    MyPackage/MyClass (with method "set_meth"), adds two objects that
+    follow that schema plus one schema-less object with id "01545", and
+    starts a worker thread.  The thread answers METHOD_CALL work items
+    until stop() is called.
+    """
+    def __init__(self, name, heartbeat):
+        """Create the agent called `name`, passing `heartbeat` as its
+        heartbeat interval, populate it and start the worker thread."""
+        Thread.__init__(self)
+        # Defined up front so that disconnect_agent() can test it even if
+        # connect_agent() was never called (previously an AttributeError).
+        self.conn = None
+        self.notifier = _testNotifier()
+        self.agent = Agent(name,
+                           _notifier=self.notifier,
+                           _heartbeat_interval=heartbeat)
+
+        # Dynamically construct a management database
+
+        _schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage", "MyClass"),
+                                     _desc="A test data schema",
+                                     _object_id_names=["index1", "index2"] )
+        # add properties
+        _schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8))
+        _schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR))
+
+        # these two properties are statistics
+        _schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32))
+        _schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+        # These two properties can be set via the method call
+        _schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR))
+        _schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+        # add method
+        _meth = SchemaMethod( _desc="Method to set string and int in object." )
+        _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) )
+        _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) )
+        _schema.add_method( "set_meth", _meth )
+
+        # Add schema to Agent
+
+        self.agent.register_object_class(_schema)
+
+        # instantiate managed data objects matching the schema
+
+        _obj1 = QmfAgentData( self.agent, _schema=_schema )
+        _obj1.set_value("index1", 100)
+        _obj1.set_value("index2", "a name" )
+        _obj1.set_value("set_string", "UNSET")
+        _obj1.set_value("set_int", 0)
+        _obj1.set_value("query_count", 0)
+        _obj1.set_value("method_call_count", 0)
+        self.agent.add_object( _obj1 )
+
+        self.agent.add_object( QmfAgentData( self.agent, _schema=_schema,
+                                             _values={"index1":99, 
+                                                      "index2": "another name",
+                                                      "set_string": "UNSET",
+                                                      "set_int": 0,
+                                                      "query_count": 0,
+                                                      "method_call_count": 0} ))
+
+        # add an "unstructured" object to the Agent
+        _obj2 = QmfAgentData(self.agent, _object_id="01545")
+        _obj2.set_value("field1", "a value")
+        _obj2.set_value("field2", 2)
+        _obj2.set_value("field3", {"a":1, "map":2, "value":3})
+        _obj2.set_value("field4", ["a", "list", "value"])
+        self.agent.add_object(_obj2)
+
+        self.running = True
+        self.start()
+
+    def connect_agent(self, broker_url):
+        """Open a connection to the broker and hand it to the agent.
+
+        `broker_url` must expose host, port, user and password attributes.
+        """
+        # broker_url = "user/passwd@hostname:port"
+        self.conn = qpid.messaging.Connection(broker_url.host,
+                                         broker_url.port,
+                                         broker_url.user,
+                                         broker_url.password)
+        self.conn.connect()
+        self.agent.set_connection(self.conn)
+
+    def disconnect_agent(self, timeout):
+        """Detach the agent from its connection, if one was established."""
+        if self.conn:
+            self.agent.remove_connection(timeout)
+
+    def shutdown_agent(self, timeout):
+        """Destroy the underlying qmf agent."""
+        self.agent.destroy(timeout)
+
+    def stop(self):
+        """Ask the worker thread to exit and wait up to 10 seconds for it.
+
+        Raises Exception if the thread is still alive after the wait.
+        """
+        self.running = False
+        # Wake run() out of wait_for_work() so it re-checks self.running.
+        self.notifier.indication() # hmmm... collide with daemon???
+        self.join(10)
+        if self.isAlive():
+            raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!")
+
+    def run(self):
+        """Agent application main processing loop.
+
+        Blocks until the notifier fires, then drains the agent's work
+        queue.  METHOD_CALL items are answered; every item, whatever its
+        type, is released afterwards.
+        """
+        while self.running:
+            self.notifier.wait_for_work(None)
+            wi = self.agent.get_next_workitem(timeout=0)
+            while wi is not None:
+                if wi.get_type() == WorkItem.METHOD_CALL:
+                    self._handle_method_call(wi)
+
+                self.agent.release_workitem(wi)
+                wi = self.agent.get_next_workitem(timeout=0)
+
+    def _handle_method_call(self, wi):
+        """Answer one METHOD_CALL work item.
+
+        Error codes sent back: -1 unknown method, -2 "set_meth" on an
+        unknown object, -3 "a_method" on an unknown object, -4 "a_method"
+        on an object other than "01545", -5 "a_method" with unexpected
+        arguments.  Raises Exception when the work item's parameters are
+        not a MethodCallParams instance.
+        """
+        mc = wi.get_params()
+        if not isinstance(mc, MethodCallParams):
+            raise Exception("Unexpected method call parameters")
+
+        if mc.get_name() == "set_meth":
+            obj = self.agent.get_object(mc.get_object_id())
+            if obj is None:
+                self._send_error(wi, -2, "Bad Object Id.")
+                return
+            obj.inc_value("method_call_count")
+            # Both arguments are optional; only those supplied are applied.
+            args = mc.get_args()
+            if "arg_int" in args:
+                obj.set_value("set_int", args["arg_int"])
+            if "arg_str" in args:
+                obj.set_value("set_string", args["arg_str"])
+            self.agent.method_response(wi.get_handle(),
+                                       {"code" : 0})
+        elif mc.get_name() == "a_method":
+            obj = self.agent.get_object(mc.get_object_id())
+            if obj is None:
+                self._send_error(wi, -3, "Unknown object id.")
+            elif obj.get_object_id() != "01545":
+                self._send_error(wi, -4, "Unexpected id.")
+            else:
+                # Succeeds only for the exact argument values sent by
+                # test_managed_obj.
+                args = mc.get_args()
+                if ("arg1" in args and args["arg1"] == 1 and
+                    "arg2" in args and args["arg2"] == "Now set!"
+                    and "arg3" in args and args["arg3"] == 1966):
+                    self.agent.method_response(wi.get_handle(),
+                                               {"code" : 0})
+                else:
+                    self._send_error(wi, -5, "Bad Args.")
+        else:
+            self._send_error(wi, -1, "Unknown method call.")
+
+    def _send_error(self, wi, code, description):
+        """Reply to work item `wi` with an error carrying `code` and
+        `description`."""
+        error_info = QmfData.create({"code": code,
+                                     "description": description})
+        self.agent.method_response(wi.get_handle(), _error=error_info)
+
+
+
+class BaseTest(unittest.TestCase):
+    """Method invocation tests.
+
+    Every test runs against two agent applications, "agent1" and "agent2",
+    which setUp() creates and connects to the broker recorded by
+    configure().  Test methods deliberately carry no docstrings: unittest
+    would print them in place of the test name.
+    """
+    def configure(self, config):
+        # Presumably invoked by the test runner before the tests execute
+        # (the caller is not part of this file).  `config` must provide
+        # `broker` and `defines` attributes.
+        self.config = config
+        self.broker = config.broker
+        self.defines = self.config.defines
+
+    def setUp(self):
+        # one second agent indication interval
+        self.agent1 = _agentApp("agent1", 1)
+        self.agent1.connect_agent(self.broker)
+        self.agent2 = _agentApp("agent2", 1)
+        self.agent2.connect_agent(self.broker)
+
+    def tearDown(self):
+        # Each agent application owns a thread that blocks in
+        # wait_for_work(None) until stop() is called, so both agents must
+        # be stopped even when shutting one of them down raises.
+        try:
+            self._stop_agent_app(self.agent1)
+        finally:
+            self.agent1 = None
+            try:
+                self._stop_agent_app(self.agent2)
+            finally:
+                self.agent2 = None
+
+    def _stop_agent_app(self, app):
+        # Shut down one agent application (None is tolerated).  stop()
+        # runs even if shutdown_agent() raises so the worker thread is
+        # always joined.
+        if app:
+            try:
+                app.shutdown_agent(10)
+            finally:
+                app.stop()
+
+    def test_described_obj(self):
+        # create console
+        # find agents
+        # synchronous query for all objects in schema
+        # method call on each object
+        self.notifier = _testNotifier()
+        self.console = qmf.qmfConsole.Console(notifier=self.notifier,
+                                              agent_timeout=3)
+        self.conn = qpid.messaging.Connection(self.broker.host,
+                                              self.broker.port,
+                                              self.broker.user,
+                                              self.broker.password)
+        self.conn.connect()
+        self.console.addConnection(self.conn)
+
+        for aname in ["agent1", "agent2"]:
+            agent = self.console.find_agent(aname, timeout=3)
+            self.assertTrue(agent and agent.get_name() == aname)
+
+            query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT,
+                                              QmfQueryPredicate(
+                    {QmfQuery.LOGIC_AND:
+                     [{QmfQuery.CMP_EXISTS: [SchemaClassId.KEY_PACKAGE]},
+                      {QmfQuery.CMP_EQ: [SchemaClassId.KEY_PACKAGE,
+                                         "MyPackage"]}]}))
+
+            obj_list = self.console.doQuery(agent, query)
+            self.assertEqual(len(obj_list), 2)
+            for obj in obj_list:
+                mr = obj.invoke_method( "set_meth", {"arg_int": -99,
+                                                     "arg_str": "Now set!"},
+                                        _timeout=3)
+                self.assertTrue(isinstance(mr, qmf.qmfConsole.MethodResult))
+                self.assertTrue(mr.succeeded())
+                self.assertEqual(mr.get_argument("code"), 0)
+
+                # The console-side copy still holds the values from the
+                # query until it is refreshed.
+                self.assertEqual(obj.get_value("method_call_count"), 0)
+                self.assertEqual(obj.get_value("set_string"), "UNSET")
+                self.assertEqual(obj.get_value("set_int"), 0)
+
+                obj.refresh()
+
+                self.assertEqual(obj.get_value("method_call_count"), 1)
+                self.assertEqual(obj.get_value("set_string"), "Now set!")
+                self.assertEqual(obj.get_value("set_int"), -99)
+
+        self.console.destroy(10)
+
+
+    def test_bad_method(self):
+        # create console
+        # find agents
+        # synchronous query for all objects in schema
+        # invalid method call on each object
+        #  - should throw a ValueError
+        self.notifier = _testNotifier()
+        self.console = qmf.qmfConsole.Console(notifier=self.notifier,
+                                              agent_timeout=3)
+        self.conn = qpid.messaging.Connection(self.broker.host,
+                                              self.broker.port,
+                                              self.broker.user,
+                                              self.broker.password)
+        self.conn.connect()
+        self.console.addConnection(self.conn)
+
+        for aname in ["agent1", "agent2"]:
+            agent = self.console.find_agent(aname, timeout=3)
+            self.assertTrue(agent and agent.get_name() == aname)
+
+            query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT,
+                                              QmfQueryPredicate(
+                    {QmfQuery.LOGIC_AND:
+                     [{QmfQuery.CMP_EXISTS: [SchemaClassId.KEY_PACKAGE]},
+                      {QmfQuery.CMP_EQ: [SchemaClassId.KEY_PACKAGE,
+                                         "MyPackage"]}]}))
+
+            obj_list = self.console.doQuery(agent, query)
+            self.assertEqual(len(obj_list), 2)
+            for obj in obj_list:
+                # assertRaises is the non-deprecated spelling of
+                # failUnlessRaises.
+                self.assertRaises(ValueError,
+                                  obj.invoke_method,
+                                  "unknown_meth", 
+                                  {"arg1": -99, "arg2": "Now set!"},
+                                  _timeout=3)
+        self.console.destroy(10)
+
+
+    def test_managed_obj(self):
+        # create console
+        # find agents
+        # synchronous query for a managed object
+        # method call on each object
+        self.notifier = _testNotifier()
+        self.console = qmf.qmfConsole.Console(notifier=self.notifier,
+                                              agent_timeout=3)
+        self.conn = qpid.messaging.Connection(self.broker.host,
+                                              self.broker.port,
+                                              self.broker.user,
+                                              self.broker.password)
+        self.conn.connect()
+        self.console.addConnection(self.conn)
+
+        for aname in ["agent1", "agent2"]:
+            agent = self.console.find_agent(aname, timeout=3)
+            self.assertTrue(agent and agent.get_name() == aname)
+
+            query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, "01545")
+            obj_list = self.console.doQuery(agent, query)
+
+            self.assertTrue(isinstance(obj_list, list))
+            self.assertEqual(len(obj_list), 1)
+            obj = obj_list[0]
+
+            mr = obj.invoke_method("a_method",
+                                   {"arg1": 1,
+                                    "arg2": "Now set!",
+                                    "arg3": 1966},
+                                   _timeout=3)
+            self.assertTrue(isinstance(mr, qmf.qmfConsole.MethodResult))
+            self.assertTrue(mr.succeeded())
+            self.assertEqual(mr.get_argument("code"), 0)
+            # @todo refresh and verify changes
+
+        self.console.destroy(10)

Added: qpid/branches/qmfv2/qpid/python/qmf/test/basic_query.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmfv2/qpid/python/qmf/test/basic_query.py?rev=901879&view=auto
==============================================================================
--- qpid/branches/qmfv2/qpid/python/qmf/test/basic_query.py (added)
+++ qpid/branches/qmfv2/qpid/python/qmf/test/basic_query.py Thu Jan 21 21:57:59 2010
@@ -0,0 +1,336 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import unittest
+import logging
+from threading import Thread, Event
+
+import qpid.messaging
+from qmf.qmfCommon import (Notifier, SchemaObjectClass, SchemaClassId,
+                           SchemaProperty, qmfTypes, SchemaMethod, QmfQuery,
+                           QmfData, QmfQueryPredicate) 
+import qmf.qmfConsole
+from qmf.qmfAgent import(QmfAgentData, Agent)
+
+
+class _testNotifier(Notifier):
+    """Event-backed Notifier: qmf signals pending work, the app waits on it."""
+    def __init__(self):
+        self._event = Event()
+
+    def indication(self):
+        # runs on the qmf daemon thread: flag that work is pending
+        self._event.set()
+
+    def wait_for_work(self, timeout):
+        # runs on the application thread: block until qmf flags work or
+        # the timeout lapses.  Returns True when work was flagged (the
+        # flag is reset for the next wait), False when the wait timed out.
+        self._event.wait(timeout)
+        signalled = self._event.isSet()
+        if signalled:
+            self._event.clear()
+        return signalled
+
+
+class _agentApp(Thread):
+    """A QMF test agent together with the thread that drains its work queue.
+
+    Construction registers one schema (MyPackage/MyClass), adds two objects
+    described by that schema plus one undescribed object ("01545"), and
+    starts the thread.  The agent is not attached to a broker until
+    connect_agent() is called.
+    """
+    def __init__(self, name, heartbeat):
+        Thread.__init__(self)
+        # Initialise before anything can call disconnect_agent(): without
+        # this its "if self.conn" guard raises AttributeError when no
+        # connection was ever made.
+        self.conn = None
+        self.notifier = _testNotifier()
+        self.agent = Agent(name,
+                           _notifier=self.notifier,
+                           _heartbeat_interval=heartbeat)
+
+        # Dynamically construct a management database
+
+        _schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage", "MyClass"),
+                                     _desc="A test data schema",
+                                     _object_id_names=["index1", "index2"] )
+        # add properties
+        _schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8))
+        _schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR))
+
+        # these two properties are statistics
+        _schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32))
+        _schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+        # These two properties can be set via the method call
+        _schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR))
+        _schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+        # add method
+        _meth = SchemaMethod( _desc="Method to set string and int in object." )
+        _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) )
+        _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) )
+        _schema.add_method( "set_meth", _meth )
+
+        # Add schema to Agent
+
+        self.agent.register_object_class(_schema)
+
+        # instantiate managed data objects matching the schema
+        # (the tests look these up as '100a name' and '99another name')
+
+        _obj1 = QmfAgentData( self.agent, _schema=_schema )
+        _obj1.set_value("index1", 100)
+        _obj1.set_value("index2", "a name" )
+        _obj1.set_value("set_string", "UNSET")
+        _obj1.set_value("set_int", 0)
+        _obj1.set_value("query_count", 0)
+        _obj1.set_value("method_call_count", 0)
+        self.agent.add_object( _obj1 )
+
+        self.agent.add_object( QmfAgentData( self.agent, _schema=_schema,
+                                             _values={"index1":99,
+                                                      "index2": "another name",
+                                                      "set_string": "UNSET",
+                                                      "set_int": 0,
+                                                      "query_count": 0,
+                                                      "method_call_count": 0} ))
+
+        # add an "unstructured" object to the Agent
+        _obj2 = QmfAgentData(self.agent, _object_id="01545")
+        _obj2.set_value("field1", "a value")
+        _obj2.set_value("field2", 2)
+        _obj2.set_value("field3", {"a":1, "map":2, "value":3})
+        _obj2.set_value("field4", ["a", "list", "value"])
+        self.agent.add_object(_obj2)
+
+        # the thread runs from construction onwards; see run()
+        self.running = True
+        self.start()
+
+    def connect_agent(self, broker_url):
+        # broker_url: object exposing host, port, user and password
+        # attributes; they are passed straight to the Connection
+        self.conn = qpid.messaging.Connection(broker_url.host,
+                                         broker_url.port,
+                                         broker_url.user,
+                                         broker_url.password)
+        self.conn.connect()
+        self.agent.set_connection(self.conn)
+
+    def disconnect_agent(self, timeout):
+        # no-op when connect_agent() was never called
+        if self.conn:
+            self.agent.remove_connection(timeout)
+
+    def shutdown_agent(self, timeout):
+        self.agent.destroy(timeout)
+
+    def stop(self):
+        # Ask run() to exit, wake it in case it is blocked waiting for
+        # work, then allow the thread ten seconds to finish.
+        self.running = False
+        self.notifier.indication() # hmmm... collide with daemon???
+        self.join(10)
+        if self.isAlive():
+            raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!")
+
+    def run(self):
+        # These tests expect the agent to receive no work items: anything
+        # that does arrive is logged as an error and released.
+        while self.running:
+            self.notifier.wait_for_work(None)
+            wi = self.agent.get_next_workitem(timeout=0)
+            while wi is not None:
+                logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type())
+                self.agent.release_workitem(wi)
+                wi = self.agent.get_next_workitem(timeout=0)
+
+
+
+class BaseTest(unittest.TestCase):
+    """Synchronous console queries against two locally-run test agents.
+
+    Each test creates its own console, locates "agent1" and "agent2", and
+    checks the result of one kind of query.  The console is destroyed in a
+    "finally" clause so that a failed assertion does not leave it running.
+    """
+    def configure(self, config):
+        # NOTE(review): presumably called by the test harness before
+        # setUp() to supply the broker settings - confirm against the runner
+        self.config = config
+        self.broker = config.broker
+        self.defines = self.config.defines
+
+    def setUp(self):
+        # one second agent indication interval
+        self.agent1 = _agentApp("agent1", 1)
+        self.agent1.connect_agent(self.broker)
+        self.agent2 = _agentApp("agent2", 1)
+        self.agent2.connect_agent(self.broker)
+
+    def tearDown(self):
+        # agent2 must still be stopped when stopping agent1 raises
+        try:
+            if self.agent1:
+                self.agent1.shutdown_agent(10)
+                self.agent1.stop()
+                self.agent1 = None
+        finally:
+            if self.agent2:
+                self.agent2.shutdown_agent(10)
+                self.agent2.stop()
+                self.agent2 = None
+
+    def _create_console(self):
+        # Build a console on its own broker connection.  The caller is
+        # responsible for calling self.console.destroy().
+        self.notifier = _testNotifier()
+        self.console = qmf.qmfConsole.Console(notifier=self.notifier,
+                                              agent_timeout=3)
+        self.conn = qpid.messaging.Connection(self.broker.host,
+                                              self.broker.port,
+                                              self.broker.user,
+                                              self.broker.password)
+        self.conn.connect()
+        self.console.addConnection(self.conn)
+
+    def _find_agent(self, aname):
+        # Look up the named agent from the console, failing the test when
+        # it is not found within three seconds.
+        agent = self.console.find_agent(aname, timeout=3)
+        self.assertTrue(agent and agent.get_name() == aname)
+        return agent
+
+    def test_all_oids(self):
+        # create console
+        # find agents
+        # synchronous query for all objects by id
+        # verify known object ids are returned
+        self._create_console()
+        try:
+            for aname in ["agent1", "agent2"]:
+                agent = self._find_agent(aname)
+
+                query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT_ID)
+                oid_list = self.console.doQuery(agent, query)
+
+                self.assertTrue(isinstance(oid_list, list),
+                                "Unexpected return type")
+                self.assertTrue(len(oid_list) == 3, "Wrong count")
+                self.assertTrue('100a name' in oid_list)
+                self.assertTrue('99another name' in oid_list)
+                self.assertTrue('01545' in oid_list)
+        finally:
+            self.console.destroy(10)
+
+    def test_direct_oids(self):
+        # create console
+        # find agents
+        # synchronous query for each object by its id
+        # verify objects and schemas are correct
+        self._create_console()
+        try:
+            for aname in ["agent1", "agent2"]:
+                agent = self._find_agent(aname)
+
+                for oid in ['100a name', '99another name', '01545']:
+                    query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, oid)
+                    obj_list = self.console.doQuery(agent, query)
+
+                    self.assertTrue(isinstance(obj_list, list),
+                                    "Unexpected return type")
+                    self.assertTrue(len(obj_list) == 1)
+                    obj = obj_list[0]
+                    self.assertTrue(isinstance(obj, QmfData))
+                    self.assertTrue(obj.get_object_id() == oid)
+
+                    # only the two schema-based objects are "described"
+                    if obj.is_described():
+                        self.assertTrue(oid in ['100a name', '99another name'])
+                        schema_id = obj.get_schema_class_id()
+                        self.assertTrue(isinstance(schema_id, SchemaClassId))
+                    else:
+                        self.assertTrue(oid == "01545")
+        finally:
+            self.console.destroy(10)
+
+    def test_packages(self):
+        # create console
+        # find agents
+        # synchronous query all package names
+        self._create_console()
+        try:
+            for aname in ["agent1", "agent2"]:
+                agent = self._find_agent(aname)
+
+                query = QmfQuery.create_wildcard(QmfQuery.TARGET_PACKAGES)
+                package_list = self.console.doQuery(agent, query)
+                self.assertTrue(len(package_list) == 1)
+                self.assertTrue('MyPackage' in package_list)
+        finally:
+            self.console.destroy(10)
+
+    def test_predicate_schema_id(self):
+        # create console
+        # find agents
+        # synchronous query for all schema by package name
+        self._create_console()
+        try:
+            for aname in ["agent1", "agent2"]:
+                agent = self._find_agent(aname)
+
+                query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA,
+                                                  QmfQueryPredicate(
+                        {QmfQuery.CMP_EQ: [SchemaClassId.KEY_PACKAGE,
+                                           "MyPackage"]}))
+
+                schema_list = self.console.doQuery(agent, query)
+                self.assertTrue(len(schema_list))
+                for schema in schema_list:
+                    self.assertTrue(schema.get_class_id().get_package_name() ==
+                                    "MyPackage")
+        finally:
+            self.console.destroy(10)
+
+    def test_predicate_no_match(self):
+        # create console
+        # find agents
+        # synchronous query for schema using a package name that no
+        # agent has registered
+        # verify that nothing is returned
+        self._create_console()
+        try:
+            for aname in ["agent1", "agent2"]:
+                agent = self._find_agent(aname)
+
+                query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA,
+                                                  QmfQueryPredicate(
+                        {QmfQuery.CMP_EQ: [SchemaClassId.KEY_PACKAGE,
+                                           "No-Such-Package"]}))
+
+                schema_list = self.console.doQuery(agent, query)
+                self.assertTrue(len(schema_list) == 0)
+        finally:
+            self.console.destroy(10)
+
+

Added: qpid/branches/qmfv2/qpid/python/qmf/test/obj_gets.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmfv2/qpid/python/qmf/test/obj_gets.py?rev=901879&view=auto
==============================================================================
--- qpid/branches/qmfv2/qpid/python/qmf/test/obj_gets.py (added)
+++ qpid/branches/qmfv2/qpid/python/qmf/test/obj_gets.py Thu Jan 21 21:57:59 2010
@@ -0,0 +1,399 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import unittest
+import logging
+from threading import Thread, Event
+
+import qpid.messaging
+from qmf.qmfCommon import (Notifier, SchemaObjectClass, SchemaClassId,
+                           SchemaProperty, qmfTypes, SchemaMethod, QmfQuery,
+                           QmfData, QmfQueryPredicate) 
+import qmf.qmfConsole
+from qmf.qmfAgent import(QmfAgentData, Agent)
+
+
+class _testNotifier(Notifier):
+    """Notifier built on a threading Event, used to wake a waiting thread."""
+    def __init__(self):
+        self._event = Event()
+
+    def indication(self):
+        # invoked from the qmf daemon thread to announce pending work
+        self._event.set()
+
+    def wait_for_work(self, timeout):
+        # invoked from the application thread; blocks until work has been
+        # announced or the timeout expires.  False means the wait timed
+        # out; True means work was announced and the event was re-armed.
+        self._event.wait(timeout)
+        if not self._event.isSet():
+            return False
+        self._event.clear()
+        return True
+
+
+class _agentApp(Thread):
+    """A QMF test agent together with the thread that drains its work queue.
+
+    Construction populates the agent's management database (three schema
+    classes over two packages, five described objects and two undescribed
+    objects) and starts the thread.  The agent is not attached to a broker
+    until connect_agent() is called.
+    """
+    def __init__(self, name, heartbeat):
+        Thread.__init__(self)
+        # Initialise before anything can call disconnect_agent(): without
+        # this its "if self.conn" guard raises AttributeError when no
+        # connection was ever made.
+        self.conn = None
+        self.notifier = _testNotifier()
+        self.agent = Agent(name,
+                           _notifier=self.notifier,
+                           _heartbeat_interval=heartbeat)
+
+        # Management Database
+        # - two different schema packages,
+        # - two classes within one schema package
+        # - multiple objects per schema package+class
+        # - two "undescribed" objects
+
+        # "package1/class1"
+
+        _schema = SchemaObjectClass( _classId=SchemaClassId("package1", "class1"),
+                                     _desc="A test data schema - one",
+                                     _object_id_names=["key"] )
+
+        _schema.add_property( "key", SchemaProperty(qmfTypes.TYPE_LSTR))
+        _schema.add_property( "count1", SchemaProperty(qmfTypes.TYPE_UINT32))
+        _schema.add_property( "count2", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+        self.agent.register_object_class(_schema)
+
+        _obj = QmfAgentData( self.agent, _schema=_schema )
+        _obj.set_value("key", "p1c1_key1")
+        _obj.set_value("count1", 0)
+        _obj.set_value("count2", 0)
+        self.agent.add_object( _obj )
+
+        _obj = QmfAgentData( self.agent, _schema=_schema )
+        _obj.set_value("key", "p1c1_key2")
+        _obj.set_value("count1", 9)
+        _obj.set_value("count2", 10)
+        self.agent.add_object( _obj )
+
+        # "package1/class2"
+
+        _schema = SchemaObjectClass( _classId=SchemaClassId("package1", "class2"),
+                                     _desc="A test data schema - two",
+                                     _object_id_names=["name"] )
+        # add properties
+        _schema.add_property( "name", SchemaProperty(qmfTypes.TYPE_LSTR))
+        _schema.add_property( "string1", SchemaProperty(qmfTypes.TYPE_LSTR))
+
+        self.agent.register_object_class(_schema)
+
+        _obj = QmfAgentData( self.agent, _schema=_schema )
+        _obj.set_value("name", "p1c2_name1")
+        _obj.set_value("string1", "a data string")
+        self.agent.add_object( _obj )
+
+
+        # "package2/class1"
+
+        _schema = SchemaObjectClass( _classId=SchemaClassId("package2", "class1"),
+                                     _desc="A test data schema - second package",
+                                     _object_id_names=["key"] )
+
+        _schema.add_property( "key", SchemaProperty(qmfTypes.TYPE_LSTR))
+        _schema.add_property( "counter", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+        self.agent.register_object_class(_schema)
+
+        _obj = QmfAgentData( self.agent, _schema=_schema )
+        _obj.set_value("key", "p2c1_key1")
+        _obj.set_value("counter", 0)
+        self.agent.add_object( _obj )
+
+        _obj = QmfAgentData( self.agent, _schema=_schema )
+        _obj.set_value("key", "p2c1_key2")
+        _obj.set_value("counter", 2112)
+        self.agent.add_object( _obj )
+
+
+        # add two "unstructured" objects to the Agent
+
+        _obj = QmfAgentData(self.agent, _object_id="undesc-1")
+        _obj.set_value("field1", "a value")
+        _obj.set_value("field2", 2)
+        _obj.set_value("field3", {"a":1, "map":2, "value":3})
+        _obj.set_value("field4", ["a", "list", "value"])
+        self.agent.add_object(_obj)
+
+
+        _obj = QmfAgentData(self.agent, _object_id="undesc-2")
+        _obj.set_value("key-1", "a value")
+        _obj.set_value("key-2", 2)
+        self.agent.add_object(_obj)
+
+        # the thread runs from construction onwards; see run()
+        self.running = True
+        self.start()
+
+    def connect_agent(self, broker_url):
+        # broker_url: object exposing host, port, user and password
+        # attributes; they are passed straight to the Connection
+        self.conn = qpid.messaging.Connection(broker_url.host,
+                                         broker_url.port,
+                                         broker_url.user,
+                                         broker_url.password)
+        self.conn.connect()
+        self.agent.set_connection(self.conn)
+
+    def disconnect_agent(self, timeout):
+        # no-op when connect_agent() was never called
+        if self.conn:
+            self.agent.remove_connection(timeout)
+
+    def shutdown_agent(self, timeout):
+        self.agent.destroy(timeout)
+
+    def stop(self):
+        # Ask run() to exit, wake it in case it is blocked waiting for
+        # work, then allow the thread ten seconds to finish.
+        self.running = False
+        self.notifier.indication() # hmmm... collide with daemon???
+        self.join(10)
+        if self.isAlive():
+            raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!")
+
+    def run(self):
+        # These tests expect the agent to receive no work items: anything
+        # that does arrive is logged as an error and released.
+        while self.running:
+            self.notifier.wait_for_work(None)
+            wi = self.agent.get_next_workitem(timeout=0)
+            while wi is not None:
+                logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type())
+                self.agent.release_workitem(wi)
+                wi = self.agent.get_next_workitem(timeout=0)
+
+
+
+class BaseTest(unittest.TestCase):
+    """Console get_objects() queries across several identical test agents.
+
+    setUp() starts agent_count agents holding the same objects.  The tests
+    run the same set of queries against all agents, against half of them,
+    and against a single agent, and check the number of objects returned.
+    The console is destroyed in a "finally" clause so that a failed
+    assertion does not leave it running.
+    """
+    agent_count = 5
+
+    def configure(self, config):
+        # NOTE(review): presumably called by the test harness before
+        # setUp() to supply the broker settings - confirm against the runner
+        self.config = config
+        self.broker = config.broker
+        self.defines = self.config.defines
+
+    def setUp(self):
+        self.agents = []
+        for i in range(self.agent_count):
+            agent = _agentApp("agent-" + str(i), 1)
+            agent.connect_agent(self.broker)
+            self.agents.append(agent)
+
+    def tearDown(self):
+        for agent in self.agents:
+            if agent is not None:
+                agent.shutdown_agent(10)
+                agent.stop()
+
+    def _create_console(self):
+        # Build a console on its own broker connection.  The caller is
+        # responsible for calling self.console.destroy().
+        self.notifier = _testNotifier()
+        self.console = qmf.qmfConsole.Console(notifier=self.notifier,
+                                              agent_timeout=3)
+        self.conn = qpid.messaging.Connection(self.broker.host,
+                                              self.broker.port,
+                                              self.broker.user,
+                                              self.broker.password)
+        self.conn.connect()
+        self.console.addConnection(self.conn)
+
+    def _find_all_agents(self):
+        # Look up every agent started by setUp() from the console, failing
+        # the test if one is not found.  Returns the agents in setUp() order.
+        found = []
+        for agent_app in self.agents:
+            aname = agent_app.agent.get_name()
+            agent = self.console.find_agent(aname, timeout=3)
+            self.assertTrue(agent and agent.get_name() == aname)
+            found.append(agent)
+        return found
+
+    def _check_object_queries(self, expected_agents, **agent_filter):
+        # Run the standard set of get_objects() queries and verify the
+        # results.  expected_agents is the number of agents that should
+        # answer; agent_filter is either empty (query every agent) or
+        # holds the _agents argument restricting the query.
+
+        # each agent holds exactly one "undesc-2" object
+        objs = self.console.get_objects(_object_id="undesc-2", _timeout=5,
+                                        **agent_filter)
+        self.assertTrue(len(objs) == expected_agents)
+        for obj in objs:
+            self.assertTrue(obj.get_object_id() == "undesc-2")
+
+        # each agent holds three objects from schema "package1"
+        objs = self.console.get_objects(_pname="package1", _timeout=5,
+                                        **agent_filter)
+        self.assertTrue(len(objs) == (expected_agents * 3))
+        for obj in objs:
+            self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1")
+
+        # each agent holds two objects from schema "package2"
+        objs = self.console.get_objects(_pname="package2", _timeout=5,
+                                        **agent_filter)
+        self.assertTrue(len(objs) == (expected_agents * 2))
+        for obj in objs:
+            self.assertTrue(obj.get_schema_class_id().get_package_name() == "package2")
+
+        # each agent holds one object from schema "package1/class2"
+        objs = self.console.get_objects(_pname="package1", _cname="class2",
+                                        _timeout=5, **agent_filter)
+        self.assertTrue(len(objs) == expected_agents)
+        for obj in objs:
+            self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1")
+            self.assertTrue(obj.get_schema_class_id().get_class_name() == "class2")
+
+        # given the schema identifier from the last query, repeat using the
+        # specific schema id
+        schema_id = objs[0].get_schema_class_id()
+        objs = self.console.get_objects(_schema_id=schema_id, _timeout=5,
+                                        **agent_filter)
+        self.assertTrue(len(objs) == expected_agents)
+        for obj in objs:
+            self.assertTrue(obj.get_schema_class_id() == schema_id)
+
+    def test_all_agents(self):
+        # create console
+        # find all agents
+        # synchronous queries across every agent
+        # verify the expected objects are returned
+        self._create_console()
+        try:
+            self._find_all_agents()
+            # console has discovered all agents, now query them all
+            self._check_object_queries(self.agent_count)
+        finally:
+            self.console.destroy(10)
+
+    def test_agent_subset(self):
+        # create console
+        # find all agents
+        # synchronous queries restricted to a subset of the agents
+        # verify the expected objects are returned
+        self._create_console()
+        try:
+            agent_list = self._find_all_agents()
+
+            # Only use a subset of the agents.  Floor division keeps the
+            # slice bound an integer whatever "/" means to the interpreter.
+            agent_list = agent_list[:len(agent_list) // 2]
+
+            self._check_object_queries(len(agent_list), _agents=agent_list)
+        finally:
+            self.console.destroy(10)
+
+    def test_single_agent(self):
+        # create console
+        # find all agents
+        # synchronous queries restricted to one agent
+        # verify the expected objects are returned
+        self._create_console()
+        try:
+            agent_list = self._find_all_agents()
+
+            # Only use one agent, passed on its own rather than in a list
+            agent = agent_list[0]
+
+            self._check_object_queries(1, _agents=agent)
+        finally:
+            self.console.destroy(10)
+



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org