You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2012/11/14 17:04:07 UTC

svn commit: r1409241 - in /qpid/trunk/qpid/cpp: include/qpid/framing/ include/qpid/types/ src/qpid/broker/ src/qpid/framing/ src/qpid/ha/ src/qpid/types/ src/tests/

Author: aconway
Date: Wed Nov 14 16:04:04 2012
New Revision: 1409241

URL: http://svn.apache.org/viewvc?rev=1409241&view=rev
Log:
QPID-4428: HA add UUID tag to avoid using an out of date queue/exchange.

Imagine a cluster with primary A and backups B and C. A queue Q is created on A
and replicated to B, C. Now A dies and B takes over as primary. Before C can
connect to B, a client destroys Q and creates a new queue with the same name.
When B connects it sees Q and incorrectly assumes it is the same Q that it has
already replicated. Now C has an inconsistent replica of Q.

The fix is to tag queues/exchanges with a UUID so a backup can tell if a queue
is not the same as the one it has already replicated, even if the names are the
same.  This all also applies to exchanges.

- Minor imrovements to printing UUIDs in a FieldTable.
- Fix comparison of void Variants, added operator !=

Modified:
    qpid/trunk/qpid/cpp/include/qpid/framing/FieldValue.h
    qpid/trunk/qpid/cpp/include/qpid/types/Variant.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/ConfigurationObserver.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
    qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
    qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/types.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/types.h
    qpid/trunk/qpid/cpp/src/qpid/types/Variant.cpp
    qpid/trunk/qpid/cpp/src/tests/ha_test.py
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/trunk/qpid/cpp/include/qpid/framing/FieldValue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/framing/FieldValue.h?rev=1409241&r1=1409240&r2=1409241&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/framing/FieldValue.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/framing/FieldValue.h Wed Nov 14 16:04:04 2012
@@ -175,11 +175,19 @@ class FixedWidthValue : public FieldValu
         return v;
     }
     uint8_t* rawOctets() { return octets; }
-    uint8_t* rawOctets() const { return octets; }
+    const uint8_t* rawOctets() const { return octets; }
 
     void print(std::ostream& o) const { o << "F" << width << ":"; };
 };
 
+class UuidData : public FixedWidthValue<16> {
+  public:
+    UuidData();
+    UuidData(const unsigned char* bytes);
+    bool convertsToString() const;
+    std::string getString() const;
+};
+
 template <class T, int W>
 inline T FieldValue::getIntegerValue() const
 {
@@ -356,7 +364,7 @@ class Var16Value : public FieldValue {
 class Var32Value : public FieldValue {
   public:
     QPID_COMMON_EXTERN Var32Value(const std::string& v, uint8_t code);
-};
+ };
 
 class Struct32Value : public FieldValue {
   public:
@@ -453,6 +461,7 @@ class ListValue : public FieldValue {
 
 class UuidValue : public FieldValue {
   public:
+    QPID_COMMON_EXTERN UuidValue();
     QPID_COMMON_EXTERN UuidValue(const unsigned char*);
 };
 

Modified: qpid/trunk/qpid/cpp/include/qpid/types/Variant.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/types/Variant.h?rev=1409241&r1=1409240&r2=1409241&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/types/Variant.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/types/Variant.h Wed Nov 14 16:04:04 2012
@@ -177,6 +177,7 @@ QPID_TYPES_EXTERN std::ostream& operator
 QPID_TYPES_EXTERN std::ostream& operator<<(std::ostream& out, const Variant::Map& map);
 QPID_TYPES_EXTERN std::ostream& operator<<(std::ostream& out, const Variant::List& list);
 QPID_TYPES_EXTERN bool operator==(const Variant& a, const Variant& b);
+QPID_TYPES_EXTERN bool operator!=(const Variant& a, const Variant& b);
 #endif
 }} // namespace qpid::types
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1409241&r1=1409240&r2=1409241&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Wed Nov 14 16:04:04 2012
@@ -1176,29 +1176,12 @@ std::pair<Exchange::shared_ptr, bool> Br
     }
 
     std::pair<Exchange::shared_ptr, bool> result;
-    result = exchanges.declare(name, type, durable, arguments);
+    result = exchanges.declare(
+        name, type, durable, arguments, alternate, connectionId, userId);
     if (result.second) {
-        if (alternate) {
-            result.first->setAlternate(alternate);
-            alternate->incAlternateUsers();
-        }
         if (durable) {
             store->create(*result.first, arguments);
         }
-        if (managementAgent.get()) {
-            //TODO: debatable whether we should raise an event here for
-            //create when this is a 'declare' event; ideally add a create
-            //event instead?
-            managementAgent->raiseEvent(_qmf::EventExchangeDeclare(connectionId,
-                                                         userId,
-                                                         name,
-                                                         type,
-                                                         alternateExchange,
-                                                         durable,
-                                                         false,
-                                                         ManagementAgent::toMap(arguments),
-                                                         "created"));
-        }
         QPID_LOG_CAT(debug, model, "Create exchange. name:" << name
             << " user:" << userId
             << " rhost:" << connectionId
@@ -1225,10 +1208,7 @@ void Broker::deleteExchange(const std::s
     if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange."));
     if (exchange->isDurable()) store->destroy(*exchange);
     if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
-    exchanges.destroy(name);
-
-    if (managementAgent.get())
-        managementAgent->raiseEvent(_qmf::EventExchangeDelete(connectionId, userId, name));
+    exchanges.destroy(name, connectionId,  userId);
     QPID_LOG_CAT(debug, model, "Delete exchange. name:" << name
         << " user:" << userId
         << " rhost:" << connectionId);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ConfigurationObserver.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ConfigurationObserver.h?rev=1409241&r1=1409240&r2=1409241&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ConfigurationObserver.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ConfigurationObserver.h Wed Nov 14 16:04:04 2012
@@ -38,6 +38,10 @@ class Exchange;
 
 /**
  * Observer for changes to configuration (aka wiring)
+ *
+ * NOTE: create and destroy functions are called with
+ * the registry lock held. This is necessary to ensure
+ * they are called in the correct sequence.
  */
 class ConfigurationObserver
 {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=1409241&r1=1409240&r2=1409241&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Wed Nov 14 16:04:04 2012
@@ -408,5 +408,10 @@ bool Exchange::routeWithAlternate(Delive
     return msg.delivered;
 }
 
+void Exchange::setArgs(const framing::FieldTable& newArgs) {
+    args = newArgs;
+    if (mgmtExchange) mgmtExchange->set_arguments(ManagementAgent::toMap(args));
+}
+
 }}
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h?rev=1409241&r1=1409240&r2=1409241&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h Wed Nov 14 16:04:04 2012
@@ -173,8 +173,8 @@ public:
 
     const std::string& getName() const { return name; }
     bool isDurable() { return durable; }
-    qpid::framing::FieldTable& getArgs() { return args; }
     const qpid::framing::FieldTable& getArgs() const { return args; }
+    void setArgs(const framing::FieldTable&);
 
     QPID_BROKER_EXTERN Exchange::shared_ptr getAlternate() { return alternate; }
     QPID_BROKER_EXTERN void setAlternate(Exchange::shared_ptr _alternate);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp?rev=1409241&r1=1409240&r2=1409241&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp Wed Nov 14 16:04:04 2012
@@ -29,20 +29,26 @@
 #include "qpid/management/ManagementDirectExchange.h"
 #include "qpid/management/ManagementTopicExchange.h"
 #include "qpid/framing/reply_exceptions.h"
+#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
+#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
 
 using namespace qpid::broker;
 using namespace qpid::sys;
 using std::pair;
 using std::string;
 using qpid::framing::FieldTable;
+using qpid::management::ManagementAgent;
+namespace _qmf = qmf::org::apache::qpid::broker;
 
 pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type){
 
     return declare(name, type, false, FieldTable());
 }
 
-pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type,
-                                                           bool durable, const FieldTable& args){
+pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(
+    const string& name, const string& type, bool durable, const FieldTable& args,
+    Exchange::shared_ptr alternate, const string& connectionId, const string& userId)
+{
     Exchange::shared_ptr exchange;
     std::pair<Exchange::shared_ptr, bool> result;
     {
@@ -73,31 +79,58 @@ pair<Exchange::shared_ptr, bool> Exchang
             }
             exchanges[name] = exchange;
             result = std::pair<Exchange::shared_ptr, bool>(exchange, true);
+            if (alternate) {
+                exchange->setAlternate(alternate);
+                alternate->incAlternateUsers();
+            }
+            // Call exchangeCreate inside the lock to ensure correct ordering.
+            if (broker) broker->getConfigurationObservers().exchangeCreate(exchange);
         } else {
             result = std::pair<Exchange::shared_ptr, bool>(i->second, false);
         }
+        if (broker && broker->getManagementAgent()) {
+            // Call raiseEvent inside the lock to ensure correct ordering.
+            broker->getManagementAgent()->raiseEvent(
+                _qmf::EventExchangeDeclare(
+                    connectionId,
+                    userId,
+                    name,
+                    type,
+                    alternate ? alternate->getName() : string(),
+                    durable,
+                    false,
+                    ManagementAgent::toMap(result.first->getArgs()),
+                    "created"));
+        }
     }
-    if (broker && exchange) broker->getConfigurationObservers().exchangeCreate(exchange);
     return result;
 }
 
-void ExchangeRegistry::destroy(const string& name){
+void ExchangeRegistry::destroy(
+    const string& name, const string& connectionId, const string& userId)
+{
     if (name.empty() ||
         (name.find("amq.") == 0 &&
          (name == "amq.direct" || name == "amq.fanout" || name == "amq.topic" || name == "amq.match")) ||
         name == "qpid.management")
         throw framing::NotAllowedException(QPID_MSG("Cannot delete default exchange: '" << name << "'"));
-    Exchange::shared_ptr exchange;
     {
         RWlock::ScopedWlock locker(lock);
         ExchangeMap::iterator i =  exchanges.find(name);
         if (i != exchanges.end()) {
-            exchange = i->second;
+            if (broker) {
+                // Call exchangeDestroy and raiseEvent inside the lock to ensure
+                // correct ordering.
+                broker->getConfigurationObservers().exchangeDestroy(i->second);
+                if (broker->getManagementAgent())
+                    broker->getManagementAgent()->raiseEvent(
+                        _qmf::EventExchangeDelete(connectionId, userId, name));
+            }
             i->second->destroy();
             exchanges.erase(i);
+
         }
     }
-    if (broker && exchange) broker->getConfigurationObservers().exchangeDestroy(exchange);
 }
 
 Exchange::shared_ptr ExchangeRegistry::find(const string& name){

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h?rev=1409241&r1=1409240&r2=1409241&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h Wed Nov 14 16:04:04 2012
@@ -46,14 +46,23 @@ class ExchangeRegistry{
                              bool, const qpid::framing::FieldTable&, qpid::management::Manageable*, qpid::broker::Broker*> FactoryFunction;
 
     ExchangeRegistry (Broker* b = 0) : parent(0), broker(b) {}
-    QPID_BROKER_EXTERN std::pair<Exchange::shared_ptr, bool> declare
-      (const std::string& name, const std::string& type);
-    QPID_BROKER_EXTERN std::pair<Exchange::shared_ptr, bool> declare
-      (const std::string& name,
-       const std::string& type, 
-       bool durable,
-       const qpid::framing::FieldTable& args = framing::FieldTable());
-    QPID_BROKER_EXTERN void destroy(const std::string& name);
+    QPID_BROKER_EXTERN std::pair<Exchange::shared_ptr, bool> declare(
+        const std::string& name, const std::string& type);
+
+    QPID_BROKER_EXTERN std::pair<Exchange::shared_ptr, bool> declare(
+        const std::string& name,
+        const std::string& type,
+        bool durable,
+        const qpid::framing::FieldTable& args = framing::FieldTable(),
+        Exchange::shared_ptr alternate = Exchange::shared_ptr(),
+        const std::string& connectionId = std::string(),
+        const std::string& userId = std::string());
+
+    QPID_BROKER_EXTERN void destroy(
+        const std::string& name,
+        const std::string& connectionId = std::string(),
+        const std::string& userId = std::string());
+
     QPID_BROKER_EXTERN Exchange::shared_ptr getDefault();
 
     /**

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1409241&r1=1409240&r2=1409241&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Wed Nov 14 16:04:04 2012
@@ -1169,14 +1169,10 @@ void tryAutoDeleteImpl(Broker& broker, Q
 {
     if (broker.getQueues().destroyIf(queue->getName(),
                                      boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) {
-        QPID_LOG(debug, "Auto-deleting " << queue->getName());
-        queue->destroyed();
-
-        if (broker.getManagementAgent())
-            broker.getManagementAgent()->raiseEvent(_qmf::EventQueueDelete(connectionId, userId, queue->getName()));
-        QPID_LOG_CAT(debug, model, "Delete queue. name:" << queue->getName()
+        QPID_LOG_CAT(debug, model, "Auto-delete queue: " << queue->getName()
             << " user:" << userId
             << " rhost:" << connectionId );
+        queue->destroyed();
     }
 }
 
@@ -1598,5 +1594,10 @@ void Queue::UsageBarrier::destroy()
     while (count) usageLock.wait();
 }
 
+void Queue::addArgument(const string& key, const types::Variant& value) {
+    settings.original.insert(types::Variant::Map::value_type(key, value));
+    if (mgmtObject != 0) mgmtObject->set_arguments(settings.asMap());
+}
+
 }}
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=1409241&r1=1409240&r2=1409241&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Wed Nov 14 16:04:04 2012
@@ -145,7 +145,7 @@ class Queue : public boost::enable_share
     mutable qpid::sys::Mutex messageLock;
     mutable qpid::sys::Mutex ownershipLock;
     mutable uint64_t persistenceId;
-    const QueueSettings settings;
+    QueueSettings settings;
     qpid::framing::FieldTable encodableSettings;
     QueueDepth current;
     QueueBindings bindings;
@@ -423,6 +423,10 @@ class Queue : public boost::enable_share
 
     uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); }
     QPID_BROKER_EXTERN void setDequeueSincePurge(uint32_t value);
+
+    /** Add an argument to be included in management messages about this queue. */
+    void addArgument(const std::string& key, const types::Variant& value);
+
   friend class QueueFactory;
 };
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp?rev=1409241&r1=1409240&r2=1409241&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp Wed Nov 14 16:04:04 2012
@@ -69,23 +69,25 @@ QueueRegistry::declare(const string& nam
                 queue->create();
             }
             queues[name] = queue;
+            // NOTE: raiseEvent and queueCreate must be called with the lock held in
+            // order to ensure events are generated in the correct order.
+            // Call queueCreate before raiseEvents so it can add arguments that
+            // will be included in the management event.
+            if (getBroker()) getBroker()->getConfigurationObservers().queueCreate(queue);
             result = std::pair<Queue::shared_ptr, bool>(queue, true);
         } else {
             result = std::pair<Queue::shared_ptr, bool>(i->second, false);
         }
-        // NOTE: raiseEvent must be called with the lock held in order to
-        // ensure management events are generated in the correct order.
-        if (getBroker() && getBroker()->getManagementAgent() && connectionId.size() && userId.size()) {
+            if (getBroker() && getBroker()->getManagementAgent()) {
             getBroker()->getManagementAgent()->raiseEvent(
                 _qmf::EventQueueDeclare(
                     connectionId, userId, name,
                     settings.durable, owner, settings.autodelete,
                     alternate ? alternate->getName() : string(),
-                    settings.asMap(),
+                    result.first->getSettings().asMap(),
                     result.second ? "created" : "existing"));
         }
     }
-    if (getBroker() && result.second) getBroker()->getConfigurationObservers().queueCreate(result.first);
     return result;
 }
 
@@ -99,17 +101,17 @@ void QueueRegistry::destroy(
         if (i != queues.end()) {
             q = i->second;
             queues.erase(i);
-            if (getBroker() && getBroker()->getManagementAgent() &&
-                connectionId.size() && userId.size())
-            {
-                // NOTE: raiseEvent must be called with the lock held in order to
-                // ensure management events are generated in the correct order.
-                getBroker()->getManagementAgent()->raiseEvent(
-                    _qmf::EventQueueDelete(connectionId, userId, name));
+            if (getBroker()) {
+                // NOTE: queueDestroy and raiseEvent must be called with the
+                // lock held in order to ensure events are generated
+                // in the correct order.
+                getBroker()->getConfigurationObservers().queueDestroy(q);
+                if (getBroker()->getManagementAgent())
+                    getBroker()->getManagementAgent()->raiseEvent(
+                        _qmf::EventQueueDelete(connectionId, userId, name));
             }
         }
     }
-    if (getBroker() && q) getBroker()->getConfigurationObservers().queueDestroy(q);
 }
 
 Queue::shared_ptr QueueRegistry::find(const string& name){

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=1409241&r1=1409240&r2=1409241&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Wed Nov 14 16:04:04 2012
@@ -98,17 +98,6 @@ void SessionAdapter::ExchangeHandlerImpl
                 //exchange already there, not created
                 checkType(response.first, type);
                 checkAlternate(response.first, alternate);
-                ManagementAgent* agent = getBroker().getManagementAgent();
-                if (agent)
-                    agent->raiseEvent(_qmf::EventExchangeDeclare(getConnection().getUrl(),
-                                                                 getConnection().getUserId(),
-                                                                 exchange,
-                                                                 type,
-                                                                 alternateExchange,
-                                                                 durable,
-                                                                 false,
-                                                                 ManagementAgent::toMap(args),
-                                                                 "existing"));
                 QPID_LOG_CAT(debug, model, "Create exchange. name:" << exchange
                     << " user:" << getConnection().getUserId()
                     << " rhost:" << getConnection().getUrl()

Modified: qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.cpp?rev=1409241&r1=1409240&r2=1409241&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.cpp Wed Nov 14 16:04:04 2012
@@ -23,6 +23,7 @@
 #include "qpid/framing/Buffer.h"
 #include "qpid/framing/Endian.h"
 #include "qpid/framing/List.h"
+#include "qpid/framing/Uuid.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/Msg.h"
 
@@ -43,7 +44,9 @@ void FieldValue::setType(uint8_t type)
         data.reset(new EncodedValue<List>());
     } else if (typeOctet == 0xAA) {
         data.reset(new EncodedValue<Array>());
-    } else {    
+    } else if (typeOctet == 0x48) {
+        data.reset(new UuidData());
+    } else {
         uint8_t lenType = typeOctet >> 4;
         switch(lenType){
           case 0:
@@ -213,9 +216,12 @@ Integer8Value::Integer8Value(int8_t v) :
 Integer16Value::Integer16Value(int16_t v) :
     FieldValue(0x11, new FixedWidthValue<2>(v))
 {}
-UuidValue::UuidValue(const unsigned char* v) :
-    FieldValue(0x48, new FixedWidthValue<16>(v))
-{}
+
+UuidData::UuidData() {}
+UuidData::UuidData(const unsigned char* bytes) : FixedWidthValue<16>(bytes) {}
+bool UuidData::convertsToString() const { return true; }
+std::string UuidData::getString() const { return Uuid(rawOctets()).str(); }
+UuidValue::UuidValue(const unsigned char* v) : FieldValue(0x48, new UuidData(v)) {}
 
 void FieldValue::print(std::ostream& out) const {
     data->print(out);

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1409241&r1=1409240&r2=1409241&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Wed Nov 14 16:04:04 2012
@@ -29,6 +29,7 @@
 #include "qpid/broker/Link.h"
 #include "qpid/broker/amqp_0_10/MessageTransfer.h"
 #include "qpid/framing/FieldTable.h"
+#include "qpid/framing/FieldValue.h"
 #include "qpid/log/Statement.h"
 #include "qpid/amqp_0_10/Codecs.h"
 #include "qpid/broker/SessionHandler.h"
@@ -280,7 +281,9 @@ BrokerReplicator::BrokerReplicator(HaBro
 {
     broker.getConnectionObservers().add(
         boost::shared_ptr<broker::ConnectionObserver>(new ConnectionObserver(*this)));
-    getArgs().setString(QPID_REPLICATE, printable(NONE).str());
+    framing::FieldTable args = getArgs();
+    args.setString(QPID_REPLICATE, printable(NONE).str());
+    setArgs(args);
 
     dispatch[EventQueueDeclare::getFullName()] = &BrokerReplicator::doEventQueueDeclare;
     dispatch[EventQueueDelete::getFullName()] = &BrokerReplicator::doEventQueueDelete;
@@ -458,7 +461,8 @@ void BrokerReplicator::doEventQueueDecla
         // If we already have a queue with this name, replace it.
         // The queue was definitely created on the primary.
         if (queues.find(name)) {
-            QPID_LOG(warning, logPrefix << "Replacing exsiting queue: " << name);
+            QPID_LOG(warning, logPrefix << "Declare event, replacing exsiting queue: "
+                     << name);
             deleteQueue(name);
         }
         replicateQueue(name, values[DURABLE].asBool(), autoDel, args,
@@ -499,7 +503,8 @@ void BrokerReplicator::doEventExchangeDe
         // The exchange was definitely created on the primary.
         if (exchanges.find(name)) {
             deleteExchange(name);
-            QPID_LOG(warning, logPrefix << "Replaced existing exchange: " << name);
+            QPID_LOG(warning, logPrefix << "Declare event, replacing existing exchange: "
+                     << name);
         }
         CreateExchangeResult result = createExchange(
             name, values[EXTYPE].asString(), values[DURABLE].asBool(), args,
@@ -591,8 +596,15 @@ string getAltExchange(const types::Varia
     }
     else return string();
 }
+
+Variant getHaUuid(const Variant::Map& map) {
+    Variant::Map::const_iterator i = map.find(QPID_HA_UUID);
+    return i == map.end() ? Variant() : i->second;
 }
 
+} // namespace
+
+
 void BrokerReplicator::doResponseQueue(Variant::Map& values) {
     Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
     if (!replicationTest.isReplicated(
@@ -606,6 +618,14 @@ void BrokerReplicator::doResponseQueue(V
         throw Exception(QPID_MSG("Unexpected queue response: " << values));
     if (!queueTracker->response(name)) return; // Response is out-of-date
     QPID_LOG(debug, logPrefix << "Queue response: " << name);
+    // If we see a queue with the same name as one we have, but not the same UUID,
+    // then replace the one we have.
+    boost::shared_ptr<Queue> queue = queues.find(name);
+    if (queue && getHaUuid(queue->getSettings().original) != getHaUuid(argsMap)) {
+        QPID_LOG(warning, logPrefix << "UUID mismatch, replacing queue: "
+                 << name);
+        deleteQueue(name);
+    }
     framing::FieldTable args;
     qpid::amqp_0_10::translate(argsMap, args);
     boost::shared_ptr<QueueReplicator> qr = replicateQueue(
@@ -629,6 +649,16 @@ void BrokerReplicator::doResponseExchang
     QPID_LOG(debug, logPrefix << "Exchange response: " << name);
     framing::FieldTable args;
     qpid::amqp_0_10::translate(argsMap, args);
+    // If we see an exchange with the same name as one we have, but not the same UUID,
+    // then replace the one we have.
+    boost::shared_ptr<Exchange> exchange = exchanges.find(name);
+    if (exchange &&
+        exchange->getArgs().getAsString(QPID_HA_UUID) != args.getAsString(QPID_HA_UUID))
+    {
+        QPID_LOG(warning, logPrefix << "UUID mismatch, replacing exchange: "
+                 << name);
+        deleteExchange(name);
+    }
     CreateExchangeResult result = createExchange(
         name, values[TYPE].asString(), values[DURABLE].asBool(), args,
         getAltExchange(values[ALTEXCHANGE]));

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1409241&r1=1409240&r2=1409241&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp Wed Nov 14 16:04:04 2012
@@ -31,6 +31,8 @@
 #include "qpid/broker/Connection.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/framing/FieldTable.h"
+#include "qpid/framing/FieldValue.h"
+#include "qpid/framing/Uuid.h"
 #include "qpid/log/Statement.h"
 #include "qpid/sys/Timer.h"
 #include <boost/bind.hpp>
@@ -39,6 +41,8 @@ namespace qpid {
 namespace ha {
 
 using sys::Mutex;
+using namespace std;
+using namespace framing;
 
 namespace {
 
@@ -58,6 +62,8 @@ class PrimaryConfigurationObserver : pub
     PrimaryConfigurationObserver(Primary& p) : primary(p) {}
     void queueCreate(const Primary::QueuePtr& q) { primary.queueCreate(q); }
     void queueDestroy(const Primary::QueuePtr& q) { primary.queueDestroy(q); }
+    void exchangeCreate(const Primary::ExchangePtr& q) { primary.exchangeCreate(q); }
+    void exchangeDestroy(const Primary::ExchangePtr& q) { primary.exchangeDestroy(q); }
   private:
     Primary& primary;
 };
@@ -178,9 +184,12 @@ void Primary::readyReplica(const Replica
     }
 }
 
+// NOTE: Called with queue registry lock held.
 void Primary::queueCreate(const QueuePtr& q) {
-    // Throw if there is an invalid replication level in the queue settings.
-    haBroker.getReplicationTest().replicateLevel(q->getSettings().storeSettings);
+    if (haBroker.getReplicationTest().isReplicated(CONFIGURATION, *q)) {
+        // Give each queue a unique id to avoid confusion of same-named queues.
+        q->addArgument(QPID_HA_UUID, types::Variant(Uuid(true)));
+    }
     Mutex::ScopedLock l(lock);
     for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) {
         i->second->queueCreate(q);
@@ -188,6 +197,7 @@ void Primary::queueCreate(const QueuePtr
     }
 }
 
+// NOTE: Called with queue registry lock held.
 void Primary::queueDestroy(const QueuePtr& q) {
     Mutex::ScopedLock l(lock);
     for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i)
@@ -195,6 +205,21 @@ void Primary::queueDestroy(const QueuePt
     checkReady(l);
 }
 
+// NOTE: Called with exchange registry lock held.
+void Primary::exchangeCreate(const ExchangePtr& ex) {
+    if (haBroker.getReplicationTest().isReplicated(CONFIGURATION, *ex)) {
+        // Give each exchange a unique id to avoid confusion of same-named exchanges.
+        FieldTable args = ex->getArgs();
+        args.set(QPID_HA_UUID, FieldTable::ValuePtr(new UuidValue(&Uuid(true)[0])));
+        ex->setArgs(args);
+    }
+}
+
+// NOTE: Called with exchange registry lock held.
+void Primary::exchangeDestroy(const ExchangePtr&) {
+    // Do nothing
+ }
+
 void Primary::opened(broker::Connection& connection) {
     BrokerInfo info;
     if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h?rev=1409241&r1=1409240&r2=1409241&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h Wed Nov 14 16:04:04 2012
@@ -60,6 +60,7 @@ class Primary
 {
   public:
     typedef boost::shared_ptr<broker::Queue> QueuePtr;
+    typedef boost::shared_ptr<broker::Exchange> ExchangePtr;
 
     static Primary* get() { return instance; }
 
@@ -72,6 +73,8 @@ class Primary
     // Called via ConfigurationObserver
     void queueCreate(const QueuePtr&);
     void queueDestroy(const QueuePtr&);
+    void exchangeCreate(const ExchangePtr&);
+    void exchangeDestroy(const ExchangePtr&);
 
     // Called via ConnectionObserver
     void opened(broker::Connection& connection);

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1409241&r1=1409240&r2=1409241&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Wed Nov 14 16:04:04 2012
@@ -49,9 +49,8 @@ using namespace framing;
 using namespace std;
 using sys::Mutex;
 
-const std::string QPID_HA_EVENT_PREFIX("qpid.ha-");
-const std::string QueueReplicator::DEQUEUE_EVENT_KEY(QPID_HA_EVENT_PREFIX+"dequeue");
-const std::string QueueReplicator::POSITION_EVENT_KEY(QPID_HA_EVENT_PREFIX+"position");
+const std::string QueueReplicator::DEQUEUE_EVENT_KEY(QPID_HA_PREFIX+"dequeue");
+const std::string QueueReplicator::POSITION_EVENT_KEY(QPID_HA_PREFIX+"position");
 const std::string QueueReplicator::QPID_SYNC_FREQUENCY("qpid.sync_frequency");
 
 std::string QueueReplicator::replicatorName(const std::string& queueName) {
@@ -63,7 +62,7 @@ bool QueueReplicator::isReplicatorName(c
 }
 
 bool QueueReplicator::isEventKey(const std::string key) {
-    const std::string& prefix = QPID_HA_EVENT_PREFIX;
+    const std::string& prefix = QPID_HA_PREFIX;
     bool ret = key.size() > prefix.size() && key.compare(0, prefix.size(), prefix) == 0;
     return ret;
 }
@@ -114,7 +113,9 @@ QueueReplicator::QueueReplicator(HaBroke
     args.setString(QPID_REPLICATE, printable(NONE).str());
     Uuid uuid(true);
     bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str();
-    getArgs().setString(QPID_REPLICATE, printable(NONE).str());
+    framing::FieldTable args = getArgs();
+    args.setString(QPID_REPLICATE, printable(NONE).str());
+    setArgs(args);
 }
 
 // This must be separate from the constructor so we can call shared_from_this.

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/types.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/types.cpp?rev=1409241&r1=1409240&r2=1409241&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/types.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/types.cpp Wed Nov 14 16:04:04 2012
@@ -33,6 +33,8 @@ namespace ha {
 using namespace std;
 
 const string QPID_REPLICATE("qpid.replicate");
+const string QPID_HA_PREFIX("qpid.ha-");
+const string QPID_HA_UUID(QPID_HA_PREFIX+"uuid");
 
 string EnumBase::str() const {
     assert(value < count);

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/types.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/types.h?rev=1409241&r1=1409240&r2=1409241&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/types.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/types.h Wed Nov 14 16:04:04 2012
@@ -99,6 +99,8 @@ inline bool isBackup(BrokerStatus s) { r
 
 // String constants.
 extern const std::string QPID_REPLICATE;
+extern const std::string QPID_HA_PREFIX;
+extern const std::string QPID_HA_UUID;
 
 /** Define IdSet type, not a typedef so we can overload operator << */
 class IdSet : public std::set<types::Uuid> {};

Modified: qpid/trunk/qpid/cpp/src/qpid/types/Variant.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/types/Variant.cpp?rev=1409241&r1=1409240&r2=1409241&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/types/Variant.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/types/Variant.cpp Wed Nov 14 16:04:04 2012
@@ -650,7 +650,7 @@ VariantImpl* VariantImpl::create(const V
     }
 }
 
-Variant::Variant() : impl(0) {}
+Variant::Variant() : impl(new VariantImpl()) {}
 Variant::Variant(bool b) : impl(new VariantImpl(b)) {}
 Variant::Variant(uint8_t i) : impl(new VariantImpl(i)) {}
 Variant::Variant(uint16_t i) : impl(new VariantImpl(i)) {}
@@ -893,6 +893,8 @@ bool operator==(const Variant& a, const 
     return a.isEqualTo(b);
 }
 
+bool operator!=(const Variant& a, const Variant& b) { return !(a == b); }
+
 bool Variant::isEqualTo(const Variant& other) const
 {
     return impl && impl->isEqualTo(*other.impl);

Modified: qpid/trunk/qpid/cpp/src/tests/ha_test.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_test.py?rev=1409241&r1=1409240&r2=1409241&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_test.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_test.py Wed Nov 14 16:04:04 2012
@@ -142,7 +142,9 @@ class HaBroker(Broker):
     # FIXME aconway 2012-05-01: do direct python call to qpid-config code.
     def qpid_config(self, args):
         assert subprocess.call(
-            [self.qpid_config_path, "--broker", self.host_port()]+args) == 0
+            [self.qpid_config_path, "--broker", self.host_port()]+args,
+            stdout=1, stderr=subprocess.STDOUT
+            ) == 0
 
     def config_replicate(self, from_broker, queue):
         self.qpid_config(["add", "queue", "--start-replica", from_broker, queue])
@@ -160,12 +162,14 @@ class HaBroker(Broker):
         else:
             return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs)
 
-    def wait_backup(self, address):
-        """Wait for address to become valid on a backup broker."""
+    def wait_address(self, address):
+        """Wait for address to become valid on the broker."""
         bs = self.connect_admin().session()
         try: wait_address(bs, address)
         finally: bs.connection.close()
 
+    def wait_backup(self, address): self.wait_address(address)
+
     def assert_browse(self, queue, expected, **kwargs):
         """Verify queue contents by browsing."""
         bs = self.connect().session()

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1409241&r1=1409240&r2=1409241&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Wed Nov 14 16:04:04 2012
@@ -270,6 +270,7 @@ class ReplicationTests(HaBrokerTest):
     def test_qpid_config_replication(self):
         """Set up replication via qpid-config"""
         brokers = HaCluster(self,2)
+        brokers[0].wait_status("active")
         brokers[0].config_declare("q","all")
         brokers[0].connect().session().sender("q").send("foo")
         brokers[1].assert_browse_backup("q", ["foo"])
@@ -830,6 +831,41 @@ acl deny all all
             verify_qmf_events("q2")
         finally: l.restore()
 
+    def test_missed_recreate(self):
+        """If a queue or exchange is destroyed and one with the same name re-created
+        while a backup is disconnected, the backup should also delete/recreate
+        the object when it re-connects"""
+        cluster = HaCluster(self, 3)
+        sn = cluster[0].connect().session()
+        # Create a queue with messages
+        s = sn.sender("qq;{create:always}")
+        msgs = [str(i) for i in xrange(3)]
+        for m in msgs: s.send(m)
+        cluster[1].assert_browse_backup("qq", msgs)
+        cluster[2].assert_browse_backup("qq", msgs)
+        # Set up an exchange with a binding.
+        sn.sender("xx;{create:always,node:{type:topic}}")
+        sn.sender("xxq;{create:always,node:{x-bindings:[{exchange:'xx',queue:'xxq',key:xxq}]}}")
+        cluster[1].wait_address("xx")
+        self.assertEqual(cluster[1].agent().getExchange("xx").values["bindingCount"], 1)
+        cluster[2].wait_address("xx")
+        self.assertEqual(cluster[2].agent().getExchange("xx").values["bindingCount"], 1)
+
+        # Simulate the race by re-creating the objects before promoting the new primary
+        cluster.kill(0, False)
+        sn = cluster[1].connect_admin().session()
+        sn.sender("qq;{delete:always}").close()
+        s = sn.sender("qq;{create:always}")
+        s.send("foo")
+        sn.sender("xx;{delete:always}").close()
+        sn.sender("xx;{create:always,node:{type:topic}}")
+        cluster[1].promote()
+        cluster[1].wait_status("active")
+        # Verify we are not still using the old objects on cluster[2]
+        cluster[2].assert_browse_backup("qq", ["foo"])
+        cluster[2].wait_address("xx")
+        self.assertEqual(cluster[2].agent().getExchange("xx").values["bindingCount"], 0)
+
 def fairshare(msgs, limit, levels):
     """
     Generator to return prioritised messages in expected order for a given fairshare limit



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org