You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2009/08/13 21:18:03 UTC

svn commit: r803996 - in /qpid/trunk/qpid/cpp/src: qpid/broker/DirectExchange.cpp qpid/broker/TopicExchange.cpp tests/federation.py

Author: tross
Date: Thu Aug 13 19:18:03 2009
New Revision: 803996

URL: http://svn.apache.org/viewvc?rev=803996&view=rev
Log:
QPID-1971 - bind in fedOpReorigin mode is not threadsafe for TopicExchange

Applied patch from Ken Giusti.


Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
    qpid/trunk/qpid/cpp/src/tests/federation.py

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp?rev=803996&r1=803995&r2=803996&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp Thu Aug 13 19:18:03 2009
@@ -92,12 +92,24 @@
         if (bk.fedBinding.count() == 0)
             unbind(queue, routingKey, 0);
     } else if (fedOp == fedOpReorigin) {
-        for (std::map<string, BoundKey>::iterator iter = bindings.begin();
-             iter != bindings.end(); iter++) {
-            const BoundKey& bk = iter->second;
-            if (bk.fedBinding.hasLocal()) {
-                propagateFedOp(iter->first, string(), fedOpBind, string());
+        /** gather up all the keys that need rebinding in a local vector
+         * while holding the lock.  Then propagate once the lock is
+         * released
+         */
+        std::vector<std::string> keys2prop;
+        {
+            Mutex::ScopedLock l(lock);    
+            for (Bindings::iterator iter = bindings.begin();
+                 iter != bindings.end(); iter++) {
+                const BoundKey& bk = iter->second;
+                if (bk.fedBinding.hasLocal()) {
+                    keys2prop.push_back(iter->first);
+                }
             }
+        }   /* lock dropped */
+        for (std::vector<std::string>::const_iterator key = keys2prop.begin();
+             key != keys2prop.end(); key++) {
+            propagateFedOp( *key, string(), fedOpBind, string());
         }
     }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp?rev=803996&r1=803995&r2=803996&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp Thu Aug 13 19:18:03 2009
@@ -225,12 +225,25 @@
         if (reallyUnbind)
             unbind(queue, routingPattern, 0);
     } else if (fedOp == fedOpReorigin) {
-        for (BindingMap::iterator iter = bindings.begin();
-             iter != bindings.end(); iter++) {
-            const BoundKey& bk = iter->second;
-            if (bk.fedBinding.hasLocal()) {
-                propagateFedOp(iter->first, string(), fedOpBind, string());
+        /** gather up all the keys that need rebinding in a local vector
+         * while holding the lock.  Then propagate once the lock is
+         * released
+         */
+        std::vector<std::string> keys2prop;
+        {
+            RWlock::ScopedRlock l(lock);    
+            for (BindingMap::iterator iter = bindings.begin();
+                 iter != bindings.end(); iter++) {
+                const BoundKey& bk = iter->second;
+                
+                if (bk.fedBinding.hasLocal()) {
+                    keys2prop.push_back(iter->first);
+                }
             }
+        }   /* lock dropped */
+        for (std::vector<std::string>::const_iterator key = keys2prop.begin();
+             key != keys2prop.end(); key++) {
+            propagateFedOp( *key, string(), fedOpBind, string());
         }
     }
 

Modified: qpid/trunk/qpid/cpp/src/tests/federation.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/federation.py?rev=803996&r1=803995&r2=803996&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/federation.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/federation.py Thu Aug 13 19:18:03 2009
@@ -455,7 +455,131 @@
         sleep(3)
         self.assertEqual(len(qmf.getObjects(_class="bridge")), 0)
         self.assertEqual(len(qmf.getObjects(_class="link")), 0)
+
+    def test_dynamic_topic_reorigin(self):
+        session = self.session
+        r_conn = self.connect(host=self.remote_host(), port=self.remote_port())
+        r_session = r_conn.session("test_dynamic_topic_reorigin")
+
+        session.exchange_declare(exchange="fed.topic_reorigin", type="topic")
+        r_session.exchange_declare(exchange="fed.topic_reorigin", type="topic")
+
+        session.exchange_declare(exchange="fed.topic_reorigin_2", type="topic")
+        r_session.exchange_declare(exchange="fed.topic_reorigin_2", type="topic")
+
+        self.startQmf()
+        qmf = self.qmf
+        broker = qmf.getObjects(_class="broker")[0]
+        result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp")
+        self.assertEqual(result.status, 0)
+
+        session.queue_declare(queue="fed2", exclusive=True, auto_delete=True)
+        session.exchange_bind(queue="fed2", exchange="fed.topic_reorigin_2", binding_key="ft-key.one.#")
+        self.subscribe(queue="fed2", destination="f2")
+        queue2 = session.incoming("f2")
+
+        link = qmf.getObjects(_class="link")[0]
+        result = link.bridge(False, "fed.topic_reorigin", "fed.topic_reorigin", "", "", "", False, False, True, 0)
+        self.assertEqual(result.status, 0)
+        result = link.bridge(False, "fed.topic_reorigin_2", "fed.topic_reorigin_2", "", "", "", False, False, True, 0)
+        self.assertEqual(result.status, 0)
+
+        bridge = qmf.getObjects(_class="bridge")[0]
+        bridge2 = qmf.getObjects(_class="bridge")[1]
+        sleep(5)
+
+        session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
+        session.exchange_bind(queue="fed1", exchange="fed.topic_reorigin", binding_key="ft-key.#")
+        self.subscribe(queue="fed1", destination="f1")
+        queue = session.incoming("f1")
+
+        for i in range(1, 11):
+            dp = r_session.delivery_properties(routing_key="ft-key.one.two")
+            r_session.message_transfer(destination="fed.topic_reorigin", message=Message(dp, "Message %d" % i))
+
+        for i in range(1, 11):
+            msg = queue.get(timeout=5)
+            self.assertEqual("Message %d" % i, msg.body)
+        try:
+            extra = queue.get(timeout=1)
+            self.fail("Got unexpected message in queue: " + extra.body)
+        except Empty: None
+
+        result = bridge.close()
+        self.assertEqual(result.status, 0)
+        result = bridge2.close()
+        self.assertEqual(result.status, 0)
+        result = link.close()
+        self.assertEqual(result.status, 0)
+
+        sleep(3)
+        self.assertEqual(len(qmf.getObjects(_class="bridge")), 0)
+        self.assertEqual(len(qmf.getObjects(_class="link")), 0)
+
+
         
+    def test_dynamic_direct_reorigin(self):
+        session = self.session
+        r_conn = self.connect(host=self.remote_host(), port=self.remote_port())
+        r_session = r_conn.session("test_dynamic_direct_reorigin")
+
+        session.exchange_declare(exchange="fed.direct_reorigin", type="direct")
+        r_session.exchange_declare(exchange="fed.direct_reorigin", type="direct")
+
+        session.exchange_declare(exchange="fed.direct_reorigin_2", type="direct")
+        r_session.exchange_declare(exchange="fed.direct_reorigin_2", type="direct")
+
+        self.startQmf()
+        qmf = self.qmf
+        broker = qmf.getObjects(_class="broker")[0]
+        result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp")
+        self.assertEqual(result.status, 0)
+
+        session.queue_declare(queue="fed2", exclusive=True, auto_delete=True)
+        session.exchange_bind(queue="fed2", exchange="fed.direct_reorigin_2", binding_key="ft-key.two")
+        self.subscribe(queue="fed2", destination="f2")
+        queue2 = session.incoming("f2")
+
+        link = qmf.getObjects(_class="link")[0]
+        result = link.bridge(False, "fed.direct_reorigin", "fed.direct_reorigin", "", "", "", False, False, True, 0)
+        self.assertEqual(result.status, 0)
+        result = link.bridge(False, "fed.direct_reorigin_2", "fed.direct_reorigin_2", "", "", "", False, False, True, 0)
+        self.assertEqual(result.status, 0)
+
+        bridge = qmf.getObjects(_class="bridge")[0]
+        bridge2 = qmf.getObjects(_class="bridge")[1]
+        sleep(5)
+
+        session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
+        session.exchange_bind(queue="fed1", exchange="fed.direct_reorigin", binding_key="ft-key.one")
+        self.subscribe(queue="fed1", destination="f1")
+        queue = session.incoming("f1")
+
+        for i in range(1, 11):
+            dp = r_session.delivery_properties(routing_key="ft-key.one")
+            r_session.message_transfer(destination="fed.direct_reorigin", message=Message(dp, "Message %d" % i))
+
+        for i in range(1, 11):
+            msg = queue.get(timeout=5)
+            self.assertEqual("Message %d" % i, msg.body)
+        try:
+            extra = queue.get(timeout=1)
+            self.fail("Got unexpected message in queue: " + extra.body)
+        except Empty: None
+
+        result = bridge.close()
+        self.assertEqual(result.status, 0)
+        result = bridge2.close()
+        self.assertEqual(result.status, 0)
+        result = link.close()
+        self.assertEqual(result.status, 0)
+
+        sleep(3)
+        self.assertEqual(len(qmf.getObjects(_class="bridge")), 0)
+        self.assertEqual(len(qmf.getObjects(_class="link")), 0)
+
+
+
     def getProperty(self, msg, name):
         for h in msg.headers:
             if hasattr(h, name): return getattr(h, name)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org