You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2013/11/12 17:58:53 UTC

svn commit: r1541146 - in /qpid/trunk/qpid/cpp/src: ./ qpid/broker/ qpid/ha/ tests/

Author: aconway
Date: Tue Nov 12 16:58:52 2013
New Revision: 1541146

URL: http://svn.apache.org/r1541146
Log:
QPID-5275: HA transactions failing in qpid-cluster-benchmark

The test was failing due to incorrect handling of the transaction lifecycle:
- Failing to handle the automatic rollback of the empty TX at session close.
- Deleting the tx-q before all backups were finished with it.

The fixes include:
- Make the tx-q auto-delete, so it is deleted only when the TxReplicators cancel their subscriptions.
- Use markInUse/releaseFromUse on the primary to keep the tx-q until the primary is done.
- Count TxReplicators for auto-delete (unlike normal QueueReplicators)
- Improve error handling and log messages
- Handle *incoming* exceptions on a federation link by passing to ErrorListener
- QueueReplicator catches incoming not-found and resource-deleted exceptions:
  - close the backup bridge and handle the race between subscribe and delete.
- Simplify QueueSnapshots, removing the need for the snapshot map.

Added:
    qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicatingSubscription.cpp   (with props)
    qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicatingSubscription.h   (with props)
Modified:
    qpid/trunk/qpid/cpp/src/CMakeLists.txt
    qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/Event.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/Event.h
    qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueSnapshots.h
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
    qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.h
    qpid/trunk/qpid/cpp/src/tests/brokertest.py
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=1541146&r1=1541145&r2=1541146&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Tue Nov 12 16:58:52 2013
@@ -623,6 +623,8 @@ if (BUILD_HA)
         qpid/ha/StandAlone.h
         qpid/ha/StatusCheck.cpp
         qpid/ha/StatusCheck.h
+        qpid/ha/TxReplicatingSubscription.cpp
+        qpid/ha/TxReplicatingSubscription.h
         qpid/ha/PrimaryTxObserver.cpp
         qpid/ha/PrimaryTxObserver.h
         qpid/ha/types.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=1541146&r1=1541145&r2=1541146&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp Tue Nov 12 16:58:52 2013
@@ -442,6 +442,12 @@ void Bridge::executionException(
     if (errorListener) errorListener->executionException(code, msg);
 }
 
+void Bridge::incomingExecutionException(
+    framing::execution::ErrorCode code, const std::string& msg)
+{
+    if (errorListener) errorListener->incomingExecutionException(code, msg);
+}
+
 void Bridge::detach() {
     detached = true;
     if (errorListener) errorListener->detach();

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h?rev=1541146&r1=1541145&r2=1541146&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h Tue Nov 12 16:58:52 2013
@@ -111,6 +111,7 @@ class Bridge : public PersistableConfig,
     void connectionException(framing::connection::CloseCode code, const std::string& msg);
     void channelException(framing::session::DetachCode, const std::string& msg);
     void executionException(framing::execution::ErrorCode, const std::string& msg);
+    void incomingExecutionException(framing::execution::ErrorCode, const std::string& msg);
     void detach();
 
     void setErrorListener(boost::shared_ptr<ErrorListener> e) { errorListener = e; }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp?rev=1541146&r1=1541145&r2=1541146&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp Tue Nov 12 16:58:52 2013
@@ -273,6 +273,7 @@ class DetachedCallback : public SessionH
     void connectionException(framing::connection::CloseCode, const std::string&) {}
     void channelException(framing::session::DetachCode, const std::string&) {}
     void executionException(framing::execution::ErrorCode, const std::string&) {}
+    void incomingExecutionException(framing::execution::ErrorCode, const std::string& ) {}
     void detach() {}
   private:
     const std::string name;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1541146&r1=1541145&r2=1541146&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Tue Nov 12 16:58:52 2013
@@ -206,9 +206,9 @@ void SemanticState::rollback()
 {
     if (!txBuffer)
         throw CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions"));
+    StartTxOnExit e(*this);
     session.rollbackTx();       // Just to update statistics
     txBuffer->rollback();
-    startTx();                  // Start a new TX automatically.
 }
 
 void SemanticState::selectDtx()

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=1541146&r1=1541145&r2=1541146&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Tue Nov 12 16:58:52 2013
@@ -125,6 +125,7 @@ class SemanticState : private boost::non
 
     SessionContext& getSession();
     const SessionContext& getSession() const;
+    SessionState& getSessionState() { return session; }
 
     const boost::shared_ptr<ConsumerImpl> find(const std::string& destination) const;
     bool find(const std::string& destination, boost::shared_ptr<ConsumerImpl>&) const;
@@ -200,8 +201,9 @@ class SemanticStateConsumerImpl : public
     protected:
     mutable qpid::sys::Mutex lock;
     SemanticState* const parent;
-    private:
     const boost::shared_ptr<Queue> queue;
+
+    private:
     const bool ackExpected;
     const bool acquire;
     bool blocked;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=1541146&r1=1541145&r2=1541146&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Tue Nov 12 16:58:52 2013
@@ -536,16 +536,17 @@ void SessionAdapter::ExecutionHandlerImp
     //TODO: but currently never used client->server
 }
 
-void SessionAdapter::ExecutionHandlerImpl::exception(uint16_t /*errorCode*/,
+void SessionAdapter::ExecutionHandlerImpl::exception(uint16_t errorCode,
                                                      const SequenceNumber& /*commandId*/,
                                                      uint8_t /*classCode*/,
                                                      uint8_t /*commandCode*/,
                                                      uint8_t /*fieldIndex*/,
-                                                     const std::string& /*description*/,
+                                                     const std::string& description,
                                                      const framing::FieldTable& /*errorInfo*/)
 {
-    //TODO: again, not really used client->server but may be important
-    //for inter-broker links
+    broker::SessionHandler* s = state.getSessionState().getHandler();
+    if (s) s->incomingExecutionException(
+        framing::execution::ErrorCode(errorCode), description);
 }
 
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=1541146&r1=1541145&r2=1541146&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Tue Nov 12 16:58:52 2013
@@ -68,6 +68,13 @@ void SessionHandler::executionException(
         errorListener->executionException(code, msg);
 }
 
+void SessionHandler::incomingExecutionException(
+    framing::execution::ErrorCode code, const std::string& msg)
+{
+    if (errorListener)
+        errorListener->incomingExecutionException(code, msg);
+}
+
 amqp_0_10::Connection& SessionHandler::getConnection() { return connection; }
 
 const amqp_0_10::Connection& SessionHandler::getConnection() const { return connection; }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h?rev=1541146&r1=1541145&r2=1541146&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h Tue Nov 12 16:58:52 2013
@@ -46,12 +46,23 @@ class SessionHandler : public qpid::amqp
     class ErrorListener {
       public:
         virtual ~ErrorListener() {}
+
+        /** Called when there is an outgoing connection-exception */
         virtual void connectionException(
             framing::connection::CloseCode code, const std::string& msg) = 0;
+        /** Called when there is an outgoing channel-exception */
         virtual void channelException(
             framing::session::DetachCode, const std::string& msg) = 0;
+        /** Called when there is an outgoing execution-exception */
         virtual void executionException(
             framing::execution::ErrorCode, const std::string& msg) = 0;
+
+        /** Called when there is an incoming execution-exception.
+         * Useful for inter-broker bridges.
+         */
+        virtual void incomingExecutionException(
+            framing::execution::ErrorCode, const std::string& msg) = 0;
+
         /** Called when it is safe to delete the ErrorListener. */
         virtual void detach() = 0;
     };
@@ -77,15 +88,18 @@ class SessionHandler : public qpid::amqp
 
     void setErrorListener(boost::shared_ptr<ErrorListener> e) { errorListener = e; }
 
+    // Called by SessionAdapter
+    void incomingExecutionException(framing::execution::ErrorCode, const std::string& msg);
+
   protected:
-    virtual void setState(const std::string& sessionName, bool force);
-    virtual qpid::SessionState* getState();
-    virtual framing::FrameHandler* getInHandler();
-    virtual void connectionException(framing::connection::CloseCode code, const std::string& msg);
-    virtual void channelException(framing::session::DetachCode, const std::string& msg);
-    virtual void executionException(framing::execution::ErrorCode, const std::string& msg);
-    virtual void detaching();
-    virtual void readyToSend();
+    void setState(const std::string& sessionName, bool force);
+    qpid::SessionState* getState();
+    framing::FrameHandler* getInHandler();
+    void connectionException(framing::connection::CloseCode code, const std::string& msg);
+    void channelException(framing::session::DetachCode, const std::string& msg);
+    void executionException(framing::execution::ErrorCode, const std::string& msg);
+    void detaching();
+    void readyToSend();
 
   private:
     struct SetChannelProxy : public framing::AMQP_ClientProxy { // Proxy that sets the channel.

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=1541146&r1=1541145&r2=1541146&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Tue Nov 12 16:58:52 2013
@@ -82,6 +82,8 @@ class SessionState : public qpid::Sessio
     void attach(SessionHandler& handler);
     void disableOutput();
 
+    SessionHandler* getHandler() { return handler; }
+
     /** @pre isAttached() */
     framing::AMQP_ClientProxy& getProxy();
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1541146&r1=1541145&r2=1541146&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Tue Nov 12 16:58:52 2013
@@ -188,6 +188,12 @@ class BrokerReplicator::ErrorListener : 
     void executionException(framing::execution::ErrorCode, const std::string& msg) {
         QPID_LOG(error, logPrefix << "Execution error: " << msg);
     }
+
+    void incomingExecutionException(
+        framing::execution::ErrorCode, const std::string& msg) {
+        QPID_LOG(error, logPrefix << "Incoming execution error: " << msg);
+    }
+
     void detach() {
         QPID_LOG(debug, logPrefix << "Session detached.");
     }
@@ -453,7 +459,7 @@ void BrokerReplicator::route(Deliverable
         if (msg.getMessage().getPropertyAsString(QMF_CONTENT) == EVENT) {
             for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
                 Variant::Map& map = i->asMap();
-                QPID_LOG(trace, "Broker replicator event: " << map);
+                QPID_LOG(debug, "Broker replicator event: " << map);
                 Variant::Map& schema = map[SCHEMA_ID].asMap();
                 Variant::Map& values = map[VALUES].asMap();
                 std::string key = (schema[PACKAGE_NAME].asString() +
@@ -465,7 +471,7 @@ void BrokerReplicator::route(Deliverable
         } else if (msg.getMessage().getPropertyAsString(QMF_OPCODE) == QUERY_RESPONSE) {
             for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
                 Variant::Map& map = i->asMap();
-                QPID_LOG(trace, "Broker replicator response: " << map);
+                QPID_LOG(debug, "Broker replicator response: " << map);
                 string type = map[SCHEMA_ID].asMap()[CLASS_NAME].asString();
                 Variant::Map& values = map[VALUES].asMap();
                 framing::FieldTable args;
@@ -758,7 +764,7 @@ const string REPLICATE_DEFAULT="replicat
 // Received the ha-broker configuration object for the primary broker.
 void BrokerReplicator::doResponseHaBroker(Variant::Map& values) {
     try {
-        QPID_LOG(trace, logPrefix << "HA Broker response: " << values);
+        QPID_LOG(debug, logPrefix << "HA Broker response: " << values);
         ReplicateLevel mine = haBroker.getSettings().replicateDefault.get();
         ReplicateLevel primary = replicationTest.getLevel(values[REPLICATE_DEFAULT].asString());
         if (mine != primary)
@@ -882,6 +888,7 @@ string BrokerReplicator::getType() const
 
 void BrokerReplicator::disconnectedExchange(boost::shared_ptr<Exchange> ex) {
     boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex));
+    // FIXME aconway 2013-11-01: move logic with releaseFromUse to QueueReplicator
     if (qr) {
         qr->disconnect();
         if (TxReplicator::isTxQueue(qr->getQueue()->getName())) {

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Event.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Event.cpp?rev=1541146&r1=1541145&r2=1541146&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Event.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Event.cpp Tue Nov 12 16:58:52 2013
@@ -51,7 +51,7 @@ const string TxCommitEvent::KEY(QPID_HA+
 const string TxRollbackEvent::KEY(QPID_HA+"txrb");
 const string TxPrepareOkEvent::KEY(QPID_HA+"txok");
 const string TxPrepareFailEvent::KEY(QPID_HA+"txno");
-const string TxMembersEvent::KEY(QPID_HA+"txmem");
+const string TxBackupsEvent::KEY(QPID_HA+"txmem");
 
 broker::Message makeMessage(
     const string& data, const string& destination, const string& routingKey)

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Event.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Event.h?rev=1541146&r1=1541145&r2=1541146&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Event.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Event.h Tue Nov 12 16:58:52 2013
@@ -178,14 +178,14 @@ struct TxPrepareFailEvent : public Event
     void print(std::ostream& o) const { o << broker; }
 };
 
-struct TxMembersEvent : public EventBase<TxMembersEvent> {
+struct TxBackupsEvent : public EventBase<TxBackupsEvent> {
     static const std::string KEY;
-    UuidSet members;
-    TxMembersEvent(const UuidSet& s=UuidSet()) : members(s) {}
-    void encode(framing::Buffer& b) const { b.put(members); }
-    void decode(framing::Buffer& b) { b.get(members); }
-    size_t encodedSize() const { return members.encodedSize(); }
-    void print(std::ostream& o) const { o << members; }
+    UuidSet backups;
+    TxBackupsEvent(const UuidSet& s=UuidSet()) : backups(s) {}
+    void encode(framing::Buffer& b) const { b.put(backups); }
+    void decode(framing::Buffer& b) { b.get(backups); }
+    size_t encodedSize() const { return backups.encodedSize(); }
+    void print(std::ostream& o) const { o << backups; }
 };
 
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp?rev=1541146&r1=1541145&r2=1541146&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp Tue Nov 12 16:58:52 2013
@@ -30,6 +30,7 @@
 
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/Queue.h"
+#include "qpid/framing/reply_exceptions.h"
 #include <boost/lexical_cast.hpp>
 #include <algorithm>
 
@@ -40,8 +41,9 @@ class FieldTable;
 namespace ha {
 
 using namespace std;
-using namespace qpid::broker;
-using namespace qpid::framing;
+using namespace sys;
+using namespace broker;
+using namespace framing;
 using types::Uuid;
 
 // Exchange to receive prepare OK events.
@@ -51,6 +53,7 @@ class PrimaryTxObserver::Exchange : publ
         broker::Exchange(tx_->getExchangeName()),
         tx(tx_)
     {
+        args.setString(QPID_REPLICATE, printable(NONE).str()); // Set replication arg.
         dispatch[TxPrepareOkEvent::KEY] =
             boost::bind(&PrimaryTxObserver::txPrepareOkEvent, tx, _1);
         dispatch[TxPrepareFailEvent::KEY] =
@@ -72,7 +75,7 @@ class PrimaryTxObserver::Exchange : publ
   private:
     static const string TYPE_NAME;
     typedef boost::function<void(const std::string&)> DispatchFn;
-    typedef qpid::sys::unordered_map<std::string, DispatchFn> DispatchMap;
+    typedef unordered_map<std::string, DispatchFn> DispatchMap;
 
     DispatchMap dispatch;
     boost::shared_ptr<PrimaryTxObserver> tx;
@@ -83,50 +86,62 @@ const string PrimaryTxObserver::Exchange
 PrimaryTxObserver::PrimaryTxObserver(
     Primary& p, HaBroker& hb, const boost::intrusive_ptr<broker::TxBuffer>& tx
 ) :
+    state(SENDING),
     primary(p), haBroker(hb), broker(hb.getBroker()),
     replicationTest(hb.getSettings().replicateDefault.get()),
     txBuffer(tx),
     id(true),
-    exchangeName(TRANSACTION_REPLICATOR_PREFIX+id.str()),
-    complete(false)
+    exchangeName(TRANSACTION_REPLICATOR_PREFIX+id.str())
 {
     logPrefix = "Primary transaction "+shortStr(id)+": ";
 
     // The brokers known at this point are the ones that will be included
     // in the transaction. Brokers that join later are not included.
     //
-    BrokerInfo::Set backups(haBroker.getMembership().otherBackups());
-    std::transform(backups.begin(), backups.end(), inserter(members, members.begin()),
+    BrokerInfo::Set backups_(haBroker.getMembership().otherBackups());
+    std::transform(backups_.begin(), backups_.end(), inserter(backups, backups.begin()),
 		   boost::bind(&BrokerInfo::getSystemId, _1));
 
+    // Delay completion of TX untill all backups have responded to prepare.
+    incomplete = backups;
+    for (size_t i = 0; i < incomplete.size(); ++i)
+        txBuffer->startCompleter();
+
     QPID_LOG(debug, logPrefix << "Started TX " << id);
-    QPID_LOG(debug, logPrefix << "Members: " << members);
-    unprepared = unfinished = members;
+    QPID_LOG(debug, logPrefix << "Backups: " << backups);
+}
 
+void PrimaryTxObserver::initialize() {
+    boost::shared_ptr<Exchange> ex(new Exchange(shared_from_this()));
+    broker.getExchanges().registerExchange(ex);
     pair<QueuePtr, bool> result =
         broker.getQueues().declare(
             exchangeName, QueueSettings(/*durable*/false, /*autodelete*/true));
-    assert(result.second);
+    if (!result.second)
+        throw InvalidArgumentException(
+            QPID_MSG(logPrefix << "TX replication queue already exists."));
     txQueue = result.first;
-    txQueue->deliver(TxMembersEvent(members).message());
+    txQueue->markInUse(true); // Prevent auto-delete till we are done.
+    txQueue->deliver(TxBackupsEvent(backups).message());
+
 }
 
+
 PrimaryTxObserver::~PrimaryTxObserver() {
     QPID_LOG(debug, logPrefix << "Ended");
 }
 
-void PrimaryTxObserver::initialize() {
-    boost::shared_ptr<Exchange> ex(new Exchange(shared_from_this()));
-    FieldTable args = ex->getArgs();
-    args.setString(QPID_REPLICATE, printable(NONE).str()); // Set replication arg.
-    broker.getExchanges().registerExchange(ex);
+void PrimaryTxObserver::checkState(State expect, const std::string& msg) {
+    if (state != expect)
+        throw IllegalStateException(QPID_MSG(logPrefix << "Illegal state: " << msg));
 }
 
 void PrimaryTxObserver::enqueue(const QueuePtr& q, const broker::Message& m)
 {
-    sys::Mutex::ScopedLock l(lock);
+    Mutex::ScopedLock l(lock);
     if (replicationTest.useLevel(*q) == ALL) { // Ignore unreplicated queues.
         QPID_LOG(trace, logPrefix << "Enqueue: " << LogMessageId(*q, m));
+        checkState(SENDING, "Too late for enqueue");
         enqueues[q] += m.getReplicationId();
         txQueue->deliver(TxEnqueueEvent(q->getName(), m.getReplicationId()).message());
         txQueue->deliver(m);
@@ -136,7 +151,8 @@ void PrimaryTxObserver::enqueue(const Qu
 void PrimaryTxObserver::dequeue(
     const QueuePtr& q, QueuePosition pos, ReplicationId id)
 {
-    sys::Mutex::ScopedLock l(lock);
+    Mutex::ScopedLock l(lock);
+    checkState(SENDING, "Too late for dequeue");
     if (replicationTest.useLevel(*q) == ALL) { // Ignore unreplicated queues.
         QPID_LOG(trace, logPrefix << "Dequeue: " << LogMessageId(*q, pos, id));
         txQueue->deliver(TxDequeueEvent(q->getName(), id).message());
@@ -163,14 +179,14 @@ struct Skip {
 } // namespace
 
 bool PrimaryTxObserver::prepare() {
-    QPID_LOG(debug, logPrefix << "Prepare " << members);
+    QPID_LOG(debug, logPrefix << "Prepare " << backups);
     vector<Skip> skips;
     {
-        sys::Mutex::ScopedLock l(lock);
-        for (size_t i = 0; i < members.size(); ++i) txBuffer->startCompleter();
-
+        Mutex::ScopedLock l(lock);
+        checkState(SENDING, "Too late for prepare");
+        state = PREPARING;
         // Tell replicating subscriptions to skip IDs in the transaction.
-        for (UuidSet::iterator b = members.begin(); b != members.end(); ++b)
+        for (UuidSet::iterator b = backups.begin(); b != backups.end(); ++b)
             for (QueueIdsMap::iterator q = enqueues.begin(); q != enqueues.end(); ++q)
                 skips.push_back(Skip(*b, q->first, q->second));
     }
@@ -183,69 +199,91 @@ bool PrimaryTxObserver::prepare() {
 
 void PrimaryTxObserver::commit() {
     QPID_LOG(debug, logPrefix << "Commit");
-    sys::Mutex::ScopedLock l(lock);
-    txQueue->deliver(TxCommitEvent().message());
-    complete = true;
-    end(l);
+    Mutex::ScopedLock l(lock);
+    checkState(PREPARING, "Cannot commit, not preparing");
+    if (incomplete.size() == 0) {
+        txQueue->deliver(TxCommitEvent().message());
+        end(l);
+    } else {
+        txQueue->deliver(TxRollbackEvent().message());
+        end(l);
+        throw PreconditionFailedException(
+            QPID_MSG(logPrefix << "Cannot commit, " << incomplete.size()
+                     << " incomplete backups"));
+    }
 }
 
 void PrimaryTxObserver::rollback() {
     QPID_LOG(debug, logPrefix << "Rollback");
-    sys::Mutex::ScopedLock l(lock);
-    txQueue->deliver(TxRollbackEvent().message());
-    complete = true;
-    end(l);
-}
-
-void PrimaryTxObserver::end(sys::Mutex::ScopedLock&) {
-    // Don't destroy the tx-queue until the transaction is complete and there
-    // are no connected subscriptions.
-    if (txBuffer && complete && unfinished.empty()) {
-        txBuffer = 0;       // Break pointer cycle.
-        try {
-            haBroker.getBroker().deleteQueue(txQueue->getName(), haBroker.getUserId(), string());
-        } catch (const std::exception& e) {
-            QPID_LOG(error, logPrefix << "Deleting transaction queue: "  << e.what());
-        }
-        try {
-            broker.getExchanges().destroy(getExchangeName());
-        } catch (const std::exception& e) {
-            QPID_LOG(error, logPrefix << "Deleting transaction exchange: "  << e.what());
-        }
+    Mutex::ScopedLock l(lock);
+    if (state != ENDED) {
+        txQueue->deliver(TxRollbackEvent().message());
+        end(l);
     }
 }
 
+void PrimaryTxObserver::end(Mutex::ScopedLock&) {
+    if (state == ENDED) return;
+    state = ENDED;
+    // If there are no outstanding completions, break pointer cycle here.
+    // Otherwise break it in cancel() when the remaining completions are done.
+    if (incomplete.empty()) txBuffer = 0;
+    txQueue->releaseFromUse(true); // txQueue will auto-delete
+    txQueue.reset();
+    try {
+        broker.getExchanges().destroy(getExchangeName());
+    } catch (const std::exception& e) {
+        QPID_LOG(error, logPrefix << "Deleting transaction exchange: "  << e.what());
+    }
+}
+
+bool PrimaryTxObserver::completed(const Uuid& id, Mutex::ScopedLock&) {
+    if (incomplete.erase(id)) {
+        txBuffer->finishCompleter();
+        return true;
+    }
+    return false;
+}
+
+bool PrimaryTxObserver::error(const Uuid& id, const char* msg, Mutex::ScopedLock& l)
+{
+    if (incomplete.find(id) != incomplete.end()) {
+        // Note: setError before completed since completed may trigger completion.
+        txBuffer->setError(QPID_MSG(logPrefix << msg << id));
+        completed(id, l);
+        return true;
+    }
+    return false;
+}
+
 void PrimaryTxObserver::txPrepareOkEvent(const string& data) {
-    sys::Mutex::ScopedLock l(lock);
+    Mutex::ScopedLock l(lock);
     types::Uuid backup = decodeStr<TxPrepareOkEvent>(data).broker;
-    if (unprepared.erase(backup)) {
+    if (completed(backup, l)) {
         QPID_LOG(debug, logPrefix << "Backup prepared ok: " << backup);
-        txBuffer->finishCompleter();
+    } else {
+        QPID_LOG(error, logPrefix << "Unexpected prepare-ok response from " << backup);
     }
 }
 
 void PrimaryTxObserver::txPrepareFailEvent(const string& data) {
-    sys::Mutex::ScopedLock l(lock);
+    Mutex::ScopedLock l(lock);
     types::Uuid backup = decodeStr<TxPrepareFailEvent>(data).broker;
-    if (unprepared.erase(backup)) {
-        QPID_LOG(error, logPrefix << "Prepare failed on backup: " << backup);
-        txBuffer->setError(
-            QPID_MSG(logPrefix << "Prepare failed on backup: " << backup));
-        txBuffer->finishCompleter();
+    if (error(backup, "Prepare failed on backup: ", l)) {
+        QPID_LOG(error, logPrefix << "Prepare failed on backup " << backup);
+    } else {
+        QPID_LOG(error, logPrefix << "Unexpected prepare-fail response from " << backup);
     }
 }
 
 void PrimaryTxObserver::cancel(const ReplicatingSubscription& rs) {
-    sys::Mutex::ScopedLock l(lock);
+    Mutex::ScopedLock l(lock);
     types::Uuid backup = rs.getBrokerInfo().getSystemId();
-    if (unprepared.erase(backup) ){
-        complete = true;          // Cancelled before prepared.
-        txBuffer->setError(
-            QPID_MSG(logPrefix << "Backup disconnected: " << rs.getBrokerInfo()));
-        txBuffer->finishCompleter();
-    }
-    unfinished.erase(backup);
-    end(l);
+    QPID_LOG(debug, logPrefix << "Backup disconnected: " << backup);
+    // Normally the backup should be completed before it is cancelled.
+    if (completed(backup, l)) error(backup, "Unexpected disconnect:", l);
+    // Break the pointer cycle if backups have completed and we are done with txBuffer.
+    if (state == ENDED && incomplete.empty()) txBuffer = 0;
 }
 
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h?rev=1541146&r1=1541145&r2=1541146&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h Tue Nov 12 16:58:52 2013
@@ -90,13 +90,21 @@ class PrimaryTxObserver : public broker:
     typedef qpid::sys::unordered_map<
       QueuePtr, ReplicationIdSet, Hasher<QueuePtr> > QueueIdsMap;
 
-    void membership(const BrokerInfo::Map&);
+    enum State {
+        SENDING,                ///< Sending TX messages and acks
+        PREPARING,              ///< Prepare sent, waiting for response
+        ENDED                   ///< Commit or rollback sent, local transaction ended.
+    };
+
+    void checkState(State expect, const std::string& msg);
     void end(sys::Mutex::ScopedLock&);
     void txPrepareOkEvent(const std::string& data);
     void txPrepareFailEvent(const std::string& data);
-
+    bool completed(const types::Uuid& id, sys::Mutex::ScopedLock&);
+    bool error(const types::Uuid& id, const char* msg, sys::Mutex::ScopedLock& l);
 
     sys::Monitor lock;
+    State state;
     std::string logPrefix;
     Primary& primary;
     HaBroker& haBroker;
@@ -110,10 +118,8 @@ class PrimaryTxObserver : public broker:
     std::string exchangeName;
     QueuePtr txQueue;
     QueueIdsMap enqueues;
-    bool complete;
-    UuidSet members;            // All members of transaction.
-    UuidSet unprepared;         // Members that have not yet responded to prepare.
-    UuidSet unfinished;         // Members that have not yet disconnected.
+    UuidSet backups;            // All backups of transaction.
+    UuidSet incomplete;         // Incomplete backups (not yet responded to prepare)
 };
 
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1541146&r1=1541145&r2=1541146&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Tue Nov 12 16:58:52 2013
@@ -46,18 +46,13 @@ namespace qpid {
 namespace ha {
 using namespace broker;
 using namespace framing;
+using namespace framing::execution;
 using namespace std;
 using std::exception;
 using sys::Mutex;
 
 const std::string QueueReplicator::QPID_SYNC_FREQUENCY("qpid.sync_frequency");
 
-namespace {
-const string QPID_HA(QPID_HA_PREFIX);
-const std::string TYPE_NAME(QPID_HA+"queue-replicator");
-}
-
-
 std::string QueueReplicator::replicatorName(const std::string& queueName) {
     return QUEUE_REPLICATOR_PREFIX + queueName;
 }
@@ -68,20 +63,21 @@ bool QueueReplicator::isReplicatorName(c
 
 class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener {
   public:
-    ErrorListener(const std::string& prefix) : logPrefix(prefix) {}
-    void connectionException(framing::connection::CloseCode, const std::string& msg) {
-        QPID_LOG(error, logPrefix << "Connection error: " << msg);
-    }
-    void channelException(framing::session::DetachCode, const std::string& msg) {
-        QPID_LOG(error, logPrefix << "Channel error: " << msg);
-    }
-    void executionException(framing::execution::ErrorCode, const std::string& msg) {
-        QPID_LOG(error, logPrefix << "Execution error: " << msg);
+    ErrorListener(const boost::shared_ptr<QueueReplicator>& qr)
+        : queueReplicator(qr), logPrefix(qr->logPrefix) {}
+
+    void connectionException(framing::connection::CloseCode, const std::string&) {}
+    void channelException(framing::session::DetachCode, const std::string&) {}
+    void executionException(framing::execution::ErrorCode, const std::string&) {}
+
+    void incomingExecutionException(ErrorCode e, const std::string& msg) {
+        queueReplicator->incomingExecutionException(e, msg);
     }
     void detach() {
         QPID_LOG(debug, logPrefix << "Session detached");
     }
   private:
+    boost::shared_ptr<QueueReplicator> queueReplicator;
     std::string logPrefix;
 };
 
@@ -128,6 +124,8 @@ QueueReplicator::QueueReplicator(HaBroke
         boost::bind(&QueueReplicator::idEvent, this, _1, _2);
 }
 
+QueueReplicator::~QueueReplicator() {}
+
 // This must be called immediately after the constructor.
 // It has to be separate so we can call shared_from_this().
 void QueueReplicator::activate() {
@@ -161,7 +159,7 @@ void QueueReplicator::activate() {
     );
     bridge = result.first;
     bridge->setErrorListener(
-        boost::shared_ptr<ErrorListener>(new ErrorListener(logPrefix)));
+        boost::shared_ptr<ErrorListener>(new ErrorListener(shared_from_this())));
 
     // Enable callback to destroy()
     queue->addObserver(
@@ -173,8 +171,6 @@ void QueueReplicator::disconnect() {
     sessionHandler = 0;
 }
 
-QueueReplicator::~QueueReplicator() {}
-
 // Called from Queue::destroyed()
 void QueueReplicator::destroy() {
     boost::shared_ptr<Bridge> bridge2; // To call outside of lock
@@ -200,7 +196,7 @@ void QueueReplicator::initializeBridge(B
     AMQP_ServerProxy peer(sessionHandler->out);
     const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
     FieldTable arguments;
-    arguments.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1);
+    arguments.setString(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, getType());
     arguments.setInt(QPID_SYNC_FREQUENCY, 1); // TODO aconway 2012-05-22: optimize?
     arguments.setTable(ReplicatingSubscription::QPID_BROKER_INFO, brokerInfo.asFieldTable());
     arguments.setString(ReplicatingSubscription::QPID_ID_SET,
@@ -289,12 +285,26 @@ ReplicationId QueueReplicator::getMaxId(
     return maxId;
 }
 
+void QueueReplicator::incomingExecutionException(ErrorCode e, const std::string& msg) {
+    if (e == ERROR_CODE_NOT_FOUND || e == ERROR_CODE_RESOURCE_DELETED) {
+        // If the queue is destroyed at the same time we are subscribing, we may
+        // get a not-found or resource-deleted exception before the
+        // BrokerReplicator gets the queue-delete event. Shut down the bridge by
+        // calling destroy(), we can let the BrokerReplicator delete the queue
+        // when the queue-delete arrives.
+        QPID_LOG(debug, logPrefix << "Deleted on primary: " << msg);
+        destroy();
+    }
+    else
+        QPID_LOG(error, logPrefix << "Incoming execution exception: " << msg);
+}
+
 // Unused Exchange methods.
 bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; }
 bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; }
 bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const FieldTable* const) { return false; }
 bool QueueReplicator::hasBindings() { return false; }
-std::string QueueReplicator::getType() const { return TYPE_NAME; }
+std::string QueueReplicator::getType() const { return ReplicatingSubscription::QPID_QUEUE_REPLICATOR; }
 
 
 }} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h?rev=1541146&r1=1541145&r2=1541146&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h Tue Nov 12 16:58:52 2013
@@ -118,6 +118,9 @@ class QueueReplicator : public broker::E
     void dequeueEvent(const std::string& data, sys::Mutex::ScopedLock&);
     void idEvent(const std::string& data, sys::Mutex::ScopedLock&);
 
+    void incomingExecutionException(framing::execution::ErrorCode e,
+                                    const std::string& msg);
+
     std::string logPrefix;
     std::string bridgeName;
 
@@ -127,6 +130,8 @@ class QueueReplicator : public broker::E
     ReplicationIdSet idSet; // Set of replicationIds on the queue.
     ReplicationId nextId;   // ID for next message to arrive.
     ReplicationId maxId;    // Max ID used so far.
+
+  friend class ErrorListener;
 };
 
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueSnapshots.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueSnapshots.h?rev=1541146&r1=1541145&r2=1541146&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueSnapshots.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueSnapshots.h Tue Nov 12 16:58:52 2013
@@ -32,6 +32,7 @@
 #include "qpid/sys/Mutex.h"
 
 #include <boost/shared_ptr.hpp>
+#include <boost/make_shared.hpp>
 
 namespace qpid {
 namespace ha {
@@ -44,35 +45,28 @@ class QueueSnapshots : public broker::Br
 {
   public:
     boost::shared_ptr<QueueSnapshot> get(const boost::shared_ptr<broker::Queue>& q) const {
-        sys::Mutex::ScopedLock l(lock);
-        SnapshotMap::const_iterator i = snapshots.find(q);
-        return i != snapshots.end() ? i->second : boost::shared_ptr<QueueSnapshot>();
+        boost::shared_ptr<QueueSnapshot> qs;
+        q->eachObserver(
+            boost::bind(QueueSnapshots::saveQueueSnapshot, _1, boost::ref(qs)));
+        return qs;
     }
 
     // BrokerObserver overrides.
     void queueCreate(const boost::shared_ptr<broker::Queue>& q) {
-        sys::Mutex::ScopedLock l(lock);
-        boost::shared_ptr<QueueSnapshot> observer(new QueueSnapshot);
-        snapshots[q] = observer;
-        q->addObserver(observer);
+        q->addObserver(boost::make_shared<QueueSnapshot>());
     }
 
     void queueDestroy(const boost::shared_ptr<broker::Queue>& q) {
-        sys::Mutex::ScopedLock l(lock);
-        SnapshotMap::iterator i = snapshots.find(q);
-        if (i != snapshots.end()) {
-            q->removeObserver(i->second);
-            snapshots.erase(i);
-        }
+        q->removeObserver(get(q));
     }
 
   private:
-    typedef qpid::sys::unordered_map<boost::shared_ptr<broker::Queue>,
-                                     boost::shared_ptr<QueueSnapshot>,
-                                     Hasher<boost::shared_ptr<broker::Queue> >
-                                     > SnapshotMap;
-    SnapshotMap snapshots;
-    mutable sys::Mutex lock;
+    static void saveQueueSnapshot(
+        const boost::shared_ptr<broker::QueueObserver>& observer,
+        boost::shared_ptr<QueueSnapshot>& out)
+    {
+        if (!out) out = boost::dynamic_pointer_cast<QueueSnapshot>(observer);
+    }
 };
 
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1541146&r1=1541145&r2=1541146&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Tue Nov 12 16:58:52 2013
@@ -25,15 +25,16 @@
 #include "QueueReplicator.h"
 #include "QueueSnapshots.h"
 #include "ReplicatingSubscription.h"
+#include "TxReplicatingSubscription.h"
 #include "Primary.h"
 #include "HaBroker.h"
 #include "qpid/assert.h"
 #include "qpid/broker/Queue.h"
-#include "qpid/broker/QueueObserver.h"
 #include "qpid/broker/SessionContext.h"
 #include "qpid/broker/amqp_0_10/MessageTransfer.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/reply_exceptions.h"
 #include "qpid/log/Statement.h"
 #include "qpid/types/Uuid.h"
 #include <sstream>
@@ -47,22 +48,12 @@ using namespace broker;
 using namespace std;
 using sys::Mutex;
 using broker::amqp_0_10::MessageTransfer;
-
-const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.ha-replicating-subscription");
-const string ReplicatingSubscription::QPID_BROKER_INFO("qpid.ha-broker-info");
-const string ReplicatingSubscription::QPID_ID_SET("qpid.ha-info");
-
-class ReplicatingSubscription::QueueObserver : public broker::QueueObserver {
-  public:
-    QueueObserver(ReplicatingSubscription& rs_) : rs(rs_) {}
-    void enqueued(const broker::Message&) {}
-    void dequeued(const broker::Message& m) { rs.dequeued(m.getReplicationId()); }
-    void acquired(const broker::Message&) {}
-    void requeued(const broker::Message&) {}
-  private:
-    ReplicatingSubscription& rs;
-};
-
+namespace { const string QPID_HA(QPID_HA_PREFIX); }
+const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION(QPID_HA+"repsub");
+const string ReplicatingSubscription::QPID_BROKER_INFO(QPID_HA+"info");
+const string ReplicatingSubscription::QPID_ID_SET(QPID_HA+"ids");
+const string ReplicatingSubscription::QPID_QUEUE_REPLICATOR(QPID_HA+"qrep");
+const string ReplicatingSubscription::QPID_TX_REPLICATOR(QPID_HA+"txrep");
 
 /* Called by SemanticState::consume to create a consumer */
 boost::shared_ptr<broker::SemanticState::ConsumerImpl>
@@ -79,13 +70,20 @@ ReplicatingSubscription::Factory::create
     const framing::FieldTable& arguments
 ) {
     boost::shared_ptr<ReplicatingSubscription> rs;
-    if (arguments.isSet(QPID_REPLICATING_SUBSCRIPTION)) {
+    std::string type = arguments.getAsString(QPID_REPLICATING_SUBSCRIPTION);
+    if (type == QPID_QUEUE_REPLICATOR) {
         rs.reset(new ReplicatingSubscription(
                      haBroker,
                      parent, name, queue, ack, acquire, exclusive, tag,
                      resumeId, resumeTtl, arguments));
-        rs->initialize();
     }
+    else if (type == QPID_TX_REPLICATOR) {
+        rs.reset(new TxReplicatingSubscription(
+                     haBroker,
+                     parent, name, queue, ack, acquire, exclusive, tag,
+                     resumeId, resumeTtl, arguments));
+    }
+    if (rs) rs->initialize();
     return rs;
 }
 
@@ -100,7 +98,7 @@ ReplicatingSubscription::ReplicatingSubs
     HaBroker& hb,
     SemanticState* parent,
     const string& name,
-    Queue::shared_ptr queue,
+    Queue::shared_ptr queue_,
     bool ack,
     bool /*acquire*/,
     bool exclusive,
@@ -108,16 +106,22 @@ ReplicatingSubscription::ReplicatingSubs
     const string& resumeId,
     uint64_t resumeTtl,
     const framing::FieldTable& arguments
-) : ConsumerImpl(parent, name, queue, ack, REPLICATOR, exclusive, tag,
+) : ConsumerImpl(parent, name, queue_, ack, REPLICATOR, exclusive, tag,
                  resumeId, resumeTtl, arguments),
     position(0), ready(false), cancelled(false),
     haBroker(hb),
     primary(boost::dynamic_pointer_cast<Primary>(haBroker.getRole()))
-{
+{}
+
+// Called in subscription's connection thread when the subscription is created.
+// Separate from ctor because we need to use shared_from_this
+//
+void ReplicatingSubscription::initialize() {
     try {
         FieldTable ft;
-        if (!arguments.getTable(ReplicatingSubscription::QPID_BROKER_INFO, ft))
-            throw Exception("Replicating subscription does not have broker info: " + tag);
+        if (!getArguments().getTable(ReplicatingSubscription::QPID_BROKER_INFO, ft))
+            throw InvalidArgumentException(
+                logPrefix+"Can't subscribe, no broker info: "+getTag());
         info.assign(ft);
 
         // Set a log prefix message that identifies the remote broker.
@@ -147,10 +151,17 @@ ReplicatingSubscription::ReplicatingSubs
         // However we must attach the observer _before_ we snapshot for
         // initial dequeues to be sure we don't miss any dequeues
         // between the snapshot and attaching the observer.
-        observer.reset(new QueueObserver(*this));
-        queue->addObserver(observer);
-        ReplicationIdSet primaryIds = haBroker.getQueueSnapshots()->get(queue)->snapshot();
-        std::string backupStr = arguments.getAsString(ReplicatingSubscription::QPID_ID_SET);
+        queue->addObserver(
+            boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this()));
+        boost::shared_ptr<QueueSnapshot> snapshot = haBroker.getQueueSnapshots()->get(queue);
+        // There may be no snapshot if the queue is being deleted concurrently.
+        if (!snapshot) {
+            queue->removeObserver(
+                boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this()));
+            throw ResourceDeletedException(logPrefix+"Can't subscribe, queue deleted");
+        }
+        ReplicationIdSet primaryIds = snapshot->snapshot();
+        std::string backupStr = getArguments().getAsString(ReplicatingSubscription::QPID_ID_SET);
         ReplicationIdSet backupIds;
         if (!backupStr.empty()) backupIds = decodeStr<ReplicationIdSet>(backupStr);
 
@@ -172,23 +183,7 @@ ReplicatingSubscription::ReplicatingSubs
                      << ", on backup " << skip);
             checkReady(l);
         }
-    }
-    catch (const std::exception& e) {
-        QPID_LOG(error, logPrefix << "Creation error: " << e.what()
-                 << ": arguments=" << getArguments());
-        throw;
-    }
-}
 
-ReplicatingSubscription::~ReplicatingSubscription() {}
-
-
-// Called in subscription's connection thread when the subscription is created.
-// Called separate from ctor because sending events requires
-// shared_from_this
-//
-void ReplicatingSubscription::initialize() {
-    try {
         if (primary) primary->addReplica(*this);
         Mutex::ScopedLock l(lock); // Note dequeued() can be called concurrently.
         // Send initial dequeues to the backup.
@@ -196,12 +191,14 @@ void ReplicatingSubscription::initialize
         sendDequeueEvent(l);
     }
     catch (const std::exception& e) {
-        QPID_LOG(error, logPrefix << "Initialization error: " << e.what()
-                 << ": arguments=" << getArguments());
+        QPID_LOG(error, logPrefix << "Subscribe failed: " << e.what());
         throw;
     }
 }
 
+ReplicatingSubscription::~ReplicatingSubscription() {}
+
+
 // True if the next position for the ReplicatingSubscription is a guarded position.
 bool ReplicatingSubscription::isGuarded(sys::Mutex::ScopedLock&) {
     return position+1 >= guard->getFirst();
@@ -258,7 +255,8 @@ void ReplicatingSubscription::cancel()
     }
     QPID_LOG(debug, logPrefix << "Cancelled");
     if (primary) primary->removeReplica(*this);
-    getQueue()->removeObserver(observer);
+    getQueue()->removeObserver(
+        boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this()));
     guard->cancel();
     ConsumerImpl::cancel();
 }
@@ -289,8 +287,9 @@ void ReplicatingSubscription::sendDequeu
 // Called after the message has been removed
 // from the deque and under the messageLock in the queue. Called in
 // arbitrary connection threads.
-void ReplicatingSubscription::dequeued(ReplicationId id)
+void ReplicatingSubscription::dequeued(const broker::Message& m)
 {
+    ReplicationId id = m.getReplicationId();
     QPID_LOG(trace, logPrefix << "Dequeued ID " << id);
     {
         Mutex::ScopedLock l(lock);

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1541146&r1=1541145&r2=1541146&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h Tue Nov 12 16:58:52 2013
@@ -25,6 +25,8 @@
 #include "BrokerInfo.h"
 #include "qpid/broker/SemanticState.h"
 #include "qpid/broker/ConsumerFactory.h"
+#include "qpid/broker/QueueObserver.h"
+#include <boost/enable_shared_from_this.hpp>
 #include <iosfwd>
 
 namespace qpid {
@@ -65,81 +67,91 @@ class Primary;
  *
  *  ReplicatingSubscription makes calls on QueueGuard, but not vice-versa.
  */
-class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl
+class ReplicatingSubscription :
+    public broker::SemanticState::ConsumerImpl,
+    public broker::QueueObserver
 {
-  public:
-    typedef broker::SemanticState::ConsumerImpl ConsumerImpl;
+public:
+typedef broker::SemanticState::ConsumerImpl ConsumerImpl;
 
-    class Factory : public broker::ConsumerFactory {
-      public:
-        Factory(HaBroker& hb) : haBroker(hb) {}
-
-        HaBroker& getHaBroker() const { return haBroker; }
-
-        boost::shared_ptr<broker::SemanticState::ConsumerImpl> create(
-            broker::SemanticState* parent,
-            const std::string& name, boost::shared_ptr<broker::Queue> ,
-            bool ack, bool acquire, bool exclusive, const std::string& tag,
-            const std::string& resumeId, uint64_t resumeTtl,
-            const framing::FieldTable& arguments);
-      private:
-        HaBroker& haBroker;
-    };
-
-    // Argument names for consume command.
-    static const std::string QPID_REPLICATING_SUBSCRIPTION;
-    static const std::string QPID_BROKER_INFO;
-    static const std::string QPID_ID_SET;
+class Factory : public broker::ConsumerFactory {
+public:
+Factory(HaBroker& hb) : haBroker(hb) {}
+
+HaBroker& getHaBroker() const { return haBroker; }
+
+boost::shared_ptr<broker::SemanticState::ConsumerImpl> create(
+broker::SemanticState* parent,
+    const std::string& name, boost::shared_ptr<broker::Queue> ,
+    bool ack, bool acquire, bool exclusive, const std::string& tag,
+    const std::string& resumeId, uint64_t resumeTtl,
+    const framing::FieldTable& arguments);
+private:
+HaBroker& haBroker;
+};
+
+// Argument names for consume command.
+static const std::string QPID_REPLICATING_SUBSCRIPTION;
+static const std::string QPID_BROKER_INFO;
+static const std::string QPID_ID_SET;
+// Replicator types: argument values for QPID_REPLICATING_SUBSCRIPTION argument.
+static const std::string QPID_QUEUE_REPLICATOR;
+static const std::string QPID_TX_REPLICATOR;
 
-    ReplicatingSubscription(HaBroker& haBroker,
+ReplicatingSubscription(HaBroker& haBroker,
                             broker::SemanticState* parent,
                             const std::string& name, boost::shared_ptr<broker::Queue> ,
                             bool ack, bool acquire, bool exclusive, const std::string& tag,
                             const std::string& resumeId, uint64_t resumeTtl,
                             const framing::FieldTable& arguments);
 
-    ~ReplicatingSubscription();
+~ReplicatingSubscription();
+
+
+// Consumer overrides.
+bool deliver(const broker::QueueCursor& cursor, const broker::Message& msg);
+void cancel();
+void acknowledged(const broker::DeliveryRecord&);
+bool browseAcquired() const { return true; }
+// Hide the "queue deleted" error for a ReplicatingSubscription when a
+// queue is deleted, this is normal and not an error.
+bool hideDeletedError() { return true; }
+
+// QueueObserver overrides
+void enqueued(const broker::Message&) {}
+void dequeued(const broker::Message&);
+void acquired(const broker::Message&) {}
+void requeued(const broker::Message&) {}
+
+/** A ReplicatingSubscription is a passive observer, not counted for auto
+ * deletion and immediate message purposes.
+ */
+bool isCounted() { return false; }
+
+/** Initialization that must be done separately from construction
+ * because it requires a shared_ptr to this to exist.
+ */
+void initialize();
+
+BrokerInfo getBrokerInfo() const { return info; }
 
+/** Skip replicating enqueue of of ids. */
+void addSkip(const ReplicationIdSet& ids);
 
-    // Consumer overrides.
-    bool deliver(const broker::QueueCursor& cursor, const broker::Message& msg);
-    void cancel();
-    void acknowledged(const broker::DeliveryRecord&);
-    bool browseAcquired() const { return true; }
-    // Hide the "queue deleted" error for a ReplicatingSubscription when a
-    // queue is deleted, this is normal and not an error.
-    bool hideDeletedError() { return true; }
-    // Not counted for auto deletion and immediate message purposes.
-    bool isCounted() { return false; }
-
-    /** Initialization that must be done separately from construction
-     * because it requires a shared_ptr to this to exist.
-     */
-    void initialize();
-
-    BrokerInfo getBrokerInfo() const { return info; }
-
-    /** Skip replicating enqueue of of ids. */
-    void addSkip(const ReplicationIdSet& ids);
-
-  protected:
-    bool doDispatch();
-
-  private:
-    class QueueObserver;
-  friend class QueueObserver;
-
-    std::string logPrefix;
-    QueuePosition position;
-    ReplicationIdSet dequeues;  // Dequeues to be sent in next dequeue event.
-    ReplicationIdSet skip;   // Skip enqueues: messages already on backup and tx enqueues.
-    ReplicationIdSet unready;   // Unguarded, replicated and un-acknowledged.
-    bool ready;
-    bool cancelled;
-    BrokerInfo info;
-    boost::shared_ptr<QueueGuard> guard;
+protected:
+bool doDispatch();
+
+private:
+std::string logPrefix;
+QueuePosition position;
+ReplicationIdSet dequeues;  // Dequeues to be sent in next dequeue event.
+ReplicationIdSet skip;   // Skip enqueues: messages already on backup and tx enqueues.
+ReplicationIdSet unready;   // Unguarded, replicated and un-acknowledged.
+bool ready;
+bool cancelled;
+BrokerInfo info;
+boost::shared_ptr<QueueGuard> guard;
     HaBroker& haBroker;
-    boost::shared_ptr<QueueObserver> observer;
     boost::shared_ptr<Primary> primary;
 
     bool isGuarded(sys::Mutex::ScopedLock&);

Added: qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicatingSubscription.cpp?rev=1541146&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicatingSubscription.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicatingSubscription.cpp Tue Nov 12 16:58:52 2013
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "TxReplicatingSubscription.h"
+
+namespace qpid {
+namespace ha {
+using namespace std;
+using namespace broker;
+
+TxReplicatingSubscription::TxReplicatingSubscription(
+    HaBroker& hb,
+    SemanticState* parent,
+    const string& name,
+    boost::shared_ptr<Queue> queue,
+    bool ack,
+    bool acquire,
+    bool exclusive,
+    const string& tag,
+    const string& resumeId,
+    uint64_t resumeTtl,
+    const framing::FieldTable& arguments
+) : ReplicatingSubscription(hb, parent, name, queue, ack, acquire, exclusive, tag,
+                            resumeId, resumeTtl, arguments)
+{}
+
+}} // namespace qpid::ha

Propchange: qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicatingSubscription.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicatingSubscription.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicatingSubscription.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicatingSubscription.h?rev=1541146&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicatingSubscription.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicatingSubscription.h Tue Nov 12 16:58:52 2013
@@ -0,0 +1,50 @@
+#ifndef QPID_HA_TXREPLICATINGSUBSCRIPTION_H
+#define QPID_HA_TXREPLICATINGSUBSCRIPTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ReplicatingSubscription.h"
+
+namespace qpid {
+namespace ha {
+
+/**
+ * Replicating subscription for a TX queue.
+ */
+class TxReplicatingSubscription : public ReplicatingSubscription
+{
+  public:
+    TxReplicatingSubscription(HaBroker& haBroker,
+                              broker::SemanticState* parent,
+                              const std::string& name, boost::shared_ptr<broker::Queue> ,
+                              bool ack, bool acquire, bool exclusive, const std::string& tag,
+                              const std::string& resumeId, uint64_t resumeTtl,
+                              const framing::FieldTable& arguments);
+
+    /** A TxReplicatingSubscription is counted for auto-delete so we can clean
+     * up the TX queue when all backups are done.
+     */
+    bool isCounted() { return true; }
+};
+}} // namespace qpid::ha
+
+#endif  /*!QPID_HA_TXREPLICATINGSUBSCRIPTION_H*/

Propchange: qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicatingSubscription.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicatingSubscription.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp?rev=1541146&r1=1541145&r2=1541146&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp Tue Nov 12 16:58:52 2013
@@ -26,6 +26,7 @@
 #include "BrokerReplicator.h"
 #include "Event.h"
 #include "HaBroker.h"
+#include "ReplicatingSubscription.h"
 #include "types.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/Link.h"
@@ -53,10 +54,7 @@ using qpid::broker::amqp_0_10::MessageTr
 using qpid::types::Uuid;
 
 namespace {
-const string QPID_HA(QPID_HA_PREFIX);
-const string TYPE_NAME(QPID_HA+"tx-replicator");
 const string PREFIX(TRANSACTION_REPLICATOR_PREFIX);
-
 } // namespace
 
 
@@ -70,17 +68,16 @@ string TxReplicator::getTxId(const strin
     return q.substr(PREFIX.size());
 }
 
-string TxReplicator::getType() const { return TYPE_NAME; }
+string TxReplicator::getType() const { return ReplicatingSubscription::QPID_TX_REPLICATOR; }
 
 TxReplicator::TxReplicator(
     HaBroker& hb,
     const boost::shared_ptr<broker::Queue>& txQueue,
     const boost::shared_ptr<broker::Link>& link) :
     QueueReplicator(hb, txQueue, link),
-    txBuffer(new broker::TxBuffer),
     store(hb.getBroker().hasStore() ? &hb.getBroker().getStore() : 0),
     channel(link->nextChannel()),
-    complete(false), ignore(false),
+    ended(false),
     dequeueState(hb.getBroker().getQueues())
 {
     string id(getTxId(txQueue->getName()));
@@ -100,8 +97,8 @@ TxReplicator::TxReplicator(
         boost::bind(&TxReplicator::commit, this, _1, _2);
     dispatch[TxRollbackEvent::KEY] =
         boost::bind(&TxReplicator::rollback, this, _1, _2);
-    dispatch[TxMembersEvent::KEY] =
-        boost::bind(&TxReplicator::members, this, _1, _2);
+    dispatch[TxBackupsEvent::KEY] =
+        boost::bind(&TxReplicator::backups, this, _1, _2);
 }
 
 TxReplicator::~TxReplicator() {
@@ -121,11 +118,12 @@ void TxReplicator::sendMessage(const bro
 }
 
 void TxReplicator::route(broker::Deliverable& deliverable) {
-    if (!ignore) QueueReplicator::route(deliverable);
+    QueueReplicator::route(deliverable);
 }
 
 void TxReplicator::deliver(const broker::Message& m_) {
     sys::Mutex::ScopedLock l(lock);
+    if (!txBuffer) return;
     // Deliver message to the target queue, not the tx-queue.
     broker::Message m(m_);
     m.setReplicationId(enq.id); // Use replicated id.
@@ -138,6 +136,7 @@ void TxReplicator::deliver(const broker:
 
 void TxReplicator::enqueue(const string& data, sys::Mutex::ScopedLock&) {
     sys::Mutex::ScopedLock l(lock);
+    if (!txBuffer) return;
     TxEnqueueEvent e;
     decodeStr(data, e);
     QPID_LOG(trace, logPrefix << "Enqueue: " << e);
@@ -146,6 +145,7 @@ void TxReplicator::enqueue(const string&
 
 void TxReplicator::dequeue(const string& data, sys::Mutex::ScopedLock&) {
     sys::Mutex::ScopedLock l(lock);
+    if (!txBuffer) return;
     TxDequeueEvent e;
     decodeStr(data, e);
     QPID_LOG(trace, logPrefix << "Dequeue: " << e);
@@ -195,18 +195,20 @@ boost::shared_ptr<TxAccept> TxReplicator
 }
 
 void TxReplicator::prepare(const string&, sys::Mutex::ScopedLock& l) {
+    if (!txBuffer) return;
     txBuffer->enlist(dequeueState.makeAccept());
     context = store->begin();
     if (txBuffer->prepare(context.get())) {
-        QPID_LOG(debug, logPrefix << "Prepared OK");
+        QPID_LOG(debug, logPrefix << "Local prepare OK");
         sendMessage(TxPrepareOkEvent(haBroker.getSystemId()).message(queue->getName()), l);
     } else {
-        QPID_LOG(debug, logPrefix << "Prepare failed");
+        QPID_LOG(debug, logPrefix << "Local prepare failed");
         sendMessage(TxPrepareFailEvent(haBroker.getSystemId()).message(queue->getName()), l);
     }
 }
 
 void TxReplicator::commit(const string&, sys::Mutex::ScopedLock& l) {
+    if (!txBuffer) return;
     QPID_LOG(debug, logPrefix << "Commit");
     if (context.get()) store->commit(*context);
     txBuffer->commit();
@@ -214,34 +216,44 @@ void TxReplicator::commit(const string&,
 }
 
 void TxReplicator::rollback(const string&, sys::Mutex::ScopedLock& l) {
+    if (!txBuffer) return;
     QPID_LOG(debug, logPrefix << "Rollback");
     if (context.get()) store->abort(*context);
     txBuffer->rollback();
     end(l);
 }
 
-void TxReplicator::members(const string& data, sys::Mutex::ScopedLock&) {
-    TxMembersEvent e;
+void TxReplicator::backups(const string& data, sys::Mutex::ScopedLock& l) {
+    TxBackupsEvent e;
     decodeStr(data, e);
-    QPID_LOG(debug, logPrefix << "Members: " << e.members);
-    if (!e.members.count(haBroker.getMembership().getSelf().getSystemId())) {
+    if (!e.backups.count(haBroker.getMembership().getSelf().getSystemId())) {
         QPID_LOG(info, logPrefix << "Not participating in transaction");
-        ignore = true;
+        end(l);
+    } else {
+        QPID_LOG(debug, logPrefix << "Backups: " << e.backups);
+        txBuffer = new broker::TxBuffer;
     }
 }
 
 void TxReplicator::end(sys::Mutex::ScopedLock&) {
-    complete = true;
-    if (!getQueue()) return;    // Already destroyed
-    // Destroy will cancel the subscription to the primary tx-queue which
-    // informs the primary that we have completed the transaction.
-    destroy();
+    ended = true;
+    txBuffer.reset();
+    // QueueReplicator::destroy cancels subscription to the primary tx-queue
+    // which allows the primary to clean up resources.
+    sys::Mutex::ScopedUnlock u(lock);
+    QueueReplicator::destroy();
 }
 
+// Called when the tx queue is deleted.
 void TxReplicator::destroy() {
+    {
+        sys::Mutex::ScopedLock l(lock);
+        if (!ended) {
+            QPID_LOG(error, logPrefix << "Destroyed prematurely, rollback.");
+            rollback(string(), l);
+        }
+    }
     QueueReplicator::destroy();
-    sys::Mutex::ScopedLock l(lock);
-    if (!ignore && !complete) rollback(string(), l);
 }
 
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.h?rev=1541146&r1=1541145&r2=1541146&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.h Tue Nov 12 16:58:52 2013
@@ -84,7 +84,7 @@ class TxReplicator : public QueueReplica
     void prepare(const std::string& data, sys::Mutex::ScopedLock&);
     void commit(const std::string& data, sys::Mutex::ScopedLock&);
     void rollback(const std::string& data, sys::Mutex::ScopedLock&);
-    void members(const std::string& data, sys::Mutex::ScopedLock&);
+    void backups(const std::string& data, sys::Mutex::ScopedLock&);
     void end(sys::Mutex::ScopedLock&);
 
     std::string logPrefix;
@@ -93,7 +93,7 @@ class TxReplicator : public QueueReplica
     broker::MessageStore* store;
     std::auto_ptr<broker::TransactionContext> context;
     framing::ChannelId channel; // Channel to send prepare-complete.
-    bool complete, ignore;
+    bool empty, ended;
 
     // Class to process dequeues and create DeliveryRecords to populate a
     // TxAccept.

Modified: qpid/trunk/qpid/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/brokertest.py?rev=1541146&r1=1541145&r2=1541146&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/brokertest.py Tue Nov 12 16:58:52 2013
@@ -381,6 +381,16 @@ class Broker(Popen):
                         "Broker %s not responding: (%s)%s"%(
                             self.name,e,error_line(self.log, 5)))
 
+    def assert_log_clean(self, ignore=None):
+        log = open(self.get_log())
+        try:
+            error = re.compile("] error|] critical")
+            if ignore: ignore = re.compile(ignore)
+            else: ignore = re.compile("\000") # Won't match anything
+            for line in log.readlines():
+                assert not error.search(line) or ignore.search(line), "Errors in log file %s: %s"%(log, line)
+        finally: log.close()
+
 def browse(session, queue, timeout=0, transform=lambda m: m.content):
     """Return a list with the contents of each message on queue."""
     r = session.receiver("%s;{mode:browse}"%(queue))
@@ -549,7 +559,11 @@ class NumberedSender(Thread):
                     self.condition.release()
                 self.write_message(self.sent)
                 self.sent += 1
-        except Exception: self.error = RethrownException(self.sender.pname)
+        except Exception, e:
+            self.error = RethrownException(
+                "%s: (%s)%s"%(self.sender.pname,e,
+                              error_line(self.sender.outfile("err"))))
+
 
     def notify_received(self, count):
         """Called by receiver to enable flow control. count = messages received so far."""
@@ -612,8 +626,10 @@ class NumberedReceiver(Thread):
                     if self.sender:
                         self.sender.notify_received(self.received)
                 m = self.read_message()
-        except Exception:
-            self.error = RethrownException(self.receiver.pname)
+        except Exception, e:
+            self.error = RethrownException(
+                "%s: (%s)%s"%(self.receiver.pname,e,
+                              error_line(self.receiver.outfile("err"))))
 
     def check(self):
         """Raise an exception if there has been an error"""

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1541146&r1=1541145&r2=1541146&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Tue Nov 12 16:58:52 2013
@@ -30,19 +30,10 @@ from qpidtoollibs import BrokerAgent, Ev
 
 log = getLogger(__name__)
 
-def grep(filename, regexp):
-    for line in open(filename).readlines():
-        if (regexp.search(line)): return True
-    return False
 
 class HaBrokerTest(BrokerTest):
     """Base class for HA broker tests"""
 
-    def assert_log_no_errors(self, broker):
-        log = broker.get_log()
-        if grep(log, re.compile("] error|] critical")):
-            self.fail("Errors in log file %s"%(log))
-
 class ReplicationTests(HaBrokerTest):
     """Correctness tests for  HA replication."""
 
@@ -838,7 +829,7 @@ acl deny all all
         # It is possible for the backup to attempt to subscribe after the queue
         # is deleted. This is not an error, but is logged as an error on the primary.
         # The backup does not log this as an error so we only check the backup log for errors.
-        self.assert_log_no_errors(cluster[1])
+        cluster[1].assert_log_clean()
 
     def test_missed_recreate(self):
         """If a queue or exchange is destroyed and one with the same name re-created
@@ -1003,6 +994,32 @@ class LongTests(HaBrokerTest):
             dead = filter(lambda b: not b.is_running(), brokers)
             if dead: raise Exception("Brokers not running: %s"%dead)
 
+    def test_tx_send_receive(self):
+        brokers = HaCluster(self, 3)
+        sender = self.popen(
+            ["qpid-send",
+             "--broker", brokers[0].host_port(),
+             "--address", "q;{create:always}",
+             "--messages=1000",
+             "--tx=10"
+             ])
+        receiver = self.popen(
+            ["qpid-receive",
+             "--broker", brokers[0].host_port(),
+             "--address", "q;{create:always}",
+             "--messages=990",
+             "--timeout=10",
+             "--tx=10"
+             ])
+        self.assertEqual(sender.wait(), 0)
+        self.assertEqual(receiver.wait(), 0)
+        expect = [long(i) for i in range(991, 1001)]
+        sn = lambda m: m.properties["sn"]
+        brokers[0].assert_browse("q", expect, transform=sn)
+        brokers[1].assert_browse_backup("q", expect, transform=sn)
+        brokers[2].assert_browse_backup("q", expect, transform=sn)
+
+
     def test_qmf_order(self):
         """QPID 4402:  HA QMF events can be out of order.
         This test mimics the test described in the JIRA. Two threads repeatedly
@@ -1352,12 +1369,14 @@ class TransactionTests(HaBrokerTest):
     def assert_tx_clean(self, b):
         """Verify that there are no transaction artifacts
         (exchanges, queues, subscriptions) on b."""
-        queues=[]
-        def txq(): queues = b.agent().tx_queues(); return not queues
-        assert retry(txq), "%s: unexpected %s"%(b,queues)
-        subs=[]
-        def txs(): subs = self.tx_subscriptions(b); return not subs
-        assert retry(txs), "%s: unexpected %s"%(b,subs)
+        class FunctionCache:    # Call a function and cache the result.
+            def __init__(self, f): self.f, self.value = f, None
+            def __call__(self): self.value = self.f(); return self.value
+
+        txq= FunctionCache(b.agent().tx_queues)
+        assert retry(lambda: not txq()), "%s: unexpected %s"%(b, txq.value)
+        txsub = FunctionCache(lambda: self.tx_subscriptions(b))
+        assert retry(lambda: not txsub()), "%s: unexpected %s"%(b, txsub.value)
         # TODO aconway 2013-10-15: TX exchanges don't show up in management.
 
     def assert_simple_commit_outcome(self, b, tx_queues):
@@ -1462,18 +1481,22 @@ class TransactionTests(HaBrokerTest):
         self.assertEqual([1,1,1], [len(b.agent().tx_queues()) for b in cluster])
         cluster[1].kill(final=False)
         s.send("b")
-        self.assert_commit_raises(tx)
-        for b in [cluster[0],cluster[2]]: self.assert_tx_clean(b)
+        tx.commit()
+        tx.close()
+        for b in [cluster[0],cluster[2]]:
+            self.assert_tx_clean(b)
+            b.assert_browse_backup("q", ["a","b"], msg=b)
         # Joining
         tx = cluster[0].connect().session(transactional=True)
         s = tx.sender("q;{create:always}")
         s.send("foo")
-        cluster.restart(1)
+        cluster.restart(1)      # Not a part of the current transaction.
         tx.commit()
         tx.close()
         for b in cluster: self.assert_tx_clean(b)
         # The new member is not in the tx but  receives the results normal replication.
-        for b in cluster: b.assert_browse_backup("q", ["foo"], msg=b)
+        for b in cluster: b.assert_browse_backup("q", ["a", "b", "foo"], msg=b)
+        # FIXME aconway 2013-11-07: assert_log_clean
 
     def test_tx_block_threads(self):
         """Verify that TXs blocked in commit don't deadlock."""



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org