You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/04/16 22:57:44 UTC

svn commit: r935079 - /qpid/trunk/qpid/cpp/src/tests/federation.py

Author: kgiusti
Date: Fri Apr 16 20:57:44 2010
New Revision: 935079

URL: http://svn.apache.org/viewvc?rev=935079&view=rev
Log:
QPID-2487: fix route propagation tests to prevent spurious timeouts.

Modified:
    qpid/trunk/qpid/cpp/src/tests/federation.py

Modified: qpid/trunk/qpid/cpp/src/tests/federation.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/federation.py?rev=935079&r1=935078&r2=935079&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/federation.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/federation.py Fri Apr 16 20:57:44 2010
@@ -872,11 +872,24 @@ class FederationTests(TestBase010):
 
         self._setup_brokers()
 
-        # create direct exchange on each broker
+        # create direct exchange on each broker, and retrieve the corresponding
+        # management object for that exchange
 
+        exchanges=[]
         for _b in self._brokers:
             _b.client_session.exchange_declare(exchange="fedX.direct", type="direct")
 
+            # pull the exchange out of qmf...
+            my_exchange = None
+            objs = qmf.getObjects(_broker=_b.qmf_broker, _class="exchange")
+            for ooo in objs:
+                if ooo.name == "fedX.direct":
+                    my_exchange = ooo
+                    break
+            self.assertTrue(my_exchange is not None)
+            exchanges.append(my_exchange)
+        self.assertEqual(len(exchanges), len(self._brokers), "Exchange creation failed!")
+
         # connect B0 --> B1
         result = self._brokers[1].qmf_object.connect(self._brokers[0].host,
                                                      self._brokers[0].port,
@@ -928,29 +941,51 @@ class FederationTests(TestBase010):
         queue_3 = self._brokers[3].client_session.incoming("f1")
 
         # wait until the binding key has propagated to each broker (twice at
-        # broker 2)
+        # broker B1)
 
-        retries = 0
-        count = 0
-        for xxx in qmf.getObjects(_class="binding"):
-            if xxx.bindingKey == "spudboy":
-                count += 1
-        while count < 5:
-            retries += 1
-            if retries >= 10:
-                self.fail("binding did not propagate to all brokers!")
-                return
-            sleep(1)
-            count = 0
-            for xxx in qmf.getObjects(_class="binding"):
-                if xxx.bindingKey == "spudboy":
-                    count += 1
+        binding_counts = [1, 2, 1, 1]
+        self.assertEqual(len(binding_counts), len(exchanges), "Update Test!")
+        for i in range(len(exchanges)):
+            retries = 0
+            exchanges[i].update()
+            while exchanges[i].bindingCount < binding_counts[i]:
+                retries += 1
+                self.failIfEqual(retries, 10,
+                                 "binding failed to propagate to broker %d"
+                                 % i)
+                sleep(1)
+                exchanges[i].update()
 
         # send 10 msgs from B0
         for i in range(1, 11):
             dp = self._brokers[0].client_session.delivery_properties(routing_key="spudboy")
             self._brokers[0].client_session.message_transfer(destination="fedX.direct", message=Message(dp, "Message %d" % i))
 
+        # wait for 10 messages to be forwarded from B0->B1,
+        # 10 messages from B1->B2,
+        # and 10 messages from B1->B3
+        retries = 0
+        for ex in exchanges:
+            ex.update()
+        while (exchanges[0].msgReceives != 10 or exchanges[0].msgRoutes != 10 or
+               exchanges[1].msgReceives != 10 or exchanges[1].msgRoutes != 20 or
+               exchanges[2].msgReceives != 10 or exchanges[2].msgRoutes != 10 or
+               exchanges[3].msgReceives != 10 or exchanges[3].msgRoutes != 10):
+            retries += 1
+            self.failIfEqual(retries, 10,
+                             "federation failed to route msgs %d:%d %d:%d %d:%d %d:%d"
+                             % (exchanges[0].msgReceives,
+                                exchanges[0].msgRoutes,
+                                exchanges[1].msgReceives,
+                                exchanges[1].msgRoutes,
+                                exchanges[2].msgReceives,
+                                exchanges[2].msgRoutes,
+                                exchanges[3].msgReceives,
+                                exchanges[3].msgRoutes))
+            sleep(1)
+            for ex in exchanges:
+                ex.update()
+
         # get exactly 10 msgs on B2 and B3
         for i in range(1, 11):
             msg = queue_2.get(timeout=5)
@@ -968,19 +1003,54 @@ class FederationTests(TestBase010):
             self.fail("Got unexpected message in queue_3: " + extra.body)
         except Empty: None
 
+
         # tear down the queue on B2
         self._brokers[2].client_session.exchange_unbind(queue="fedX1", exchange="fedX.direct", binding_key="spudboy")
         self._brokers[2].client_session.message_cancel(destination="f1")
         self._brokers[2].client_session.queue_delete(queue="fedX1")
 
-        # @todo - find a proper way to check the propagation here!
+        # @todo - restore code when QPID-2499 fixed!!
         sleep(6)
+        # wait for the binding count on B1 to drop from 2 to 1
+        # retries = 0
+        # exchanges[1].update()
+        # while exchanges[1].bindingCount != 1:
+        #     retries += 1
+        #     self.failIfEqual(retries, 10,
+        #                      "unbinding failed to propagate to broker B1: %d"
+        #                      % exchanges[1].bindingCount)
+        #     sleep(1)
+        #     exchanges[1].update()
 
         # send 10 msgs from B0
         for i in range(1, 11):
             dp = self._brokers[0].client_session.delivery_properties(routing_key="spudboy")
             self._brokers[0].client_session.message_transfer(destination="fedX.direct", message=Message(dp, "Message %d" % i))
 
+        # verify messages are forwarded to B3 only
+        # note: why exchanges[1].msgRoutes == 40???, not 20???  QPID-2499?
+        retries = 0
+        for ex in exchanges:
+            ex.update()
+        while (exchanges[0].msgReceives != 20 or exchanges[0].msgRoutes != 20 or
+               exchanges[1].msgReceives != 20 or exchanges[1].msgRoutes != 40 or
+               exchanges[2].msgReceives != 20 or exchanges[2].msgDrops != 10 or exchanges[2].msgRoutes != 10 or
+               exchanges[3].msgReceives != 20 or exchanges[3].msgRoutes != 20):
+            retries += 1
+            self.failIfEqual(retries, 10,
+                             "federation failed to route more msgs %d:%d %d:%d %d:%d %d:%d"
+                             % (exchanges[0].msgReceives,
+                                exchanges[0].msgRoutes,
+                                exchanges[1].msgReceives,
+                                exchanges[1].msgRoutes,
+                                exchanges[2].msgReceives,
+                                exchanges[2].msgRoutes,
+                                exchanges[3].msgReceives,
+                                exchanges[3].msgRoutes))
+            sleep(1)
+            for ex in exchanges:
+                ex.update()
+
         # get exactly 10 msgs on B3 only
         for i in range(1, 11):
             msg = queue_3.get(timeout=5)
@@ -1034,10 +1104,22 @@ class FederationTests(TestBase010):
 
         self._setup_brokers()
 
-        # create topic exchange on each broker
+        # create exchange on each broker, and retrieve the corresponding
+        # management object for that exchange
 
+        exchanges=[]
         for _b in self._brokers:
             _b.client_session.exchange_declare(exchange="fedX.topic", type="topic")
+            # pull the exchange out of qmf...
+            my_exchange = None
+            objs = qmf.getObjects(_broker=_b.qmf_broker, _class="exchange")
+            for ooo in objs:
+                if ooo.name == "fedX.topic":
+                    my_exchange = ooo
+                    break
+            self.assertTrue(my_exchange is not None)
+            exchanges.append(my_exchange)
+        self.assertEqual(len(exchanges), len(self._brokers), "Exchange creation failed!")
 
         # connect B0 --> B1
         result = self._brokers[1].qmf_object.connect(self._brokers[0].host,
@@ -1090,29 +1172,51 @@ class FederationTests(TestBase010):
         queue_3 = self._brokers[3].client_session.incoming("f1")
 
         # wait until the binding key has propagated to each broker (twice at
-        # broker 2)
+        # broker B1)
 
-        retries = 0
-        count = 0
-        for xxx in qmf.getObjects(_class="binding"):
-            if xxx.bindingKey == "spud.*":
-                count += 1
-        while count < 5:
-            retries += 1
-            if retries >= 10:
-                self.fail("binding did not propagate to all brokers!")
-                return
-            sleep(1)
-            count = 0
-            for xxx in qmf.getObjects(_class="binding"):
-                if xxx.bindingKey == "spud.*":
-                    count += 1
+        binding_counts = [1, 2, 1, 1]
+        self.assertEqual(len(binding_counts), len(exchanges), "Update Test!")
+        for i in range(len(exchanges)):
+            retries = 0
+            exchanges[i].update()
+            while exchanges[i].bindingCount < binding_counts[i]:
+                retries += 1
+                self.failIfEqual(retries, 10,
+                                 "binding failed to propagate to broker %d"
+                                 % i)
+                sleep(1)
+                exchanges[i].update()
 
         # send 10 msgs from B0
         for i in range(1, 11):
             dp = self._brokers[0].client_session.delivery_properties(routing_key="spud.boy")
             self._brokers[0].client_session.message_transfer(destination="fedX.topic", message=Message(dp, "Message %d" % i))
 
+        # wait for 10 messages to be forwarded from B0->B1,
+        # 10 messages from B1->B2,
+        # and 10 messages from B1->B3
+        retries = 0
+        for ex in exchanges:
+            ex.update()
+        while (exchanges[0].msgReceives != 10 or exchanges[0].msgRoutes != 10 or
+               exchanges[1].msgReceives != 10 or exchanges[1].msgRoutes != 20 or
+               exchanges[2].msgReceives != 10 or exchanges[2].msgRoutes != 10 or
+               exchanges[3].msgReceives != 10 or exchanges[3].msgRoutes != 10):
+            retries += 1
+            self.failIfEqual(retries, 10,
+                             "federation failed to route msgs %d:%d %d:%d %d:%d %d:%d"
+                             % (exchanges[0].msgReceives,
+                                exchanges[0].msgRoutes,
+                                exchanges[1].msgReceives,
+                                exchanges[1].msgRoutes,
+                                exchanges[2].msgReceives,
+                                exchanges[2].msgRoutes,
+                                exchanges[3].msgReceives,
+                                exchanges[3].msgRoutes))
+            sleep(1)
+            for ex in exchanges:
+                ex.update()
+
         # get exactly 10 msgs on B2 and B3
         for i in range(1, 11):
             msg = queue_2.get(timeout=5)
@@ -1135,14 +1239,48 @@ class FederationTests(TestBase010):
         self._brokers[2].client_session.message_cancel(destination="f1")
         self._brokers[2].client_session.queue_delete(queue="fedX1")
 
-        # @todo - find a proper way to check the propagation here!
+        # @todo - restore code when QPID-2499 fixed!!
         sleep(6)
+        # wait for the binding count on B1 to drop from 2 to 1
+        # retries = 0
+        # exchanges[1].update()
+        # while exchanges[1].bindingCount != 1:
+        #     retries += 1
+        #     self.failIfEqual(retries, 10,
+        #                      "unbinding failed to propagate to broker B1: %d"
+        #                      % exchanges[1].bindingCount)
+        #     sleep(1)
+        #     exchanges[1].update()
 
         # send 10 msgs from B0
         for i in range(1, 11):
             dp = self._brokers[0].client_session.delivery_properties(routing_key="spud.boy")
             self._brokers[0].client_session.message_transfer(destination="fedX.topic", message=Message(dp, "Message %d" % i))
 
+        # verify messages are forwarded to B3 only
+        # note: why exchanges[1].msgRoutes == 40???, not 20???  QPID-2499?
+        retries = 0
+        for ex in exchanges:
+            ex.update()
+        while (exchanges[0].msgReceives != 20 or exchanges[0].msgRoutes != 20 or
+               exchanges[1].msgReceives != 20 or exchanges[1].msgRoutes != 40 or
+               exchanges[2].msgReceives != 20 or exchanges[2].msgDrops != 10 or exchanges[2].msgRoutes != 10 or
+               exchanges[3].msgReceives != 20 or exchanges[3].msgRoutes != 20):
+            retries += 1
+            self.failIfEqual(retries, 10,
+                             "federation failed to route more msgs %d:%d %d:%d %d:%d %d:%d"
+                             % (exchanges[0].msgReceives,
+                                exchanges[0].msgRoutes,
+                                exchanges[1].msgReceives,
+                                exchanges[1].msgRoutes,
+                                exchanges[2].msgReceives,
+                                exchanges[2].msgRoutes,
+                                exchanges[3].msgReceives,
+                                exchanges[3].msgRoutes))
+            sleep(1)
+            for ex in exchanges:
+                ex.update()
+
         # get exactly 10 msgs on B3 only
         for i in range(1, 11):
             msg = queue_3.get(timeout=5)
@@ -1197,10 +1335,22 @@ class FederationTests(TestBase010):
 
         self._setup_brokers()
 
-        # create fanout exchange on each broker
+        # create fanout exchange on each broker, and retrieve the corresponding
+        # management object for that exchange
 
+        exchanges=[]
         for _b in self._brokers:
             _b.client_session.exchange_declare(exchange="fedX.fanout", type="fanout")
+            # pull the exchange out of qmf...
+            my_exchange = None
+            objs = qmf.getObjects(_broker=_b.qmf_broker, _class="exchange")
+            for ooo in objs:
+                if ooo.name == "fedX.fanout":
+                    my_exchange = ooo
+                    break
+            self.assertTrue(my_exchange is not None)
+            exchanges.append(my_exchange)
+        self.assertEqual(len(exchanges), len(self._brokers), "Exchange creation failed!")
 
         # connect B0 --> B1
         result = self._brokers[1].qmf_object.connect(self._brokers[0].host,
@@ -1252,31 +1402,52 @@ class FederationTests(TestBase010):
         self.subscribe(self._brokers[3].client_session, queue="fedX1", destination="f1")
         queue_3 = self._brokers[3].client_session.incoming("f1")
 
-        # wait until the binding has propagated to each broker by
-        # counting the number of bindings that have origins.
-        # Should have 3: 1 for B0, 2 for B1.
+        # wait until the binding key has propagated to each broker (twice at
+        # broker B1)
 
-        retries = 0
-        count = 0
-        for xxx in qmf.getObjects(_class="binding"):
-            if xxx.origin:
-                count += 1
-        while count < 3:
-            retries += 1
-            if retries >= 10:
-                self.fail("binding did not propagate to all brokers!")
-                return
-            sleep(1)
-            count = 0
-            for xxx in qmf.getObjects(_class="binding"):
-                if xxx.origin:
-                    count += 1
+        binding_counts = [1, 2, 1, 1]
+        self.assertEqual(len(binding_counts), len(exchanges), "Update Test!")
+        for i in range(len(exchanges)):
+            retries = 0
+            exchanges[i].update()
+            while exchanges[i].bindingCount < binding_counts[i]:
+                retries += 1
+                self.failIfEqual(retries, 10,
+                                 "binding failed to propagate to broker %d"
+                                 % i)
+                sleep(1)
+                exchanges[i].update()
 
         # send 10 msgs from B0
         for i in range(1, 11):
             dp = self._brokers[0].client_session.delivery_properties()
             self._brokers[0].client_session.message_transfer(destination="fedX.fanout", message=Message(dp, "Message %d" % i))
 
+        # wait for 10 messages to be forwarded from B0->B1,
+        # 10 messages from B1->B2,
+        # and 10 messages from B1->B3
+        retries = 0
+        for ex in exchanges:
+            ex.update()
+        while (exchanges[0].msgReceives != 10 or exchanges[0].msgRoutes != 10 or
+               exchanges[1].msgReceives != 10 or exchanges[1].msgRoutes != 20 or
+               exchanges[2].msgReceives != 10 or exchanges[2].msgRoutes != 10 or
+               exchanges[3].msgReceives != 10 or exchanges[3].msgRoutes != 10):
+            retries += 1
+            self.failIfEqual(retries, 10,
+                             "federation failed to route msgs %d:%d %d:%d %d:%d %d:%d"
+                             % (exchanges[0].msgReceives,
+                                exchanges[0].msgRoutes,
+                                exchanges[1].msgReceives,
+                                exchanges[1].msgRoutes,
+                                exchanges[2].msgReceives,
+                                exchanges[2].msgRoutes,
+                                exchanges[3].msgReceives,
+                                exchanges[3].msgRoutes))
+            sleep(1)
+            for ex in exchanges:
+                ex.update()
+
         # get exactly 10 msgs on B2 and B3
         for i in range(1, 11):
             msg = queue_2.get(timeout=5)
@@ -1301,12 +1472,46 @@ class FederationTests(TestBase010):
 
         # @todo - find a proper way to check the propagation here!
         sleep(6)
+        # wait for the binding count on B1 to drop from 2 to 1
+        # retries = 0
+        # exchanges[1].update()
+        # while exchanges[1].bindingCount != 1:
+        #     retries += 1
+        #     self.failIfEqual(retries, 10,
+        #                      "unbinding failed to propagate to broker B1: %d"
+        #                      % exchanges[1].bindingCount)
+        #     sleep(1)
+        #     exchanges[1].update()
 
         # send 10 msgs from B0
         for i in range(1, 11):
             dp = self._brokers[0].client_session.delivery_properties()
             self._brokers[0].client_session.message_transfer(destination="fedX.fanout", message=Message(dp, "Message %d" % i))
 
+        # verify messages are forwarded to B3 only
+        # note: why exchanges[1].msgRoutes == 40???, not 20???  QPID-2499?
+        retries = 0
+        for ex in exchanges:
+            ex.update()
+        while (exchanges[0].msgReceives != 20 or exchanges[0].msgRoutes != 20 or
+               exchanges[1].msgReceives != 20 or exchanges[1].msgRoutes != 40 or
+               exchanges[2].msgReceives != 20 or exchanges[2].msgDrops != 10 or exchanges[2].msgRoutes != 10 or
+               exchanges[3].msgReceives != 20 or exchanges[3].msgRoutes != 20):
+            retries += 1
+            self.failIfEqual(retries, 10,
+                             "federation failed to route more msgs %d:%d %d:%d %d:%d %d:%d"
+                             % (exchanges[0].msgReceives,
+                                exchanges[0].msgRoutes,
+                                exchanges[1].msgReceives,
+                                exchanges[1].msgRoutes,
+                                exchanges[2].msgReceives,
+                                exchanges[2].msgRoutes,
+                                exchanges[3].msgReceives,
+                                exchanges[3].msgRoutes))
+            sleep(1)
+            for ex in exchanges:
+                ex.update()
+
         # get exactly 10 msgs on B3 only
         for i in range(1, 11):
             msg = queue_3.get(timeout=5)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org