You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/09/20 20:59:50 UTC

svn commit: r1525101 [6/21] - in /qpid/branches/linearstore/qpid: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/...

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h Fri Sep 20 18:59:30 2013
@@ -53,6 +53,10 @@ class ConnectionImpl : public qpid::mess
     void setOption(const std::string& name, const qpid::types::Variant& value);
     bool backoff();
     std::string getAuthenticatedUsername();
+    void reconnect(const std::string& url);
+    void reconnect();
+    std::string getUrl() const;
+    bool getAutoDecode() const;
   private:
     typedef std::map<std::string, qpid::messaging::Session> Sessions;
 
@@ -63,13 +67,14 @@ class ConnectionImpl : public qpid::mess
     bool replaceUrls;     // Replace rather than merging with reconnect-urls
     std::vector<std::string> urls;
     qpid::client::ConnectionSettings settings;
-    bool reconnect;
+    bool autoReconnect;
     double timeout;
     int32_t limit;
     double minReconnectInterval;
     double maxReconnectInterval;
     int32_t retries;
     bool reconnectOnLimitExceeded;
+    bool disableAutoDecode;
 
     void setOptions(const qpid::types::Variant::Map& options);
     void connect(const qpid::sys::AbsTime& started);

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp Fri Sep 20 18:59:30 2013
@@ -399,7 +399,7 @@ void populate(qpid::messaging::Message& 
     //need to be able to link the message back to the transfer it was delivered by
     //e.g. for rejecting.
     MessageImplAccess::get(message).setInternalId(command.getId());
-        
+
     message.setContent(command.getContent());
 
     populateHeaders(message, command.getHeaders());

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp Fri Sep 20 18:59:30 2013
@@ -21,6 +21,7 @@
 #include "qpid/client/amqp0_10/OutgoingMessage.h"
 #include "qpid/client/amqp0_10/AddressResolution.h"
 #include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/types/encodings.h"
 #include "qpid/types/Variant.h"
 #include "qpid/messaging/Address.h"
 #include "qpid/messaging/Message.h"
@@ -45,13 +46,30 @@ const std::string SUBJECT("qpid.subject"
 const std::string X_APP_ID("x-amqp-0-10.app-id");
 const std::string X_ROUTING_KEY("x-amqp-0-10.routing-key");
 const std::string X_CONTENT_ENCODING("x-amqp-0-10.content-encoding");
+const std::string TEXT_PLAIN("text/plain");
 }
 
 void OutgoingMessage::convert(const qpid::messaging::Message& from)
 {
     //TODO: need to avoid copying as much as possible
-    message.setData(from.getContent());
-    message.getMessageProperties().setContentType(from.getContentType());
+    if (from.getContentObject().getType() == qpid::types::VAR_MAP) {
+        std::string content;
+        qpid::amqp_0_10::MapCodec::encode(from.getContentObject().asMap(), content);
+        message.getMessageProperties().setContentType(qpid::amqp_0_10::MapCodec::contentType);
+        message.setData(content);
+    } else if (from.getContentObject().getType() == qpid::types::VAR_LIST) {
+        std::string content;
+        qpid::amqp_0_10::ListCodec::encode(from.getContentObject().asList(), content);
+        message.getMessageProperties().setContentType(qpid::amqp_0_10::ListCodec::contentType);
+        message.setData(content);
+    } else if (from.getContentObject().getType() == qpid::types::VAR_STRING &&
+               (from.getContentObject().getEncoding() == qpid::types::encodings::UTF8 || from.getContentObject().getEncoding() == qpid::types::encodings::ASCII)) {
+        message.getMessageProperties().setContentType(TEXT_PLAIN);
+        message.setData(from.getContent());
+    } else {
+        message.setData(from.getContent());
+        message.getMessageProperties().setContentType(from.getContentType());
+    }
     if ( !from.getCorrelationId().empty() )
         message.getMessageProperties().setCorrelationId(from.getCorrelationId());
     message.getMessageProperties().setUserId(from.getUserId());

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp Fri Sep 20 18:59:30 2013
@@ -25,6 +25,8 @@
 #include "qpid/messaging/exceptions.h"
 #include "qpid/messaging/Receiver.h"
 #include "qpid/messaging/Session.h"
+#include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/types/encodings.h"
 
 namespace qpid {
 namespace client {
@@ -83,6 +85,7 @@ void ReceiverImpl::start()
     if (state == STOPPED) {
         state = STARTED;
         startFlow(l);
+        session.sendCompletion();
     }
 }
 
@@ -148,18 +151,42 @@ qpid::messaging::Address ReceiverImpl::g
 }
 
 ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name,
-                           const qpid::messaging::Address& a) :
+                           const qpid::messaging::Address& a, bool autoDecode_) :
 
-    parent(&p), destination(name), address(a), byteCredit(0xFFFFFFFF),
+    parent(&p), destination(name), address(a), byteCredit(0xFFFFFFFF), autoDecode(autoDecode_),
     state(UNRESOLVED), capacity(0), window(0) {}
 
+namespace {
+const std::string TEXT_PLAIN("text/plain");
+}
+
 bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout)
 {
     {
         sys::Mutex::ScopedLock l(lock);
         if (state == CANCELLED) return false;
     }
-    return parent->get(*this, message, timeout);
+    if (parent->get(*this, message, timeout)) {
+        if (autoDecode) {
+            if (message.getContentType() == qpid::amqp_0_10::MapCodec::contentType) {
+                message.getContentObject() = qpid::types::Variant::Map();
+                decode(message, message.getContentObject().asMap());
+            } else if (message.getContentType() == qpid::amqp_0_10::ListCodec::contentType) {
+                message.getContentObject() = qpid::types::Variant::List();
+                decode(message, message.getContentObject().asList());
+            } else if (!message.getContentBytes().empty()) {
+                message.getContentObject() = message.getContentBytes();
+                if (message.getContentType() == TEXT_PLAIN) {
+                    message.getContentObject().setEncoding(qpid::types::encodings::UTF8);
+                } else {
+                    message.getContentObject().setEncoding(qpid::types::encodings::BINARY);
+                }
+            }
+        }
+        return true;
+    } else {
+        return false;
+    }
 }
 
 bool ReceiverImpl::fetchImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout)

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h Fri Sep 20 18:59:30 2013
@@ -48,7 +48,7 @@ class ReceiverImpl : public qpid::messag
     enum State {UNRESOLVED, STOPPED, STARTED, CANCELLED};
 
     ReceiverImpl(SessionImpl& parent, const std::string& name,
-                 const qpid::messaging::Address& address);
+                 const qpid::messaging::Address& address, bool autoDecode);
 
     void init(qpid::client::AsyncSession session, AddressResolution& resolver);
     bool get(qpid::messaging::Message& message, qpid::messaging::Duration timeout);
@@ -74,6 +74,7 @@ class ReceiverImpl : public qpid::messag
     const std::string destination;
     const qpid::messaging::Address address;
     const uint32_t byteCredit;
+    const bool autoDecode;
     State state;
 
     std::auto_ptr<MessageSource> source;

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp Fri Sep 20 18:59:30 2013
@@ -207,7 +207,7 @@ Receiver SessionImpl::createReceiverImpl
     ScopedLock l(lock);
     std::string name = address.getName();
     getFreeKey(name, receivers);
-    Receiver receiver(new ReceiverImpl(*this, name, address));
+    Receiver receiver(new ReceiverImpl(*this, name, address, connection->getAutoDecode()));
     getImplPtr<Receiver, ReceiverImpl>(receiver)->init(session, resolver);
     receivers[name] = receiver;
     return receiver;

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/client/windows/SaslFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/client/windows/SaslFactory.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/client/windows/SaslFactory.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/client/windows/SaslFactory.cpp Fri Sep 20 18:59:30 2013
@@ -26,6 +26,7 @@
 #include "qpid/sys/SecurityLayer.h"
 #include "qpid/sys/SecuritySettings.h"
 #include "qpid/log/Statement.h"
+#include "qpid/NullSaslServer.h"
 
 #include "boost/tokenizer.hpp"
 
@@ -107,6 +108,12 @@ std::auto_ptr<Sasl> SaslFactory::create(
     return sasl;
 }
 
+std::auto_ptr<SaslServer> SaslFactory::createServer( const std::string& realm, bool /*encryptionRequired*/, const qpid::sys::SecuritySettings& )
+{
+    std::auto_ptr<SaslServer> server(new NullSaslServer(realm));
+    return server;
+}
+
 namespace {
     const std::string ANONYMOUS = "ANONYMOUS";
     const std::string PLAIN = "PLAIN";

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/framing/Buffer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/framing/Buffer.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/framing/Buffer.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/framing/Buffer.cpp Fri Sep 20 18:59:30 2013
@@ -182,27 +182,27 @@ double Buffer::getDouble(){
 }
 
 template <>
-uint64_t Buffer::getUInt<1>() {
+QPID_COMMON_EXTERN uint64_t Buffer::getUInt<1>() {
     return getOctet();
 }
 
 template <>
-uint64_t Buffer::getUInt<2>() {
+QPID_COMMON_EXTERN uint64_t Buffer::getUInt<2>() {
     return getShort();
 }
 
 template <>
-uint64_t Buffer::getUInt<4>() {
+QPID_COMMON_EXTERN uint64_t Buffer::getUInt<4>() {
     return getLong();
 }
 
 template <>
-uint64_t Buffer::getUInt<8>() {
+QPID_COMMON_EXTERN uint64_t Buffer::getUInt<8>() {
     return getLongLong();
 }
 
 template <>
-void Buffer::putUInt<1>(uint64_t i) {
+QPID_COMMON_EXTERN void Buffer::putUInt<1>(uint64_t i) {
     if (std::numeric_limits<uint8_t>::min() <= i && i <= std::numeric_limits<uint8_t>::max()) {
         putOctet(i);
         return;
@@ -211,7 +211,7 @@ void Buffer::putUInt<1>(uint64_t i) {
 }
 
 template <>
-void Buffer::putUInt<2>(uint64_t i) {
+QPID_COMMON_EXTERN void Buffer::putUInt<2>(uint64_t i) {
     if (std::numeric_limits<uint16_t>::min() <= i && i <= std::numeric_limits<uint16_t>::max()) {
         putShort(i);
         return;
@@ -220,7 +220,7 @@ void Buffer::putUInt<2>(uint64_t i) {
 }
 
 template <>
-void Buffer::putUInt<4>(uint64_t i) {
+QPID_COMMON_EXTERN void Buffer::putUInt<4>(uint64_t i) {
     if (std::numeric_limits<uint32_t>::min() <= i && i <= std::numeric_limits<uint32_t>::max()) {
         putLong(i);
         return;
@@ -229,7 +229,7 @@ void Buffer::putUInt<4>(uint64_t i) {
 }
 
 template <>
-void Buffer::putUInt<8>(uint64_t i) {
+QPID_COMMON_EXTERN void Buffer::putUInt<8>(uint64_t i) {
     putLongLong(i);
 }
 

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/framing/FrameSet.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/framing/FrameSet.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/framing/FrameSet.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/framing/FrameSet.h Fri Sep 20 18:59:30 2013
@@ -9,9 +9,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -44,6 +44,8 @@ class FrameSet
 
 public:
     typedef boost::shared_ptr<FrameSet> shared_ptr;
+    typedef Frames::iterator iterator;
+    typedef Frames::const_iterator const_iterator;
 
     QPID_COMMON_EXTERN FrameSet(const SequenceNumber& id);
     QPID_COMMON_EXTERN FrameSet(const FrameSet&);
@@ -62,7 +64,7 @@ public:
     QPID_COMMON_EXTERN AMQMethodBody* getMethod();
     QPID_COMMON_EXTERN const AMQHeaderBody* getHeaders() const;
     QPID_COMMON_EXTERN AMQHeaderBody* getHeaders();
-     
+
     template <class T> bool isA() const {
         const AMQMethodBody* method = getMethod();
         return method && method->isA<T>();
@@ -71,12 +73,12 @@ public:
     template <class T> const T* as() const {
         const AMQMethodBody* method = getMethod();
         return (method && method->isA<T>()) ? dynamic_cast<const T*>(method) : 0;
-    }    
+    }
 
     template <class T>  T* as()  {
         AMQMethodBody* method = getMethod();
         return (method && method->isA<T>()) ? dynamic_cast<T*>(method) : 0;
-    }    
+    }
 
     template <class T> const T* getHeaderProperties() const {
         const AMQHeaderBody* header = getHeaders();
@@ -85,7 +87,7 @@ public:
 
     Frames::const_iterator begin() const { return parts.begin(); }
     Frames::const_iterator end() const { return parts.end(); }
-    
+
     const SequenceNumber& getId() const { return id; }
 
     template <class P> void remove(P predicate) {

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h Fri Sep 20 18:59:30 2013
@@ -25,7 +25,6 @@
 #include "qpid/log/Statement.h"
 #include "qpid/broker/Exchange.h"
 #include "qpid/broker/ExchangeRegistry.h"
-#include "boost/function.hpp"
 #include <map>
 
 namespace qpid {

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Backup.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Backup.cpp Fri Sep 20 18:59:30 2013
@@ -52,9 +52,7 @@ using sys::Mutex;
 Backup::Backup(HaBroker& hb, const Settings& s) :
     logPrefix("Backup: "), membership(hb.getMembership()), stopped(false),
     haBroker(hb), broker(hb.getBroker()), settings(s),
-    statusCheck(
-        new StatusCheck(
-            logPrefix, broker.getOptions().linkHeartbeatInterval, hb.getBrokerInfo()))
+    statusCheck(new StatusCheck(hb))
 {}
 
 void Backup::setBrokerUrl(const Url& brokers) {

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Backup.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Backup.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Backup.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Backup.h Fri Sep 20 18:59:30 2013
@@ -59,6 +59,8 @@ class Backup : public Role
 
     Role* promote();
 
+    boost::shared_ptr<BrokerReplicator> getBrokerReplicator() { return replicator; }
+
   private:
     void stop(sys::Mutex::ScopedLock&);
     Role* recover(sys::Mutex::ScopedLock&);

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/BrokerInfo.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/BrokerInfo.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/BrokerInfo.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/BrokerInfo.h Fri Sep 20 18:59:30 2013
@@ -23,6 +23,7 @@
  */
 
 #include "types.h"
+#include "hash.h"
 #include "qpid/Url.h"
 #include "qpid/framing/FieldTable.h"
 #include "qpid/types/Uuid.h"
@@ -42,7 +43,7 @@ class BrokerInfo
 {
   public:
     typedef std::set<BrokerInfo> Set;
-    typedef qpid::sys::unordered_map<types::Uuid, BrokerInfo, types::Uuid::Hasher> Map;
+    typedef qpid::sys::unordered_map<types::Uuid, BrokerInfo, Hasher<types::Uuid> > Map;
 
     BrokerInfo();
     BrokerInfo(const types::Uuid& id, BrokerStatus, const Address& = Address());

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Fri Sep 20 18:59:30 2013
@@ -21,6 +21,7 @@
 #include "BrokerReplicator.h"
 #include "HaBroker.h"
 #include "QueueReplicator.h"
+#include "TxReplicator.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/amqp_0_10/Connection.h"
 #include "qpid/broker/ConnectionObserver.h"
@@ -129,7 +130,6 @@ const string COLON(":");
 void sendQuery(const string& packageName, const string& className, const string& queueName,
                SessionHandler& sessionHandler)
 {
-    framing::AMQP_ServerProxy peer(sessionHandler.out);
     Variant::Map request;
     request[WHAT] = OBJECT;
     Variant::Map schema;
@@ -229,8 +229,8 @@ class BrokerReplicator::UpdateTracker {
     typedef boost::function<void (const std::string&)> CleanFn;
 
     UpdateTracker(const std::string& type_, // "queue" or "exchange"
-                  CleanFn f, const ReplicationTest& rt)
-        : type(type_), cleanFn(f), repTest(rt) {}
+                  CleanFn f)
+        : type(type_), cleanFn(f) {}
 
     /** Destructor cleans up remaining initial queues. */
     ~UpdateTracker() {
@@ -245,16 +245,10 @@ class BrokerReplicator::UpdateTracker {
     }
 
     /** Add an exchange name */
-    void addExchange(Exchange::shared_ptr ex)  {
-        if (repTest.getLevel(*ex))
-            initial.insert(ex->getName());
-    }
+    void addExchange(Exchange::shared_ptr ex)  { initial.insert(ex->getName()); }
 
     /** Add a queue name. */
-    void addQueue(Queue::shared_ptr q) {
-        if (repTest.getLevel(*q))
-            initial.insert(q->getName());
-    }
+    void addQueue(Queue::shared_ptr q) { initial.insert(q->getName()); }
 
     /** Received an event for name */
     void event(const std::string& name) {
@@ -275,13 +269,13 @@ class BrokerReplicator::UpdateTracker {
     void clean(const std::string& name) {
         QPID_LOG(info, "Backup: Deleted " << type << " " << name <<
                  ": no longer exists on primary");
-        cleanFn(name);
+        try { cleanFn(name); }
+        catch (const framing::NotFoundException&) {}
     }
 
     std::string type;
     Names initial, events;
     CleanFn cleanFn;
-    ReplicationTest repTest;
 };
 
 namespace {
@@ -349,7 +343,8 @@ BrokerReplicator::~BrokerReplicator() { 
 
 namespace {
 void collectQueueReplicators(
-    const boost::shared_ptr<Exchange> ex, set<boost::shared_ptr<QueueReplicator> >& collect)
+    const boost::shared_ptr<Exchange>& ex,
+    set<boost::shared_ptr<QueueReplicator> >& collect)
 {
     boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex));
     if (qr) collect.insert(qr);
@@ -390,16 +385,13 @@ void BrokerReplicator::connected(Bridge&
 
     exchangeTracker.reset(
         new UpdateTracker("exchange",
-                          boost::bind(&BrokerReplicator::deleteExchange, this, _1),
-                          replicationTest));
-    exchanges.eachExchange(
-        boost::bind(&UpdateTracker::addExchange, exchangeTracker.get(), _1));
+                          boost::bind(&BrokerReplicator::deleteExchange, this, _1)));
+    exchanges.eachExchange(boost::bind(&BrokerReplicator::existingExchange, this, _1));
 
     queueTracker.reset(
         new UpdateTracker("queue",
-                          boost::bind(&BrokerReplicator::deleteQueue, this, _1, true),
-                          replicationTest));
-    queues.eachQueue(boost::bind(&UpdateTracker::addQueue, queueTracker.get(), _1));
+                          boost::bind(&BrokerReplicator::deleteQueue, this, _1, true)));
+    queues.eachQueue(boost::bind(&BrokerReplicator::existingQueue, this, _1));
 
     framing::AMQP_ServerProxy peer(sessionHandler.out);
     const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
@@ -428,6 +420,21 @@ void BrokerReplicator::connected(Bridge&
     sendQuery(ORG_APACHE_QPID_BROKER, BINDING, queueName, sessionHandler);
 }
 
+// Called for each queue in existence when the backup connects to a primary.
+void BrokerReplicator::existingQueue(const boost::shared_ptr<Queue>& q) {
+    if (replicationTest.getLevel(*q)) {
+        QPID_LOG(debug, "Existing queue: " << q->getName());
+        queueTracker->addQueue(q);
+    }
+}
+
+void BrokerReplicator::existingExchange(const boost::shared_ptr<Exchange>& ex) {
+    if (replicationTest.getLevel(*ex)) {
+        QPID_LOG(debug, "Existing exchange: " << ex->getName());
+        exchangeTracker->addExchange(ex);
+    }
+}
+
 void BrokerReplicator::route(Deliverable& msg) {
     // We transition from JOINING->CATCHUP on the first message received from the primary.
     // Until now we couldn't be sure if we had a good connection to the primary.
@@ -554,11 +561,7 @@ void BrokerReplicator::doEventExchangeDe
 void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) {
     string name = values[EXNAME].asString();
     boost::shared_ptr<Exchange> exchange = exchanges.find(name);
-    if (!exchange) {
-        QPID_LOG(warning, logPrefix << "Exchange delete event, not found: " << name);
-    } else if (!replicationTest.getLevel(*exchange)) {
-        QPID_LOG(warning, logPrefix << "Exchange delete event, not replicated: " << name);
-    } else {
+    if (exchange && replicationTest.getLevel(*exchange)) {
         QPID_LOG(debug, logPrefix << "Exchange delete event:" << name);
         if (exchangeTracker.get()) exchangeTracker->event(name);
         deleteExchange(name);
@@ -651,8 +654,10 @@ void BrokerReplicator::doResponseQueue(V
     if (!queueTracker.get())
         throw Exception(QPID_MSG("Unexpected queue response: " << values));
     if (!queueTracker->response(name)) return; // Response is out-of-date
+
     QPID_LOG(debug, logPrefix << "Queue response: " << name);
     boost::shared_ptr<Queue> queue = queues.find(name);
+
     if (queue) { // Already exists
         bool uuidOk = (getHaUuid(queue->getSettings().original) == getHaUuid(argsMap));
         if (!uuidOk) QPID_LOG(debug, logPrefix << "UUID mismatch for queue: " << name);
@@ -660,6 +665,7 @@ void BrokerReplicator::doResponseQueue(V
         QPID_LOG(debug, logPrefix << "Queue response replacing queue:  " << name);
         deleteQueue(name);
     }
+
     framing::FieldTable args;
     qpid::amqp_0_10::translate(argsMap, args);
     boost::shared_ptr<QueueReplicator> qr = replicateQueue(
@@ -770,8 +776,13 @@ boost::shared_ptr<QueueReplicator> Broke
     const boost::shared_ptr<Queue>& queue)
 {
     if (replicationTest.getLevel(*queue) == ALL) {
-        boost::shared_ptr<QueueReplicator> qr(
-            new QueueReplicator(haBroker, queue, link));
+        boost::shared_ptr<QueueReplicator> qr;
+        if (TxReplicator::isTxQueue(queue->getName())){
+            qr.reset(new TxReplicator(haBroker, queue, link));
+        }
+        else {
+            qr.reset(new QueueReplicator(haBroker, queue, link));
+        }
         qr->activate();
         return qr;
     }
@@ -785,7 +796,7 @@ void BrokerReplicator::deleteQueue(const
         // messages. Any reroutes will be done at the primary and
         // replicated as normal.
         if (purge) queue->purge(0, boost::shared_ptr<Exchange>());
-        broker.deleteQueue(name, userId, remoteHost);
+        haBroker.getBroker().deleteQueue(name, userId, remoteHost);
         QPID_LOG(debug, logPrefix << "Queue deleted: " << name);
     }
 }
@@ -864,28 +875,35 @@ bool BrokerReplicator::isBound(boost::sh
 
 string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; }
 
-void BrokerReplicator::autoDeleteCheck(boost::shared_ptr<Exchange> ex) {
+void BrokerReplicator::disconnectedExchange(boost::shared_ptr<Exchange> ex) {
     boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex));
-    if (!qr) return;
-    assert(qr);
-    if (qr->getQueue()->isAutoDelete() && qr->isSubscribed()) {
-        if (qr->getQueue()->getSettings().autoDeleteDelay) {
-            // Start the auto-delete timer
-            qr->getQueue()->releaseFromUse();
-            qr->getQueue()->scheduleAutoDelete();
+    if (qr) {
+        qr->disconnect();
+        if (TxReplicator::isTxQueue(qr->getQueue()->getName())) {
+            // Transactions are aborted on failover so clean up tx-queues
+            deleteQueue(qr->getQueue()->getName());
         }
-        else {
-            // Delete immediately. Don't purge, the primary is gone so we need
-            // to reroute the deleted messages.
-            deleteQueue(qr->getQueue()->getName(), false);
+        else if (qr->getQueue()->isAutoDelete() && qr->isSubscribed()) {
+            if (qr->getQueue()->getSettings().autoDeleteDelay) {
+                // Start the auto-delete timer
+                qr->getQueue()->releaseFromUse();
+                qr->getQueue()->scheduleAutoDelete();
+            }
+            else {
+                // Delete immediately. Don't purge, the primary is gone so we need
+                // to reroute the deleted messages.
+                deleteQueue(qr->getQueue()->getName(), false);
+            }
         }
     }
 }
 
+typedef vector<boost::shared_ptr<Exchange> > ExchangeVector;
+
 // Callback function for accumulating exchange candidates
 namespace {
-	void exchangeAccumulatorCallback(vector<boost::shared_ptr<Exchange> >& c, const Exchange::shared_ptr& i) {
-		c.push_back(i);
+	void exchangeAccumulatorCallback(ExchangeVector& ev, const Exchange::shared_ptr& i) {
+		ev.push_back(i);
 	}
 }
 
@@ -893,13 +911,12 @@ namespace {
 void BrokerReplicator::disconnected() {
     QPID_LOG(info, logPrefix << "Disconnected from primary " << primary);
     connection = 0;
-    // Clean up auto-delete queues
-    vector<boost::shared_ptr<Exchange> > collect;
-    // Make a copy so we can work outside the ExchangeRegistry lock
-    exchanges.eachExchange(
-        boost::bind(&exchangeAccumulatorCallback, boost::ref(collect), _1));
-    for_each(collect.begin(), collect.end(),
-             boost::bind(&BrokerReplicator::autoDeleteCheck, this, _1));
+
+    // Make copy of exchanges so we can work outside the registry lock.
+    ExchangeVector exs;
+    exchanges.eachExchange(boost::bind(&exchangeAccumulatorCallback, boost::ref(exs), _1));
+    for_each(exs.begin(), exs.end(),
+             boost::bind(&BrokerReplicator::disconnectedExchange, this, _1));
 }
 
 void BrokerReplicator::setMembership(const Variant::List& brokers) {

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/BrokerReplicator.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/BrokerReplicator.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/BrokerReplicator.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/BrokerReplicator.h Fri Sep 20 18:59:30 2013
@@ -71,6 +71,8 @@ class BrokerReplicator : public broker::
                          public boost::enable_shared_from_this<BrokerReplicator>
 {
   public:
+    typedef boost::shared_ptr<QueueReplicator> QueueReplicatorPtr;
+
     BrokerReplicator(HaBroker&, const boost::shared_ptr<broker::Link>&);
     ~BrokerReplicator();
 
@@ -84,8 +86,9 @@ class BrokerReplicator : public broker::
     bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
     void shutdown();
 
+    QueueReplicatorPtr findQueueReplicator(const std::string& qname);
+
   private:
-    typedef boost::shared_ptr<QueueReplicator> QueueReplicatorPtr;
     typedef std::pair<boost::shared_ptr<broker::Queue>, bool> CreateQueueResult;
     typedef std::pair<boost::shared_ptr<broker::Exchange>, bool> CreateExchangeResult;
 
@@ -99,6 +102,8 @@ class BrokerReplicator : public broker::
     class ConnectionObserver;
 
     void connected(broker::Bridge&, broker::SessionHandler&);
+    void existingQueue(const boost::shared_ptr<broker::Queue>&);
+    void existingExchange(const boost::shared_ptr<broker::Exchange>&);
 
     void doEventQueueDeclare(types::Variant::Map& values);
     void doEventQueueDelete(types::Variant::Map& values);
@@ -114,7 +119,6 @@ class BrokerReplicator : public broker::
     void doResponseBind(types::Variant::Map& values);
     void doResponseHaBroker(types::Variant::Map& values);
 
-    QueueReplicatorPtr findQueueReplicator(const std::string& qname);
     QueueReplicatorPtr startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
 
     QueueReplicatorPtr replicateQueue(
@@ -135,8 +139,7 @@ class BrokerReplicator : public broker::
     void deleteQueue(const std::string& name, bool purge=true);
     void deleteExchange(const std::string& name);
 
-    void autoDeleteCheck(boost::shared_ptr<broker::Exchange>);
-
+    void disconnectedExchange(boost::shared_ptr<broker::Exchange>);
     void disconnected();
 
     void setMembership(const types::Variant::List&); // Set membership from list.

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/FailoverExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/FailoverExchange.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/FailoverExchange.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/FailoverExchange.cpp Fri Sep 20 18:59:30 2013
@@ -19,7 +19,7 @@
  *
  */
 #include "FailoverExchange.h"
-#include "makeMessage.h"
+#include "Event.h"
 #include "qpid/broker/amqp_0_10/MessageTransfer.h"
 #include "qpid/broker/Message.h"
 #include "qpid/broker/DeliverableMessage.h"
@@ -117,7 +117,7 @@ void FailoverExchange::sendUpdate(const 
     if (urls.empty()) return;
     framing::Array array = vectorToUrlArray(urls);
     const ProtocolVersion v;
-    broker::Message message(makeMessage(Buffer(), typeName));
+    broker::Message message(makeMessage(std::string(), typeName, typeName));
     MessageTransfer& transfer = MessageTransfer::get(message);
     MessageProperties* props =
         transfer.getFrames().getHeaders()->get<framing::MessageProperties>(true);

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/HaBroker.cpp Fri Sep 20 18:59:30 2013
@@ -64,6 +64,7 @@ using boost::dynamic_pointer_cast;
 HaBroker::HaBroker(broker::Broker& b, const Settings& s)
     : systemId(b.getSystem()->getSystemId().data()),
       settings(s),
+      userId(s.username+"@"+b.getOptions().realm),
       broker(b),
       observer(new ConnectionObserver(*this, systemId)),
       role(new StandAlone),
@@ -75,14 +76,14 @@ HaBroker::HaBroker(broker::Broker& b, co
     // otherwise there's a window for a client to connect before we get to
     // initialize()
     if (settings.cluster) {
-        QPID_LOG(debug, "Broker startup, rejecting client connections.");
+        QPID_LOG(debug, "Backup starting, rejecting client connections.");
         shared_ptr<broker::ConnectionObserver> excluder(new BackupConnectionExcluder);
         observer->setObserver(excluder, "Backup: ");
         broker.getConnectionObservers().add(observer);
         broker.getExchanges().registerExchange(failoverExchange);
     }
     // QueueSnapshots are needed for standalone replication as well as cluster.
-    broker.getConfigurationObservers().add(queueSnapshots);
+    broker.getBrokerObservers().add(queueSnapshots);
 }
 
 namespace {
@@ -94,7 +95,7 @@ bool isNone(const std::string& x) { retu
 void HaBroker::initialize() {
     if (settings.cluster) {
         membership.setStatus(JOINING);
-        QPID_LOG(notice, "Initializing HA broker: " << membership.getInfo());
+        QPID_LOG(notice, "Initializing HA broker: " << membership.getSelf());
     }
 
     // Set up the management object.
@@ -202,7 +203,7 @@ std::vector<Url> HaBroker::getKnownBroke
 }
 
 void HaBroker::shutdown(const std::string& message) {
-    QPID_LOG(critical, message);
+    QPID_LOG(critical, "Shutting down: " << message);
     broker.shutdown();
     throw Exception(message);
 }
@@ -213,7 +214,7 @@ BrokerStatus HaBroker::getStatus() const
 
 void HaBroker::setAddress(const Address& a) {
     QPID_LOG(info, role->getLogPrefix() << "Set self address to: " << a);
-    membership.setAddress(a);
+    membership.setSelfAddress(a);
 }
 
 boost::shared_ptr<QueueReplicator> HaBroker::findQueueReplicator(const std::string& queueName) {

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/HaBroker.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/HaBroker.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/HaBroker.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/HaBroker.h Fri Sep 20 18:59:30 2013
@@ -84,6 +84,7 @@ class HaBroker : public management::Mana
 
     broker::Broker& getBroker() { return broker; }
     const Settings& getSettings() const { return settings; }
+    boost::shared_ptr<Role> getRole() const {return role; }
 
     /** Shut down the broker because of a critical error. */
     void shutdown(const std::string& message);
@@ -91,7 +92,7 @@ class HaBroker : public management::Mana
     BrokerStatus getStatus() const;
     boost::shared_ptr<ConnectionObserver> getObserver() { return observer; }
 
-    BrokerInfo getBrokerInfo() const { return membership.getInfo(); }
+    BrokerInfo getBrokerInfo() const { return membership.getSelf(); }
     Membership& getMembership() { return membership; }
     types::Uuid getSystemId() const { return systemId; }
 
@@ -101,6 +102,9 @@ class HaBroker : public management::Mana
 
     boost::shared_ptr<QueueReplicator> findQueueReplicator(const std::string& queueName);
 
+    /** Authenticated user ID for queue create/delete */
+    std::string getUserId() const { return userId; }
+
   private:
     void setPublicUrl(const Url&);
     void setBrokerUrl(const Url&);
@@ -111,6 +115,7 @@ class HaBroker : public management::Mana
     // Immutable members
     const types::Uuid systemId;
     const Settings settings;
+    const std::string userId;
 
     // Member variables protected by lock
     mutable sys::Mutex lock;

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Membership.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Membership.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Membership.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Membership.cpp Fri Sep 20 18:59:30 2013
@@ -107,6 +107,14 @@ BrokerInfo::Set Membership::otherBackups
     return result;
 }
 
+BrokerInfo::Set Membership::getBrokers() const {
+    Mutex::ScopedLock l(lock);
+    BrokerInfo::Set result;
+    transform(brokers.begin(), brokers.end(), inserter(result, result.begin()),
+              boost::bind(&BrokerInfo::Map::value_type::second, _1));
+    return result;
+}
+
 bool Membership::get(const types::Uuid& id, BrokerInfo& result) const {
     Mutex::ScopedLock l(lock);
     BrokerInfo::Map::const_iterator i = brokers.find(id);
@@ -136,10 +144,9 @@ bool checkTransition(BrokerStatus from, 
 }
 } // namespace
 
-
 void Membership::update(Mutex::ScopedLock& l) {
     QPID_LOG(info, "Membership: " <<  brokers);
-    // Update managment and send update event.
+// Update managment and send update event.
     BrokerStatus newStatus = getStatus(l);
     Variant::List brokerList = asList(l);
     if (mgmtObject) {
@@ -198,14 +205,14 @@ BrokerStatus Membership::getStatus(sys::
     return i->second.getStatus();
 }
 
-BrokerInfo Membership::getInfo() const  {
+BrokerInfo Membership::getSelf() const  {
     Mutex::ScopedLock l(lock);
     BrokerInfo::Map::const_iterator i = brokers.find(self);
     assert(i != brokers.end());
     return i->second;
 }
 
-void Membership::setAddress(const Address& a) {
+void Membership::setSelfAddress(const Address& a) {
     Mutex::ScopedLock l(lock);
     brokers[self].setAddress(a);
     update(l);

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Membership.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Membership.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Membership.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Membership.h Fri Sep 20 18:59:30 2013
@@ -26,6 +26,7 @@
 #include "types.h"
 #include "qpid/log/Statement.h"
 #include "qpid/sys/Mutex.h"
+#include "qpid/sys/Time.h"
 #include "qpid/types/Variant.h"
 #include <boost/function.hpp>
 #include <set>
@@ -69,16 +70,19 @@ class Membership
     /** Return IDs of all READY backups other than self */
     BrokerInfo::Set otherBackups() const;
 
+    /** Return IDs of all brokers */
+    BrokerInfo::Set getBrokers() const;
+
     void assign(const types::Variant::List&);
     types::Variant::List asList() const;
 
     bool get(const types::Uuid& id, BrokerInfo& result) const;
 
-    types::Uuid getSelf() const  { return self; }
-    BrokerInfo getInfo() const;
+    BrokerInfo getSelf() const;
     BrokerStatus getStatus() const;
     void setStatus(BrokerStatus s);
-    void setAddress(const Address&);
+
+    void setSelfAddress(const Address&);
 
   private:
     void update(sys::Mutex::ScopedLock&);

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Primary.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Primary.cpp Fri Sep 20 18:59:30 2013
@@ -27,17 +27,19 @@
 #include "RemoteBackup.h"
 #include "ConnectionObserver.h"
 #include "QueueReplicator.h"
+#include "PrimaryTxObserver.h"
 #include "qpid/assert.h"
 #include "qpid/broker/Broker.h"
-#include "qpid/broker/ConfigurationObserver.h"
+#include "qpid/broker/BrokerObserver.h"
 #include "qpid/broker/Connection.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/framing/FieldTable.h"
 #include "qpid/framing/FieldValue.h"
-#include "qpid/framing/Uuid.h"
 #include "qpid/log/Statement.h"
+#include "qpid/types/Uuid.h"
 #include "qpid/sys/Timer.h"
 #include <boost/bind.hpp>
+#include <boost/shared_ptr.hpp>
 
 namespace qpid {
 namespace ha {
@@ -59,15 +61,18 @@ class PrimaryConnectionObserver : public
     Primary& primary;
 };
 
-class PrimaryConfigurationObserver : public broker::ConfigurationObserver
+class PrimaryBrokerObserver : public broker::BrokerObserver
 {
   public:
-    PrimaryConfigurationObserver(Primary& p) : primary(p) {}
+    PrimaryBrokerObserver(Primary& p) : primary(p) {}
     void queueCreate(const Primary::QueuePtr& q) { primary.queueCreate(q); }
     void queueDestroy(const Primary::QueuePtr& q) { primary.queueDestroy(q); }
     void exchangeCreate(const Primary::ExchangePtr& q) { primary.exchangeCreate(q); }
     void exchangeDestroy(const Primary::ExchangePtr& q) { primary.exchangeDestroy(q); }
-  private:
+    void startTx(const shared_ptr<broker::TxBuffer>& tx) { primary.startTx(tx); }
+    void startDtx(const shared_ptr<broker::DtxBuffer>& dtx) { primary.startDtx(dtx); }
+
+ private:
     Primary& primary;
 };
 
@@ -82,8 +87,6 @@ class ExpectedBackupTimerTask : public s
 
 } // namespace
 
-Primary* Primary::instance = 0;
-
 Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
     haBroker(hb), membership(hb.getMembership()),
     logPrefix("Primary: "), active(false),
@@ -92,13 +95,11 @@ Primary::Primary(HaBroker& hb, const Bro
     hb.getMembership().setStatus(RECOVERING);
     broker::QueueRegistry& queues = hb.getBroker().getQueues();
     queues.eachQueue(boost::bind(&Primary::initializeQueue, this, _1));
-    assert(instance == 0);
-    instance = this;            // Let queue replicators find us.
     if (expect.empty()) {
         QPID_LOG(notice, logPrefix << "Promoted to primary. No expected backups.");
     }
     else {
-        // NOTE: RemoteBackups must be created before we set the ConfigurationObserver
+        // NOTE: RemoteBackups must be created before we set the BrokerObserver
         // or ConnectionObserver so that there is no client activity while
         // the QueueGuards are created.
         QPID_LOG(notice, logPrefix << "Promoted to primary. Expected backups: " << expect);
@@ -113,8 +114,8 @@ Primary::Primary(HaBroker& hb, const Bro
         timerTask = new ExpectedBackupTimerTask(*this, deadline);
         hb.getBroker().getTimer().add(timerTask);
     }
-    configurationObserver.reset(new PrimaryConfigurationObserver(*this));
-    haBroker.getBroker().getConfigurationObservers().add(configurationObserver);
+    brokerObserver.reset(new PrimaryBrokerObserver(*this));
+    haBroker.getBroker().getBrokerObservers().add(brokerObserver);
     checkReady();               // Outside lock
 
     // Allow client connections
@@ -124,7 +125,7 @@ Primary::Primary(HaBroker& hb, const Bro
 
 Primary::~Primary() {
     if (timerTask) timerTask->cancel();
-    haBroker.getBroker().getConfigurationObservers().remove(configurationObserver);
+    haBroker.getBroker().getBrokerObservers().remove(brokerObserver);
     haBroker.getObserver()->reset();
 }
 
@@ -212,14 +213,42 @@ void Primary::readyReplica(const Replica
     if (backup) checkReady(backup);
 }
 
+void Primary::addReplica(ReplicatingSubscription& rs) {
+    // Note this is called before the ReplicatingSubscription has been activated
+    // on the queue.
+    sys::Mutex::ScopedLock l(lock);
+    replicas[make_pair(rs.getBrokerInfo().getSystemId(), rs.getQueue())] = &rs;
+}
+
+void Primary::skip(
+    const types::Uuid& backup,
+    const boost::shared_ptr<broker::Queue>& queue,
+    const ReplicationIdSet& ids)
+{
+    sys::Mutex::ScopedLock l(lock);
+    ReplicaMap::const_iterator i = replicas.find(make_pair(backup, queue));
+    if (i != replicas.end()) i->second->addSkip(ids);
+}
+
+void Primary::removeReplica(const ReplicatingSubscription& rs) {
+    sys::Mutex::ScopedLock l(lock);
+    replicas.erase(make_pair(rs.getBrokerInfo().getSystemId(), rs.getQueue()));
+
+    TxMap::const_iterator i = txMap.find(rs.getQueue()->getName());
+    if (i != txMap.end()) {
+        boost::shared_ptr<PrimaryTxObserver> tx = i->second.lock();
+        if (tx) tx->cancel(rs);
+    }
+}
+
 // NOTE: Called with queue registry lock held.
 void Primary::queueCreate(const QueuePtr& q) {
     // Set replication argument.
     ReplicateLevel level = replicationTest.useLevel(*q);
-    QPID_LOG(debug, logPrefix << "Created queue " << q->getName()
-             << " replication: " << printable(level));
     q->addArgument(QPID_REPLICATE, printable(level).str());
     if (level) {
+        QPID_LOG(debug, logPrefix << "Created queue " << q->getName()
+                 << " replication: " << printable(level));
         initializeQueue(q);
         // Give each queue a unique id. Used by backups to avoid confusion of
         // same-named queues.
@@ -235,24 +264,26 @@ void Primary::queueCreate(const QueuePtr
 
 // NOTE: Called with queue registry lock held.
 void Primary::queueDestroy(const QueuePtr& q) {
-    QPID_LOG(debug, logPrefix << "Destroyed queue " << q->getName());
-    {
-        Mutex::ScopedLock l(lock);
-        for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i)
-            i->second->queueDestroy(q);
+    if (replicationTest.useLevel(*q)) {
+        QPID_LOG(debug, logPrefix << "Destroyed queue " << q->getName());
+        {
+            Mutex::ScopedLock l(lock);
+            for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i)
+                i->second->queueDestroy(q);
+        }
+        checkReady();               // Outside lock
     }
-    checkReady();               // Outside lock
 }
 
 // NOTE: Called with exchange registry lock held.
 void Primary::exchangeCreate(const ExchangePtr& ex) {
     ReplicateLevel level = replicationTest.useLevel(*ex);
-    QPID_LOG(debug, logPrefix << "Created exchange " << ex->getName()
-             << " replication: " << printable(level));
     FieldTable args = ex->getArgs();
     args.setString(QPID_REPLICATE, printable(level).str()); // Set replication arg.
     if (level) {
-        // Give each exchange a unique id to avoid confusion of same-named exchanges.
+        QPID_LOG(debug, logPrefix << "Created exchange " << ex->getName()
+                 << " replication: " << printable(level));
+         // Give each exchange a unique id to avoid confusion of same-named exchanges.
         args.set(QPID_HA_UUID, FieldTable::ValuePtr(new UuidValue(&Uuid(true)[0])));
     }
     ex->setArgs(args);
@@ -260,8 +291,10 @@ void Primary::exchangeCreate(const Excha
 
 // NOTE: Called with exchange registry lock held.
 void Primary::exchangeDestroy(const ExchangePtr& ex) {
-    QPID_LOG(debug, logPrefix << "Destroyed exchange " << ex->getName());
-    // Do nothing
+    if (replicationTest.useLevel(*ex)) {
+        QPID_LOG(debug, logPrefix << "Destroyed exchange " << ex->getName());
+        // Do nothing
+    }
  }
 
 // New backup connected
@@ -280,6 +313,7 @@ void Primary::backupDisconnect(shared_pt
     backup->cancel();
     expectedBackups.erase(backup);
     backups.erase(id);
+    membership.remove(id);
 }
 
 
@@ -365,4 +399,19 @@ void Primary::setCatchupQueues(const Rem
     backup->startCatchup();
 }
 
+shared_ptr<PrimaryTxObserver> Primary::makeTxObserver() {
+    shared_ptr<PrimaryTxObserver> observer(new PrimaryTxObserver(haBroker));
+    observer->initialize();
+    txMap[observer->getTxQueue()->getName()] = observer;
+    return observer;
+}
+
+void Primary::startTx(const boost::shared_ptr<broker::TxBuffer>& tx) {
+    tx->setObserver(makeTxObserver());
+}
+
+void Primary::startDtx(const boost::shared_ptr<broker::DtxBuffer>& dtx) {
+    dtx->setObserver(makeTxObserver());
+}
+
 }} // namespace qpid::ha

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Primary.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Primary.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Primary.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Primary.h Fri Sep 20 18:59:30 2013
@@ -23,12 +23,14 @@
  */
 
 #include "types.h"
+#include "hash.h"
 #include "BrokerInfo.h"
 #include "ReplicationTest.h"
 #include "Role.h"
 #include "qpid/sys/Mutex.h"
 #include "qpid/sys/unordered_map.h"
 #include <boost/shared_ptr.hpp>
+#include <boost/weak_ptr.hpp>
 #include <boost/intrusive_ptr.hpp>
 #include <string>
 
@@ -38,7 +40,9 @@ namespace broker {
 class Queue;
 class Connection;
 class ConnectionObserver;
-class ConfigurationObserver;
+class BrokerObserver;
+class TxBuffer;
+class DtxBuffer;
 }
 
 namespace sys {
@@ -51,6 +55,7 @@ class ReplicatingSubscription;
 class RemoteBackup;
 class QueueGuard;
 class Membership;
+class PrimaryTxObserver;
 
 /**
  * State associated with a primary broker:
@@ -66,8 +71,6 @@ class Primary : public Role
     typedef boost::shared_ptr<broker::Exchange> ExchangePtr;
     typedef boost::shared_ptr<RemoteBackup> RemoteBackupPtr;
 
-    static Primary* get() { return instance; }
-
     Primary(HaBroker& hb, const BrokerInfo::Set& expectedBackups);
     ~Primary();
 
@@ -77,13 +80,21 @@ class Primary : public Role
     void setBrokerUrl(const Url&) {}
 
     void readyReplica(const ReplicatingSubscription&);
-    void removeReplica(const std::string& q);
+    void addReplica(ReplicatingSubscription&);
+    void removeReplica(const ReplicatingSubscription&);
+
+    /** Skip replication of ids to queue on backup. */
+    void skip(const types::Uuid& backup,
+              const boost::shared_ptr<broker::Queue>& queue,
+              const ReplicationIdSet& ids);
 
-    // Called via ConfigurationObserver
+    // Called via BrokerObserver
     void queueCreate(const QueuePtr&);
     void queueDestroy(const QueuePtr&);
     void exchangeCreate(const ExchangePtr&);
     void exchangeDestroy(const ExchangePtr&);
+    void startTx(const boost::shared_ptr<broker::TxBuffer>&);
+    void startDtx(const boost::shared_ptr<broker::DtxBuffer>&);
 
     // Called via ConnectionObserver
     void opened(broker::Connection& connection);
@@ -95,11 +106,18 @@ class Primary : public Role
     void timeoutExpectedBackups();
 
   private:
-    typedef qpid::sys::unordered_map<
-      types::Uuid, RemoteBackupPtr, types::Uuid::Hasher > BackupMap;
+    typedef sys::unordered_map<
+      types::Uuid, RemoteBackupPtr, Hasher<types::Uuid> > BackupMap;
 
     typedef std::set<RemoteBackupPtr > BackupSet;
 
+    typedef std::pair<types::Uuid, boost::shared_ptr<broker::Queue> > UuidQueue;
+    typedef sys::unordered_map<UuidQueue, ReplicatingSubscription*,
+                               Hasher<UuidQueue> > ReplicaMap;
+
+    // Map of PrimaryTxObservers by tx-queue name
+    typedef sys::unordered_map<std::string, boost::weak_ptr<PrimaryTxObserver> > TxMap;
+
     RemoteBackupPtr backupConnect(const BrokerInfo&, broker::Connection&, sys::Mutex::ScopedLock&);
     void backupDisconnect(RemoteBackupPtr, sys::Mutex::ScopedLock&);
 
@@ -107,8 +125,10 @@ class Primary : public Role
     void checkReady();
     void checkReady(RemoteBackupPtr);
     void setCatchupQueues(const RemoteBackupPtr&, bool createGuards);
+    void deduplicate();
+    boost::shared_ptr<PrimaryTxObserver> makeTxObserver();
 
-    sys::Mutex lock;
+    mutable sys::Mutex lock;
     HaBroker& haBroker;
     Membership& membership;
     std::string logPrefix;
@@ -126,9 +146,10 @@ class Primary : public Role
      */
     BackupMap backups;
     boost::shared_ptr<broker::ConnectionObserver> connectionObserver;
-    boost::shared_ptr<broker::ConfigurationObserver> configurationObserver;
+    boost::shared_ptr<broker::BrokerObserver> brokerObserver;
     boost::intrusive_ptr<sys::TimerTask> timerTask;
-    static Primary* instance;
+    ReplicaMap replicas;
+    TxMap txMap;
 };
 }} // namespace qpid::ha
 

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/QueueGuard.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/QueueGuard.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/QueueGuard.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/QueueGuard.h Fri Sep 20 18:59:30 2013
@@ -89,7 +89,7 @@ class QueueGuard {
     class QueueObserver;
     typedef qpid::sys::unordered_map<ReplicationId,
                                      boost::intrusive_ptr<broker::AsyncCompletion>,
-                                     TrivialHasher<ReplicationId> > Delayed;
+                                     Hasher<ReplicationId> > Delayed;
 
     bool complete(ReplicationId, sys::Mutex::ScopedLock &);
     void complete(Delayed::iterator, sys::Mutex::ScopedLock &);

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Fri Sep 20 18:59:30 2013
@@ -19,12 +19,13 @@
  *
  */
 
-#include "makeMessage.h"
+#include "Event.h"
 #include "HaBroker.h"
 #include "QueueReplicator.h"
 #include "QueueSnapshots.h"
 #include "ReplicatingSubscription.h"
 #include "Settings.h"
+#include "types.h"
 #include "qpid/broker/Bridge.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/Link.h"
@@ -38,36 +39,31 @@
 #include "qpid/Msg.h"
 #include "qpid/assert.h"
 #include <boost/shared_ptr.hpp>
+#include <boost/bind.hpp>
 
-namespace {
-const std::string QPID_REPLICATOR_("qpid.replicator-");
-const std::string TYPE_NAME("qpid.queue-replicator");
-const std::string QPID_HA("qpid.ha-");
-}
 
 namespace qpid {
 namespace ha {
 using namespace broker;
 using namespace framing;
 using namespace std;
+using std::exception;
 using sys::Mutex;
 
-const std::string QueueReplicator::DEQUEUE_EVENT_KEY(QPID_HA+"dequeue");
-const std::string QueueReplicator::ID_EVENT_KEY(QPID_HA+"id");
 const std::string QueueReplicator::QPID_SYNC_FREQUENCY("qpid.sync_frequency");
 
-std::string QueueReplicator::replicatorName(const std::string& queueName) {
-    return QPID_REPLICATOR_ + queueName;
+namespace {
+const string QPID_HA(QPID_HA_PREFIX);
+const std::string TYPE_NAME(QPID_HA+"queue-replicator");
 }
 
-bool QueueReplicator::isReplicatorName(const std::string& name) {
-    return name.compare(0, QPID_REPLICATOR_.size(), QPID_REPLICATOR_) == 0;
+
+std::string QueueReplicator::replicatorName(const std::string& queueName) {
+    return QUEUE_REPLICATOR_PREFIX + queueName;
 }
 
-bool QueueReplicator::isEventKey(const std::string key) {
-    const std::string& prefix = QPID_HA;
-    bool ret = key.size() > prefix.size() && key.compare(0, prefix.size(), prefix) == 0;
-    return ret;
+bool QueueReplicator::isReplicatorName(const std::string& name) {
+    return startsWith(name, QUEUE_REPLICATOR_PREFIX);
 }
 
 class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener {
@@ -109,12 +105,15 @@ QueueReplicator::QueueReplicator(HaBroke
                                  boost::shared_ptr<Link> l)
     : Exchange(replicatorName(q->getName()), 0, q->getBroker()),
       haBroker(hb),
+      brokerInfo(hb.getBrokerInfo()),
+      link(l),
+      queue(q),
+      sessionHandler(0),
       logPrefix("Backup of "+q->getName()+": "),
-      queue(q), link(l), brokerInfo(hb.getBrokerInfo()), subscribed(false),
-      settings(hb.getSettings()), destroyed(false),
+      subscribed(false),
+      settings(hb.getSettings()),
       nextId(0), maxId(0)
 {
-    QPID_LOG(debug, logPrefix << "Created");
     args.setString(QPID_REPLICATE, printable(NONE).str());
     Uuid uuid(true);
     bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str();
@@ -122,12 +121,18 @@ QueueReplicator::QueueReplicator(HaBroke
     args.setString(QPID_REPLICATE, printable(NONE).str());
     setArgs(args);
     if (q->isAutoDelete()) q->markInUse();
+
+    dispatch[DequeueEvent::KEY] =
+        boost::bind(&QueueReplicator::dequeueEvent, this, _1, _2);
+    dispatch[IdEvent::KEY] =
+        boost::bind(&QueueReplicator::idEvent, this, _1, _2);
 }
 
 // This must be called immediately after the constructor.
 // It has to be separate so we can call shared_from_this().
 void QueueReplicator::activate() {
     Mutex::ScopedLock l(lock);
+    QPID_LOG(debug, logPrefix << "Created");
     if (!queue) return;         // Already destroyed
 
     // Enable callback to route()
@@ -163,6 +168,11 @@ void QueueReplicator::activate() {
         boost::shared_ptr<QueueObserver>(new QueueObserver(shared_from_this())));
 }
 
+void QueueReplicator::disconnect() {
+    Mutex::ScopedLock l(lock);
+    sessionHandler = 0;
+}
+
 QueueReplicator::~QueueReplicator() {}
 
 // Called from Queue::destroyed()
@@ -170,25 +180,24 @@ void QueueReplicator::destroy() {
     boost::shared_ptr<Bridge> bridge2; // To call outside of lock
     {
         Mutex::ScopedLock l(lock);
-        if (destroyed) return;
-        destroyed = true;
+        if (!queue) return;     // Already destroyed
         QPID_LOG(debug, logPrefix << "Destroyed");
+        bridge2 = bridge;       // call close outside the lock.
         // Need to drop shared pointers to avoid pointer cycles keeping this in memory.
         queue.reset();
-        link.reset();
         bridge.reset();
         getBroker()->getExchanges().destroy(getName());
-        bridge2 = bridge;
     }
     if (bridge2) bridge2->close(); // Outside of lock, avoid deadlock.
 }
 
 // Called in a broker connection thread when the bridge is created.
 // Note: called with the Link lock held.
-void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
+void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler_) {
     Mutex::ScopedLock l(lock);
-    if (destroyed) return;         // Already destroyed
-    AMQP_ServerProxy peer(sessionHandler.out);
+    if (!queue) return;         // Already destroyed
+    sessionHandler = &sessionHandler_;
+    AMQP_ServerProxy peer(sessionHandler->out);
     const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
     FieldTable arguments;
     arguments.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1);
@@ -224,44 +233,57 @@ template <class T> T decodeContent(Messa
 }
 }
 
-void QueueReplicator::dequeue(const ReplicationIdSet& dequeues, Mutex::ScopedLock&) {
-    QPID_LOG(trace, logPrefix << "Dequeue " << dequeues);
+void QueueReplicator::dequeueEvent(const string& data, Mutex::ScopedLock&) {
+    DequeueEvent e;
+    decodeStr(data, e);
+    QPID_LOG(trace, logPrefix << "Dequeue " << e.ids);
     //TODO: should be able to optimise the following
-    for (ReplicationIdSet::iterator i = dequeues.begin(); i != dequeues.end(); ++i) {
+    for (ReplicationIdSet::iterator i = e.ids.begin(); i != e.ids.end(); ++i) {
         PositionMap::iterator j = positions.find(*i);
         if (j != positions.end()) queue->dequeueMessageAt(j->second);
     }
 }
 
 // Called in connection thread of the queues bridge to primary.
-void QueueReplicator::route(Deliverable& msg)
+void QueueReplicator::route(Deliverable& deliverable)
 {
     try {
         Mutex::ScopedLock l(lock);
-        if (destroyed) return;
-        const std::string& key = msg.getMessage().getRoutingKey();
-        if (!isEventKey(key)) { // Replicated message
+        if (!queue) return;     // Already destroyed
+        broker::Message& message(deliverable.getMessage());
+        string key(message.getRoutingKey());
+        if (!isEventKey(message.getRoutingKey())) {
             ReplicationId id = nextId++;
             maxId = std::max(maxId, id);
-            msg.getMessage().setReplicationId(id);
-            msg.deliverTo(queue);
+            message.setReplicationId(id);
+            deliver(message);
             QueuePosition position = queue->getPosition();
             positions[id] = position;
             QPID_LOG(trace, logPrefix << "Enqueued " << LogMessageId(*queue,position,id));
         }
-        else if (key == DEQUEUE_EVENT_KEY) {
-            dequeue(decodeContent<ReplicationIdSet>(msg.getMessage()), l);
-        }
-        else if (key == ID_EVENT_KEY) {
-            nextId = decodeContent<ReplicationId>(msg.getMessage());
+        else {
+            DispatchMap::iterator i = dispatch.find(key);
+            if (i == dispatch.end()) {
+                QPID_LOG(info, logPrefix << "Ignoring unknown event: " << key);
+            }
+            else {
+                (i->second)(message.getContent(), l);
+            }
         }
-        // Ignore unknown event keys, may be introduced in later versions.
     }
     catch (const std::exception& e) {
         haBroker.shutdown(QPID_MSG(logPrefix << "Replication failed: " << e.what()));
     }
 }
 
+void QueueReplicator::deliver(const broker::Message& m) {
+    queue->deliver(m);
+}
+
+void QueueReplicator::idEvent(const string& data, Mutex::ScopedLock&) {
+    nextId = decodeStr<IdEvent>(data).id;
+}
+
 ReplicationId QueueReplicator::getMaxId() {
     Mutex::ScopedLock l(lock);
     return maxId;
@@ -273,4 +295,5 @@ bool QueueReplicator::unbind(boost::shar
 bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const FieldTable* const) { return false; }
 std::string QueueReplicator::getType() const { return TYPE_NAME; }
 
+
 }} // namespace qpid::broker

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/QueueReplicator.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/QueueReplicator.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/QueueReplicator.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/QueueReplicator.h Fri Sep 20 18:59:30 2013
@@ -26,6 +26,7 @@
 #include "hash.h"
 #include "qpid/broker/Exchange.h"
 #include <boost/enable_shared_from_this.hpp>
+#include <boost/function.hpp>
 #include <iosfwd>
 
 namespace qpid {
@@ -56,30 +57,24 @@ class QueueReplicator : public broker::E
                         public boost::enable_shared_from_this<QueueReplicator>
 {
   public:
-    static const std::string DEQUEUE_EVENT_KEY;
-    static const std::string ID_EVENT_KEY;
     static const std::string QPID_SYNC_FREQUENCY;
+    static const std::string REPLICATOR_PREFIX;
 
     static std::string replicatorName(const std::string& queueName);
     static bool isReplicatorName(const std::string&);
 
-    /** Test if a string is an event key */
-    static bool isEventKey(const std::string key);
-
     QueueReplicator(HaBroker&,
                     boost::shared_ptr<broker::Queue> q,
                     boost::shared_ptr<broker::Link> l);
 
     ~QueueReplicator();
 
-    void activate();            // Must be called immediately after constructor.
+    void activate();        // Must be called immediately after constructor.
+    void disconnect();      // Called when we are disconnected from the primary.
 
     std::string getType() const;
-    bool bind(boost::shared_ptr<broker::Queue
-              >, const std::string&, const framing::FieldTable*);
-    bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
+
     void route(broker::Deliverable&);
-    bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
 
     // Set if the queue has ever been subscribed to, used for auto-delete cleanup.
     void setSubscribed() { subscribed = true; }
@@ -89,27 +84,44 @@ class QueueReplicator : public broker::E
 
     ReplicationId getMaxId();
 
-  private:
-    typedef qpid::sys::unordered_map<ReplicationId, QueuePosition, TrivialHasher<int32_t> > PositionMap;
+    // No-op unused Exchange virtual functions.
+    bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
+    bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
+    bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
+
+  protected:
+    typedef boost::function<void(const std::string&, sys::Mutex::ScopedLock&)> DispatchFn;
+    typedef qpid::sys::unordered_map<std::string, DispatchFn> DispatchMap;
+
+    virtual void deliver(const broker::Message&);
+    virtual void destroy();             // Called when the queue is destroyed.
 
+    sys::Mutex lock;
+    HaBroker& haBroker;
+    const BrokerInfo brokerInfo;
+    DispatchMap dispatch;
+    boost::shared_ptr<broker::Link> link;
+    boost::shared_ptr<broker::Bridge> bridge;
+    boost::shared_ptr<broker::Queue> queue;
+    broker::SessionHandler* sessionHandler;
+
+  private:
+    typedef qpid::sys::unordered_map<
+      ReplicationId, QueuePosition, Hasher<ReplicationId> > PositionMap;
     class ErrorListener;
     class QueueObserver;
 
     void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler);
-    void destroy();             // Called when the queue is destroyed.
-    void dequeue(const ReplicationIdSet&, sys::Mutex::ScopedLock&);
 
-    HaBroker& haBroker;
+    // Dispatch functions
+    void dequeueEvent(const std::string& data, sys::Mutex::ScopedLock&);
+    void idEvent(const std::string& data, sys::Mutex::ScopedLock&);
+
     std::string logPrefix;
     std::string bridgeName;
-    sys::Mutex lock;
-    boost::shared_ptr<broker::Queue> queue;
-    boost::shared_ptr<broker::Link> link;
-    boost::shared_ptr<broker::Bridge> bridge;
-    BrokerInfo brokerInfo;
+
     bool subscribed;
     const Settings& settings;
-    bool destroyed;
     PositionMap positions;
     ReplicationIdSet idSet; // Set of replicationIds on the queue.
     ReplicationId nextId;   // ID for next message to arrive.

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/QueueSnapshots.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/QueueSnapshots.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/QueueSnapshots.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/QueueSnapshots.h Fri Sep 20 18:59:30 2013
@@ -27,7 +27,7 @@
 #include "hash.h"
 
 #include "qpid/assert.h"
-#include "qpid/broker/ConfigurationObserver.h"
+#include "qpid/broker/BrokerObserver.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/sys/Mutex.h"
 
@@ -37,10 +37,10 @@ namespace qpid {
 namespace ha {
 
 /**
- * ConfigurationObserver that maintains a map of the QueueSnapshot for each queue.
+ * BrokerObserver that maintains a map of the QueueSnapshot for each queue.
  * THREAD SAFE.
  */
-class QueueSnapshots : public broker::ConfigurationObserver
+class QueueSnapshots : public broker::BrokerObserver
 {
   public:
     boost::shared_ptr<QueueSnapshot> get(const boost::shared_ptr<broker::Queue>& q) const {
@@ -49,7 +49,7 @@ class QueueSnapshots : public broker::Co
         return i != snapshots.end() ? i->second : boost::shared_ptr<QueueSnapshot>();
     }
 
-    // ConfigurationObserver overrides.
+    // BrokerObserver overrides.
     void queueCreate(const boost::shared_ptr<broker::Queue>& q) {
         sys::Mutex::ScopedLock l(lock);
         boost::shared_ptr<QueueSnapshot> observer(new QueueSnapshot);
@@ -69,7 +69,7 @@ class QueueSnapshots : public broker::Co
   private:
     typedef qpid::sys::unordered_map<boost::shared_ptr<broker::Queue>,
                                      boost::shared_ptr<QueueSnapshot>,
-                                     SharedPtrHasher<broker::Queue>
+                                     Hasher<boost::shared_ptr<broker::Queue> >
                                      > SnapshotMap;
     SnapshotMap snapshots;
     mutable sys::Mutex lock;

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/RemoteBackup.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/RemoteBackup.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/RemoteBackup.cpp Fri Sep 20 18:59:30 2013
@@ -20,6 +20,7 @@
  */
 #include "RemoteBackup.h"
 #include "QueueGuard.h"
+#include "TxReplicator.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/Connection.h"
 #include "qpid/broker/Queue.h"
@@ -108,13 +109,13 @@ void RemoteBackup::ready(const QueuePtr&
         QPID_LOG(debug, logPrefix << "Caught up on queue: " << q->getName() );
 }
 
-// Called via ConfigurationObserver::queueCreate and from catchupQueue
+// Called via BrokerObserver::queueCreate and from catchupQueue
 void RemoteBackup::queueCreate(const QueuePtr& q) {
     if (replicationTest.getLevel(*q) == ALL)
         guards[q].reset(new QueueGuard(*q, brokerInfo));
 }
 
-// Called via ConfigurationObserver
+// Called via BrokerObserver
 void RemoteBackup::queueDestroy(const QueuePtr& q) {
     catchupQueues.erase(q);
     GuardMap::iterator i = guards.find(q);

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/RemoteBackup.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/RemoteBackup.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/RemoteBackup.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/RemoteBackup.h Fri Sep 20 18:59:30 2013
@@ -71,10 +71,10 @@ class RemoteBackup
      */
     void ready(const QueuePtr& queue);
 
-    /** Called via ConfigurationObserver */
+    /** Called via BrokerObserver */
     void queueCreate(const QueuePtr&);
 
-    /** Called via ConfigurationObserver. Note: may set isReady() */
+    /** Called via BrokerObserver. Note: may set isReady() */
     void queueDestroy(const QueuePtr&);
 
     /**@return true when all catch-up queues for this backup are ready. */
@@ -96,8 +96,10 @@ class RemoteBackup
     void startCatchup() { started = true; }
 
   private:
-    typedef qpid::sys::unordered_map<QueuePtr, GuardPtr,
-                                     SharedPtrHasher<broker::Queue> > GuardMap;
+    typedef qpid::sys::unordered_map<
+      QueuePtr, GuardPtr, Hasher<boost::shared_ptr<broker::Queue> >
+      > GuardMap;
+
     typedef std::set<QueuePtr> QueueSet;
 
     std::string logPrefix;

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Fri Sep 20 18:59:30 2013
@@ -19,7 +19,7 @@
  *
  */
 
-#include "makeMessage.h"
+#include "Event.h"
 #include "IdSetter.h"
 #include "QueueGuard.h"
 #include "QueueReplicator.h"
@@ -111,7 +111,8 @@ ReplicatingSubscription::ReplicatingSubs
 ) : ConsumerImpl(parent, name, queue, ack, REPLICATOR, exclusive, tag,
                  resumeId, resumeTtl, arguments),
     position(0), ready(false), cancelled(false),
-    haBroker(hb)
+    haBroker(hb),
+    primary(boost::dynamic_pointer_cast<Primary>(haBroker.getRole()))
 {
     try {
         FieldTable ft;
@@ -137,7 +138,7 @@ ReplicatingSubscription::ReplicatingSubs
         }
 
         // If there's already a guard (we are in failover) use it, else create one.
-        if (Primary::get()) guard = Primary::get()->getGuard(queue, info);
+        if (primary) guard = primary->getGuard(queue, info);
         if (!guard) guard.reset(new QueueGuard(*queue, info));
 
         // NOTE: Once the observer is attached we can have concurrent
@@ -148,20 +149,19 @@ ReplicatingSubscription::ReplicatingSubs
         // between the snapshot and attaching the observer.
         observer.reset(new QueueObserver(*this));
         queue->addObserver(observer);
-        ReplicationIdSet primary = haBroker.getQueueSnapshots()->get(queue)->snapshot();
+        ReplicationIdSet primaryIds = haBroker.getQueueSnapshots()->get(queue)->snapshot();
         std::string backupStr = arguments.getAsString(ReplicatingSubscription::QPID_ID_SET);
-        ReplicationIdSet backup;
-        if (!backupStr.empty()) backup = decodeStr<ReplicationIdSet>(backupStr);
+        ReplicationIdSet backupIds;
+        if (!backupStr.empty()) backupIds = decodeStr<ReplicationIdSet>(backupStr);
 
         // Initial dequeues are messages on backup but not on primary.
-        ReplicationIdSet initDequeues = backup - primary;
+        ReplicationIdSet initDequeues = backupIds - primaryIds;
         QueuePosition front,back;
         queue->getRange(front, back, broker::REPLICATOR); // Outside lock, getRange locks queue
         {
             sys::Mutex::ScopedLock l(lock); // Concurrent calls to dequeued()
             dequeues += initDequeues;       // Messages on backup that are not on primary.
-            skip = backup - initDequeues;   // Messages already on the backup.
-
+            skip = backupIds - initDequeues; // Messages already on the backup.
             // Queue front is moving but we know this subscriptions will start at a
             // position >= front so if front is safe then position must be.
             position = front;
@@ -189,6 +189,7 @@ ReplicatingSubscription::~ReplicatingSub
 //
 void ReplicatingSubscription::initialize() {
     try {
+        if (primary) primary->addReplica(*this);
         Mutex::ScopedLock l(lock); // Note dequeued() can be called concurrently.
         // Send initial dequeues to the backup.
         // There must be a shared_ptr(this) when sending.
@@ -216,9 +217,8 @@ bool ReplicatingSubscription::deliver(
     try {
         bool result = false;
         if (skip.contains(id)) {
+            QPID_LOG(trace, logPrefix << "Skip " << LogMessageId(*getQueue(), m));
             skip -= id;
-            QPID_LOG(trace, logPrefix << "On backup, skip " <<
-                     LogMessageId(*getQueue(), m));
             guard->complete(id); // This will never be acknowledged.
             notify();
             result = true;
@@ -238,16 +238,13 @@ bool ReplicatingSubscription::deliver(
     }
 }
 
-/**
- *@param position: must be <= last position seen by subscription.
- */
 void ReplicatingSubscription::checkReady(sys::Mutex::ScopedLock& l) {
     if (!ready && isGuarded(l) && unready.empty()) {
         ready = true;
         sys::Mutex::ScopedUnlock u(lock);
         // Notify Primary that a subscription is ready.
         QPID_LOG(debug, logPrefix << "Caught up");
-        if (Primary::get()) Primary::get()->readyReplica(*this);
+        if (primary) primary->readyReplica(*this);
     }
 }
 
@@ -260,6 +257,7 @@ void ReplicatingSubscription::cancel()
         cancelled = true;
     }
     QPID_LOG(debug, logPrefix << "Cancelled");
+    if (primary) primary->removeReplica(*this);
     getQueue()->removeObserver(observer);
     guard->cancel();
     ConsumerImpl::cancel();
@@ -285,9 +283,7 @@ void ReplicatingSubscription::sendDequeu
 {
     if (dequeues.empty()) return;
     QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues);
-    string buffer = encodeStr(dequeues);
-    dequeues.clear();
-    sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer, l);
+    sendEvent(DequeueEvent(dequeues), l);
 }
 
 // Called after the message has been removed
@@ -307,23 +303,16 @@ void ReplicatingSubscription::dequeued(R
 // Called with lock held. Called in subscription's connection thread.
 void ReplicatingSubscription::sendIdEvent(ReplicationId pos, Mutex::ScopedLock& l)
 {
-    sendEvent(QueueReplicator::ID_EVENT_KEY, encodeStr(pos), l);
+    sendEvent(IdEvent(pos), l);
 }
 
-void ReplicatingSubscription::sendEvent(const std::string& key,
-                                        const std::string& buffer,
-                                        Mutex::ScopedLock&)
+void ReplicatingSubscription::sendEvent(const Event& event, Mutex::ScopedLock&)
 {
     Mutex::ScopedUnlock u(lock);
-    broker::Message message = makeMessage(buffer);
-    MessageTransfer& transfer = MessageTransfer::get(message);
-    DeliveryProperties* props =
-        transfer.getFrames().getHeaders()->get<DeliveryProperties>(true);
-    props->setRoutingKey(key);
     // Send the event directly to the base consumer implementation.  The dummy
     // consumer prevents acknowledgements being handled, which is what we want
     // for events
-    ConsumerImpl::deliver(QueueCursor(), message, boost::shared_ptr<Consumer>());
+    ConsumerImpl::deliver(QueueCursor(), event.message(), boost::shared_ptr<Consumer>());
 }
 
 // Called in subscription's connection thread.
@@ -342,4 +331,9 @@ bool ReplicatingSubscription::doDispatch
     }
 }
 
+void ReplicatingSubscription::addSkip(const ReplicationIdSet& ids) {
+    Mutex::ScopedLock l(lock);
+    skip += ids;
+}
+
 }} // namespace qpid::ha

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h Fri Sep 20 18:59:30 2013
@@ -25,7 +25,6 @@
 #include "BrokerInfo.h"
 #include "qpid/broker/SemanticState.h"
 #include "qpid/broker/ConsumerFactory.h"
-#include "qpid/types/Uuid.h"
 #include <iosfwd>
 
 namespace qpid {
@@ -44,6 +43,8 @@ class Buffer;
 namespace ha {
 class QueueGuard;
 class HaBroker;
+class Event;
+class Primary;
 
 /**
  * A susbcription that replicates to a remote backup.
@@ -118,6 +119,9 @@ class ReplicatingSubscription : public b
 
     BrokerInfo getBrokerInfo() const { return info; }
 
+    /** Skip replicating enqueue of of ids. */
+    void addSkip(const ReplicationIdSet& ids);
+
   protected:
     bool doDispatch();
 
@@ -128,7 +132,7 @@ class ReplicatingSubscription : public b
     std::string logPrefix;
     QueuePosition position;
     ReplicationIdSet dequeues;  // Dequeues to be sent in next dequeue event.
-    ReplicationIdSet skip;      // Messages already on backup will be skipped.
+    ReplicationIdSet skip;   // Skip enqueues: messages already on backup and tx enqueues.
     ReplicationIdSet unready;   // Unguarded, replicated and un-acknowledged.
     bool ready;
     bool cancelled;
@@ -136,12 +140,13 @@ class ReplicatingSubscription : public b
     boost::shared_ptr<QueueGuard> guard;
     HaBroker& haBroker;
     boost::shared_ptr<QueueObserver> observer;
+    boost::shared_ptr<Primary> primary;
 
     bool isGuarded(sys::Mutex::ScopedLock&);
     void dequeued(ReplicationId);
     void sendDequeueEvent(sys::Mutex::ScopedLock&);
     void sendIdEvent(ReplicationId, sys::Mutex::ScopedLock&);
-    void sendEvent(const std::string& key, const std::string& data, sys::Mutex::ScopedLock&);
+    void sendEvent(const Event&, sys::Mutex::ScopedLock&);
     void checkReady(sys::Mutex::ScopedLock&);
   friend class Factory;
 };

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/ReplicationTest.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/ReplicationTest.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/ReplicationTest.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/ReplicationTest.h Fri Sep 20 18:59:30 2013
@@ -41,6 +41,13 @@ namespace ha {
 /**
  * Test whether something is replicated, taking into account the
  * default replication level.
+ *
+ * The primary uses a ReplicationTest with default based on configuration
+ * settings, and marks objects to be replicated with an explict replication
+ * argument.
+ *
+ * The backup uses a default of NONE, so it always accepts what the primary has
+ * marked on the object.
  */
 class ReplicationTest
 {

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/StatusCheck.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/StatusCheck.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/StatusCheck.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/StatusCheck.cpp Fri Sep 20 18:59:30 2013
@@ -20,6 +20,8 @@
  */
 #include "StatusCheck.h"
 #include "ConnectionObserver.h"
+#include "HaBroker.h"
+#include "qpid/broker/Broker.h"
 #include "qpid/log/Statement.h"
 #include "qpid/messaging/Address.h"
 #include "qpid/messaging/Connection.h"
@@ -41,27 +43,32 @@ const string HA_BROKER = "org.apache.qpi
 
 class StatusCheckThread : public sys::Runnable {
   public:
-    StatusCheckThread(StatusCheck& sc, const qpid::Address& addr, const BrokerInfo& self)
-        : url(addr), statusCheck(sc), brokerInfo(self) {}
+    StatusCheckThread(StatusCheck& sc, const qpid::Address& addr)
+        : url(addr), statusCheck(sc) {}
     void run();
   private:
     Url url;
     StatusCheck& statusCheck;
-    BrokerInfo brokerInfo;
 };
 
 void StatusCheckThread::run() {
-    QPID_LOG(debug, statusCheck.logPrefix << "Checking status of " << url);
+    string logPrefix("Status check " + url.str() + ": ");
     Connection c;
     try {
+        // Check for self connections
         Variant::Map options, clientProperties;
-        clientProperties = brokerInfo.asMap(); // Detect self connections.
         clientProperties[ConnectionObserver::ADMIN_TAG] = 1; // Allow connection to backups.
         clientProperties[ConnectionObserver::ADDRESS_TAG] = url.str();
-        clientProperties[ConnectionObserver::BACKUP_TAG] = brokerInfo.asMap();
+        clientProperties[ConnectionObserver::BACKUP_TAG] = statusCheck.haBroker.getBrokerInfo().asMap();
 
+        // Set connection options
+        Settings settings(statusCheck.haBroker.getSettings());
+        if (settings.username.size()) options["username"] = settings.username;
+        if (settings.password.size()) options["password"] = settings.password;
+        if (settings.mechanism.size()) options["sasl_mechanisms"] = settings.mechanism;
         options["client-properties"] = clientProperties;
-        options["heartbeat"] = statusCheck.linkHeartbeatInterval/sys::TIME_SEC;
+        sys::Duration heartbeat(statusCheck.haBroker.getBroker().getOptions().linkHeartbeatInterval);
+        options["heartbeat"] = heartbeat/sys::TIME_SEC;
         c = Connection(url.str(), options);
 
         c.open();
@@ -81,7 +88,7 @@ void StatusCheckThread::run() {
         content["_object_id"] = oid;
         encode(content, request);
         s.send(request);
-        messaging::Duration timeout(statusCheck.linkHeartbeatInterval/sys::TIME_MSEC);
+        messaging::Duration timeout(heartbeat/sys::TIME_MSEC);
         Message response = r.fetch(timeout);
         session.acknowledge();
         Variant::List contentIn;
@@ -89,29 +96,24 @@ void StatusCheckThread::run() {
         if (contentIn.size() == 1) {
             Variant::Map details = contentIn.front().asMap()["_values"].asMap();
             string status = details["status"].getString();
+            QPID_LOG(debug, logPrefix << status);
             if (status != "joining") {
                 statusCheck.setPromote(false);
-                QPID_LOG(info, statusCheck.logPrefix << "Status of " << url << " is "
-                         << status << ", this broker will refuse promotion.");
+                QPID_LOG(info, logPrefix << "Joining established cluster");
             }
-            QPID_LOG(debug, statusCheck.logPrefix << "Status of " << url << ": " << status);
         }
+        else
+            QPID_LOG(error, logPrefix << "Invalid response " << response.getContent())
     } catch(const exception& error) {
-        QPID_LOG(info, statusCheck.logPrefix << "Checking status of " << url <<  ": " << error.what());
-    }
-    try { c.close(); }
-    catch(const exception&) {
-        QPID_LOG(warning, statusCheck.logPrefix << "Error closing status check connection to " << url);
-    }
-    try { c.close(); }
-    catch(const exception&) {
-        QPID_LOG(warning, "Error closing status check connection to " << url);
+        // Its not an error to fail to connect to self.
+        if (statusCheck.haBroker.getBrokerInfo().getAddress() != url[0])
+            QPID_LOG(warning, logPrefix << error.what());
     }
+    try { c.close(); } catch(...) {}
     delete this;
 }
 
-StatusCheck::StatusCheck(const string& lp, sys::Duration lh, const BrokerInfo& self)
-    : logPrefix(lp), promote(true), linkHeartbeatInterval(lh), brokerInfo(self)
+StatusCheck::StatusCheck(HaBroker& hb) : promote(true), haBroker(hb)
 {}
 
 StatusCheck::~StatusCheck() {
@@ -122,7 +124,7 @@ StatusCheck::~StatusCheck() {
 void StatusCheck::setUrl(const Url& url) {
     Mutex::ScopedLock l(lock);
     for (size_t i = 0; i < url.size(); ++i)
-        threads.push_back(Thread(new StatusCheckThread(*this, url[i], brokerInfo)));
+        threads.push_back(Thread(new StatusCheckThread(*this, url[i])));
 }
 
 bool StatusCheck::canPromote() {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org