You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2011/04/19 19:46:04 UTC

svn commit: r1095144 - in /qpid/trunk/qpid/cpp/src: qpid/broker/Exchange.cpp qpid/broker/Exchange.h qpid/broker/ExchangeRegistry.cpp qpid/broker/SemanticState.cpp tests/cluster_tests.py

Author: aconway
Date: Tue Apr 19 17:46:03 2011
New Revision: 1095144

URL: http://svn.apache.org/viewvc?rev=1095144&view=rev
Log:

QPID-3215: cached exchange reference can cause cluster inconsistencies if exchange is deleted/recreated

SemanticState::route() uses a simple cache variable to avoid looking
up the exchange for every message. However, if the exchange in question
is deleted (even if it is then recreated), the stale cached reference
can cause inconsistencies in a cluster.

Even in a stand-alone broker, messages can be routed by a deleted
exchange because of the cache.

The fix is to mark the exchange as destroyed when it is removed from
the registry, and to check that status before reusing the cached
exchange.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
    qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/trunk/qpid/cpp/src/tests/cluster_tests.py

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=1095144&r1=1095143&r2=1095144&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Tue Apr 19 17:46:03 2011
@@ -161,7 +161,7 @@ void Exchange::routeIVE(){
 
 Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) :
     name(_name), durable(false), persistenceId(0), sequence(false),
-    sequenceNo(0), ive(false), mgmtExchange(0), broker(b)
+    sequenceNo(0), ive(false), mgmtExchange(0), broker(b), destroyed(false)
 {
     if (parent != 0 && broker != 0)
     {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h?rev=1095144&r1=1095143&r2=1095144&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h Tue Apr 19 17:46:03 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -82,15 +82,15 @@ protected:
     private:
         Exchange* parent;
     };
-           
+
     typedef boost::shared_ptr<const std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> > > ConstBindingList;
     typedef boost::shared_ptr<      std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> > > BindingList;
     void doRoute(Deliverable& msg, ConstBindingList b);
     void routeIVE();
-           
+
 
     struct MatchQueue {
-        const boost::shared_ptr<Queue> queue;        
+        const boost::shared_ptr<Queue> queue;
         MatchQueue(boost::shared_ptr<Queue> q);
         bool operator()(Exchange::Binding::shared_ptr b);
     };
@@ -195,7 +195,7 @@ public:
     virtual bool isBound(boost::shared_ptr<Queue> queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args) = 0;
     QPID_BROKER_EXTERN virtual void setProperties(const boost::intrusive_ptr<Message>&);
     virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0;
-    
+
     //PersistableExchange:
     QPID_BROKER_EXTERN void setPersistenceId(uint64_t id) const;
     uint64_t getPersistenceId() const { return persistenceId; }
@@ -228,14 +228,18 @@ public:
 
     bool routeWithAlternate(Deliverable& message);
 
+    void destroy() { destroyed = true; }
+    bool isDestroyed() const { return destroyed; }
+
 protected:
     qpid::sys::Mutex bridgeLock;
     std::vector<DynamicBridge*> bridgeVector;
     Broker* broker;
+    bool destroyed;
 
     QPID_BROKER_EXTERN virtual void handleHelloRequest();
     void propagateFedOp(const std::string& routingKey, const std::string& tags,
-                        const std::string& op,         const std::string& origin, 
+                        const std::string& op,         const std::string& origin,
                         qpid::framing::FieldTable* extra_args=0);
 };
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp?rev=1095144&r1=1095143&r2=1095144&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp Tue Apr 19 17:46:03 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -39,7 +39,7 @@ pair<Exchange::shared_ptr, bool> Exchang
     return declare(name, type, false, FieldTable());
 }
 
-pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type, 
+pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type,
                                                            bool durable, const FieldTable& args){
     RWlock::ScopedWlock locker(lock);
     ExchangeMap::iterator i =  exchanges.find(name);
@@ -61,7 +61,7 @@ pair<Exchange::shared_ptr, bool> Exchang
         }else{
             FunctionMap::iterator i =  factory.find(type);
             if (i == factory.end()) {
-                throw UnknownExchangeTypeException();    
+                throw UnknownExchangeTypeException();
             } else {
                 exchange = i->second(name, durable, args, parent, broker);
             }
@@ -82,6 +82,7 @@ void ExchangeRegistry::destroy(const str
     RWlock::ScopedWlock locker(lock);
     ExchangeMap::iterator i =  exchanges.find(name);
     if (i != exchanges.end()) {
+        i->second->destroy();
         exchanges.erase(i);
     }
 }
@@ -104,7 +105,7 @@ void ExchangeRegistry::registerType(cons
 }
 
 
-namespace 
+namespace
 {
 const std::string empty;
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1095144&r1=1095143&r2=1095144&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Tue Apr 19 17:46:03 2011
@@ -462,7 +462,7 @@ void SemanticState::route(intrusive_ptr<
     msg->setTimestamp(getSession().getBroker().getExpiryPolicy());
 
     std::string exchangeName = msg->getExchangeName();
-    if (!cacheExchange || cacheExchange->getName() != exchangeName)
+    if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed())
         cacheExchange = session.getBroker().getExchanges().get(exchangeName);
     cacheExchange->setProperties(msg);
 

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=1095144&r1=1095143&r2=1095144&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Tue Apr 19 17:46:03 2011
@@ -479,6 +479,54 @@ acl allow all all
         for q in ['a','b','c']: self.assert_browse(s1,q,[str(n) for n in xrange(1,6)])
         self.assert_browse(s1,'d',[str(n) for n in xrange(1,5)])
 
+    def test_deleted_exchange(self):
+        """QPID-3215: cached exchange reference can cause cluster inconsistencies
+        if exchange is deleted/recreated
+        Verify stand-alone case
+        """
+        cluster = self.cluster()
+        # Verify we do not route message via an exchange that has been destroyed.
+        cluster.start()
+        s0 = cluster[0].connect().session()
+        self.evaluate_address(s0, "ex;{create:always,node:{type:topic}}")
+        self.evaluate_address(s0, "q;{create:always,node:{x-bindings:[{exchange:'ex',queue:q,key:foo}]}}")
+        send0 = s0.sender("ex/foo")
+        send0.send("foo")
+        self.assert_browse(s0, "q", ["foo"])
+        self.evaluate_address(s0, "ex;{delete:always}")
+        try:
+            send0.send("bar")     # Should fail, exchange is deleted.
+            self.fail("Expected not-found exception")
+        except qpid.messaging.NotFound: pass
+        # FIXME aconway 2011-04-19: s0 is broken, new session
+        self.assert_browse(cluster[0].connect().session(), "q", ["foo"])
+
+    def test_deleted_exchange_inconsistent(self):
+        """QPID-3215: cached exchange reference can cause cluster inconsistencies
+        if exchange is deleted/recreated
+
+        Verify cluster inconsistency.
+        """
+        cluster = self.cluster()
+        cluster.start()
+        s0 = cluster[0].connect().session()
+        self.evaluate_address(s0, "ex;{create:always,node:{type:topic}}")
+        self.evaluate_address(s0, "q;{create:always,node:{x-bindings:[{exchange:'ex',queue:q,key:foo}]}}")
+        send0 = s0.sender("ex/foo")
+        send0.send("foo")
+        self.assert_browse(s0, "q", ["foo"])
+
+        cluster.start()
+        s1 = cluster[1].connect().session()
+        self.evaluate_address(s0, "ex;{delete:always}")
+        try:
+            send0.send("bar")
+            self.fail("Expected not-found exception")
+        except qpid.messaging.NotFound: pass
+
+        self.assert_browse(s1, "q", ["foo"])
+
+
 class LongTests(BrokerTest):
     """Tests that can run for a long time if -DDURATION=<minutes> is set"""
     def duration(self):



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org