You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/01/27 15:41:56 UTC
svn commit: r903660 - in /qpid/trunk/qpid/python/qmf2/tests:
agent_discovery.py basic_method.py basic_query.py events.py obj_gets.py
Author: kgiusti
Date: Wed Jan 27 14:41:55 2010
New Revision: 903660
URL: http://svn.apache.org/viewvc?rev=903660&view=rev
Log:
QPID-2261: add wait in tests for agent setup to complete
Modified:
qpid/trunk/qpid/python/qmf2/tests/agent_discovery.py
qpid/trunk/qpid/python/qmf2/tests/basic_method.py
qpid/trunk/qpid/python/qmf2/tests/basic_query.py
qpid/trunk/qpid/python/qmf2/tests/events.py
qpid/trunk/qpid/python/qmf2/tests/obj_gets.py
Modified: qpid/trunk/qpid/python/qmf2/tests/agent_discovery.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf2/tests/agent_discovery.py?rev=903660&r1=903659&r2=903660&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qmf2/tests/agent_discovery.py (original)
+++ qpid/trunk/qpid/python/qmf2/tests/agent_discovery.py Wed Jan 27 14:41:55 2010
@@ -56,10 +56,14 @@
_heartbeat_interval=heartbeat)
# No database needed for this test
self.running = False
+ self.ready = Event()
def start_app(self):
self.running = True
self.start()
+ self.ready.wait(10)
+ if not self.ready.is_set():
+ raise Exception("Agent failed to connect to broker.")
def stop_app(self):
self.running = False
@@ -78,6 +82,7 @@
self.broker_url.password)
conn.connect()
self.agent.set_connection(conn)
+ self.ready.set()
while self.running:
self.notifier.wait_for_work(None)
Modified: qpid/trunk/qpid/python/qmf2/tests/basic_method.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf2/tests/basic_method.py?rev=903660&r1=903659&r2=903660&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qmf2/tests/basic_method.py (original)
+++ qpid/trunk/qpid/python/qmf2/tests/basic_method.py Wed Jan 27 14:41:55 2010
@@ -47,9 +47,10 @@
class _agentApp(Thread):
- def __init__(self, name, heartbeat):
+ def __init__(self, name, broker_url, heartbeat):
Thread.__init__(self)
self.notifier = _testNotifier()
+ self.broker_url = broker_url
self.agent = Agent(name,
_notifier=self.notifier,
_heartbeat_interval=heartbeat)
@@ -108,33 +109,34 @@
_obj2.set_value("field4", ["a", "list", "value"])
self.agent.add_object(_obj2)
+ self.running = False
+ self.ready = Event()
+
+ def start_app(self):
self.running = True
self.start()
+ self.ready.wait(10)
+ if not self.ready.is_set():
+ raise Exception("Agent failed to connect to broker.")
- def connect_agent(self, broker_url):
- # broker_url = "user/passwd@hostname:port"
- self.conn = qpid.messaging.Connection(broker_url.host,
- broker_url.port,
- broker_url.user,
- broker_url.password)
- self.conn.connect()
- self.agent.set_connection(self.conn)
-
- def disconnect_agent(self, timeout):
- if self.conn:
- self.agent.remove_connection(timeout)
-
- def shutdown_agent(self, timeout):
- self.agent.destroy(timeout)
-
- def stop(self):
+ def stop_app(self):
self.running = False
+ # wake main thread
self.notifier.indication() # hmmm... collide with daemon???
self.join(10)
if self.isAlive():
raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!")
def run(self):
+ # broker_url = "user/passwd@hostname:port"
+ self.conn = qpid.messaging.Connection(self.broker_url.host,
+ self.broker_url.port,
+ self.broker_url.user,
+ self.broker_url.password)
+ self.conn.connect()
+ self.agent.set_connection(self.conn)
+ self.ready.set()
+
# Agent application main processing loop
while self.running:
self.notifier.wait_for_work(None)
@@ -197,6 +199,10 @@
self.agent.release_workitem(wi)
wi = self.agent.get_next_workitem(timeout=0)
+ if self.conn:
+ self.agent.remove_connection(10)
+ self.agent.destroy(10)
+
class BaseTest(unittest.TestCase):
@@ -206,20 +212,19 @@
self.defines = self.config.defines
def setUp(self):
- # one second agent indication interval
- self.agent1 = _agentApp("agent1", 1)
- self.agent1.connect_agent(self.broker)
- self.agent2 = _agentApp("agent2", 1)
- self.agent2.connect_agent(self.broker)
+ # one second agent heartbeat interval
+ self.agent_heartbeat = 1
+ self.agent1 = _agentApp("agent1", self.broker, self.agent_heartbeat)
+ self.agent1.start_app()
+ self.agent2 = _agentApp("agent2", self.broker, self.agent_heartbeat)
+ self.agent2.start_app()
def tearDown(self):
if self.agent1:
- self.agent1.shutdown_agent(10)
- self.agent1.stop()
+ self.agent1.stop_app()
self.agent1 = None
if self.agent2:
- self.agent2.shutdown_agent(10)
- self.agent2.stop()
+ self.agent2.stop_app()
self.agent2 = None
def test_described_obj(self):
Modified: qpid/trunk/qpid/python/qmf2/tests/basic_query.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf2/tests/basic_query.py?rev=903660&r1=903659&r2=903660&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qmf2/tests/basic_query.py (original)
+++ qpid/trunk/qpid/python/qmf2/tests/basic_query.py Wed Jan 27 14:41:55 2010
@@ -47,9 +47,10 @@
class _agentApp(Thread):
- def __init__(self, name, heartbeat):
+ def __init__(self, name, broker_url, heartbeat):
Thread.__init__(self)
self.notifier = _testNotifier()
+ self.broker_url = broker_url
self.agent = Agent(name,
_notifier=self.notifier,
_heartbeat_interval=heartbeat)
@@ -108,33 +109,34 @@
_obj2.set_value("field4", ["a", "list", "value"])
self.agent.add_object(_obj2)
+ self.running = False
+ self.ready = Event()
+
+ def start_app(self):
self.running = True
self.start()
+ self.ready.wait(10)
+ if not self.ready.is_set():
+ raise Exception("Agent failed to connect to broker.")
- def connect_agent(self, broker_url):
- # broker_url = "user/passwd@hostname:port"
- self.conn = qpid.messaging.Connection(broker_url.host,
- broker_url.port,
- broker_url.user,
- broker_url.password)
- self.conn.connect()
- self.agent.set_connection(self.conn)
-
- def disconnect_agent(self, timeout):
- if self.conn:
- self.agent.remove_connection(timeout)
-
- def shutdown_agent(self, timeout):
- self.agent.destroy(timeout)
-
- def stop(self):
+ def stop_app(self):
self.running = False
+ # wake main thread
self.notifier.indication() # hmmm... collide with daemon???
self.join(10)
if self.isAlive():
raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!")
def run(self):
+ # broker_url = "user/passwd@hostname:port"
+ self.conn = qpid.messaging.Connection(self.broker_url.host,
+ self.broker_url.port,
+ self.broker_url.user,
+ self.broker_url.password)
+ self.conn.connect()
+ self.agent.set_connection(self.conn)
+ self.ready.set()
+
while self.running:
self.notifier.wait_for_work(None)
wi = self.agent.get_next_workitem(timeout=0)
@@ -143,6 +145,11 @@
self.agent.release_workitem(wi)
wi = self.agent.get_next_workitem(timeout=0)
+ if self.conn:
+ self.agent.remove_connection(10)
+ self.agent.destroy(10)
+
+
class BaseTest(unittest.TestCase):
@@ -153,19 +160,18 @@
def setUp(self):
# one second agent indication interval
- self.agent1 = _agentApp("agent1", 1)
- self.agent1.connect_agent(self.broker)
- self.agent2 = _agentApp("agent2", 1)
- self.agent2.connect_agent(self.broker)
+ self.agent_heartbeat = 1
+ self.agent1 = _agentApp("agent1", self.broker, self.agent_heartbeat)
+ self.agent1.start_app()
+ self.agent2 = _agentApp("agent2", self.broker, self.agent_heartbeat)
+ self.agent2.start_app()
def tearDown(self):
if self.agent1:
- self.agent1.shutdown_agent(10)
- self.agent1.stop()
+ self.agent1.stop_app()
self.agent1 = None
if self.agent2:
- self.agent2.shutdown_agent(10)
- self.agent2.stop()
+ self.agent2.stop_app()
self.agent2 = None
def test_all_oids(self):
Modified: qpid/trunk/qpid/python/qmf2/tests/events.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf2/tests/events.py?rev=903660&r1=903659&r2=903660&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qmf2/tests/events.py (original)
+++ qpid/trunk/qpid/python/qmf2/tests/events.py Wed Jan 27 14:41:55 2010
@@ -75,10 +75,16 @@
self.agent.register_object_class(_schema)
self.running = False
+ self.ready = Event()
def start_app(self):
self.running = True
self.start()
+ self.ready.wait(10)
+ if not self.ready.is_set():
+ raise Exception("Agent failed to connect to broker.")
+ # time.sleep(1)
+ print("!!! agent=%s setup complete (%s)" % (self.agent, time.time()))
def stop_app(self):
self.running = False
@@ -100,6 +106,8 @@
raise Skipped(e)
self.agent.set_connection(conn)
+ print("!!! agent=%s connection done (%s)" % (self.agent, time.time()))
+ self.ready.set()
counter = 1
while self.running:
@@ -164,9 +172,7 @@
# find the agents
for aname in ["agent1", "agent2"]:
- print("!!! finding aname=%s (%s)" % (aname, time.time()))
agent = self.console.find_agent(aname, timeout=3)
- print("!!! agent=%s aname=%s (%s)" % (agent, aname, time.time()))
self.assertTrue(agent and agent.get_name() == aname)
# now wait for events
Modified: qpid/trunk/qpid/python/qmf2/tests/obj_gets.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf2/tests/obj_gets.py?rev=903660&r1=903659&r2=903660&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qmf2/tests/obj_gets.py (original)
+++ qpid/trunk/qpid/python/qmf2/tests/obj_gets.py Wed Jan 27 14:41:55 2010
@@ -47,9 +47,10 @@
class _agentApp(Thread):
- def __init__(self, name, heartbeat):
+ def __init__(self, name, broker_url, heartbeat):
Thread.__init__(self)
self.notifier = _testNotifier()
+ self.broker_url = broker_url
self.agent = Agent(name,
_notifier=self.notifier,
_heartbeat_interval=heartbeat)
@@ -138,26 +139,17 @@
_obj.set_value("key-2", 2)
self.agent.add_object(_obj)
+ self.running = False
+ self.ready = Event()
+
+ def start_app(self):
self.running = True
self.start()
+ self.ready.wait(10)
+ if not self.ready.is_set():
+ raise Exception("Agent failed to connect to broker.")
- def connect_agent(self, broker_url):
- # broker_url = "user/passwd@hostname:port"
- self.conn = qpid.messaging.Connection(broker_url.host,
- broker_url.port,
- broker_url.user,
- broker_url.password)
- self.conn.connect()
- self.agent.set_connection(self.conn)
-
- def disconnect_agent(self, timeout):
- if self.conn:
- self.agent.remove_connection(timeout)
-
- def shutdown_agent(self, timeout):
- self.agent.destroy(timeout)
-
- def stop(self):
+ def stop_app(self):
self.running = False
self.notifier.indication() # hmmm... collide with daemon???
self.join(10)
@@ -165,6 +157,15 @@
raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!")
def run(self):
+ # broker_url = "user/passwd@hostname:port"
+ self.conn = qpid.messaging.Connection(self.broker_url.host,
+ self.broker_url.port,
+ self.broker_url.user,
+ self.broker_url.password)
+ self.conn.connect()
+ self.agent.set_connection(self.conn)
+ self.ready.set()
+
while self.running:
self.notifier.wait_for_work(None)
wi = self.agent.get_next_workitem(timeout=0)
@@ -173,6 +174,9 @@
self.agent.release_workitem(wi)
wi = self.agent.get_next_workitem(timeout=0)
+ if self.conn:
+ self.agent.remove_connection(10)
+ self.agent.destroy(10)
class BaseTest(unittest.TestCase):
@@ -186,15 +190,14 @@
def setUp(self):
self.agents = []
for i in range(self.agent_count):
- agent = _agentApp("agent-" + str(i), 1)
- agent.connect_agent(self.broker)
+ agent = _agentApp("agent-" + str(i), self.broker, 1)
+ agent.start_app()
self.agents.append(agent)
def tearDown(self):
for agent in self.agents:
if agent is not None:
- agent.shutdown_agent(10)
- agent.stop()
+ agent.stop_app()
def test_all_agents(self):
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org