You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/09/20 20:59:50 UTC

svn commit: r1525101 [8/21] - in /qpid/branches/linearstore/qpid: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/...

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h Fri Sep 20 18:59:30 2013
@@ -46,6 +46,7 @@ class ReceiverContext
   public:
     ReceiverContext(pn_session_t* session, const std::string& name, const qpid::messaging::Address& source);
     ~ReceiverContext();
+    void reset(pn_session_t* session);
     void setCapacity(uint32_t);
     uint32_t getCapacity();
     uint32_t getAvailable();
@@ -56,7 +57,7 @@ class ReceiverContext
     const std::string& getSource() const;
     bool isClosed() const;
     void configure();
-    void verify(pn_terminus_t*);
+    void verify();
     Address getAddress() const;
   private:
     friend class ConnectionContext;

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.cpp Fri Sep 20 18:59:30 2013
@@ -84,7 +84,7 @@ uint32_t ReceiverHandle::getUnsettled()
 
 void ReceiverHandle::close()
 {
-    session->closeReceiver(getName());
+    connection->detach(session, receiver);
 }
 
 const std::string& ReceiverHandle::getName() const

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/Sasl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/Sasl.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/Sasl.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/Sasl.cpp Fri Sep 20 18:59:30 2013
@@ -93,7 +93,7 @@ void Sasl::mechanisms(const std::string&
         mechanisms = offered;
     }
 
-    if (sasl->start(mechanisms, response)) {
+    if (sasl->start(mechanisms, response, context.getTransportSecuritySettings())) {
         init(sasl->getMechanism(), &response, hostname.size() ? &hostname : 0);
     } else {
         init(sasl->getMechanism(), 0, hostname.size() ? &hostname : 0);

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp Fri Sep 20 18:59:30 2013
@@ -42,7 +42,7 @@ SenderContext::SenderContext(pn_session_
   : name(n),
     address(a),
     helper(address),
-    sender(pn_sender(session, n.c_str())), capacity(1000) {}
+    sender(pn_sender(session, n.c_str())), capacity(1000), unreliable(helper.isUnreliable()) {}
 
 SenderContext::~SenderContext()
 {
@@ -67,7 +67,7 @@ uint32_t SenderContext::getCapacity()
 
 uint32_t SenderContext::getUnsettled()
 {
-    return processUnsettled();
+    return processUnsettled(true/*always allow retrieval of unsettled count, even if link has failed*/);
 }
 
 const std::string& SenderContext::getName() const
@@ -80,16 +80,26 @@ const std::string& SenderContext::getTar
     return address.getName();
 }
 
-SenderContext::Delivery* SenderContext::send(const qpid::messaging::Message& message)
+bool SenderContext::send(const qpid::messaging::Message& message, SenderContext::Delivery** out)
 {
-    if (processUnsettled() < capacity && pn_link_credit(sender)) {
-        deliveries.push_back(Delivery(nextId++));
-        Delivery& delivery = deliveries.back();
-        delivery.encode(MessageImplAccess::get(message), address);
-        delivery.send(sender);
-        return &delivery;
+    resend();//if there are any messages needing to be resent at the front of the queue, send them first
+    if (processUnsettled(false) < capacity && pn_link_credit(sender)) {
+        if (unreliable) {
+            Delivery delivery(nextId++);
+            delivery.encode(MessageImplAccess::get(message), address);
+            delivery.send(sender, unreliable);
+            *out = 0;
+            return true;
+        } else {
+            deliveries.push_back(Delivery(nextId++));
+            Delivery& delivery = deliveries.back();
+            delivery.encode(MessageImplAccess::get(message), address);
+            delivery.send(sender, unreliable);
+            *out = &delivery;
+            return true;
+        }
     } else {
-        return 0;
+        return false;
     }
 }
 
@@ -108,11 +118,13 @@ void SenderContext::check()
     }
 }
 
-uint32_t SenderContext::processUnsettled()
+uint32_t SenderContext::processUnsettled(bool silent)
 {
-    check();
+    if (!silent) {
+        check();
+    }
     //remove messages from front of deque once peer has confirmed receipt
-    while (!deliveries.empty() && deliveries.front().delivered()) {
+    while (!deliveries.empty() && deliveries.front().delivered() && !(pn_link_state(sender) & PN_REMOTE_CLOSED)) {
         deliveries.front().settle();
         deliveries.pop_front();
     }
@@ -122,6 +134,7 @@ namespace {
 const std::string X_AMQP("x-amqp-");
 const std::string X_AMQP_FIRST_ACQUIRER("x-amqp-first-acquirer");
 const std::string X_AMQP_DELIVERY_COUNT("x-amqp-delivery-count");
+const std::string X_AMQP_0_10_APP_ID("x-amqp-0-10.app-id");
 
 class HeaderAdapter : public qpid::amqp::MessageEncoder::Header
 {
@@ -341,7 +354,7 @@ class ApplicationPropertiesAdapter : pub
     {
         for (qpid::types::Variant::Map::const_iterator i = headers.begin(); i != headers.end(); ++i) {
             //strip out values with special keys as they are sent in standard fields
-            if (!startsWith(i->first, X_AMQP)) {
+            if (!startsWith(i->first, X_AMQP) || i->first == X_AMQP_0_10_APP_ID) {
                 qpid::amqp::CharSequence key(convert(i->first));
                 switch (i->second.getType()) {
                   case qpid::types::VAR_VOID:
@@ -413,7 +426,12 @@ bool changedSubject(const qpid::messagin
 
 }
 
-SenderContext::Delivery::Delivery(int32_t i) : id(i), token(0) {}
+SenderContext::Delivery::Delivery(int32_t i) : id(i), token(0), presettled(false) {}
+
+void SenderContext::Delivery::reset()
+{
+    token = 0;
+}
 
 void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg, const qpid::messaging::Address& address)
 {
@@ -440,7 +458,15 @@ void SenderContext::Delivery::encode(con
         PropertiesAdapter properties(msg, address.getSubject());
         ApplicationPropertiesAdapter applicationProperties(msg.getHeaders());
         //compute size:
-        encoded.resize(qpid::amqp::MessageEncoder::getEncodedSize(header, properties, applicationProperties, msg.getBytes()));
+        size_t contentSize = qpid::amqp::MessageEncoder::getEncodedSize(header)
+            + qpid::amqp::MessageEncoder::getEncodedSize(properties)
+            + qpid::amqp::MessageEncoder::getEncodedSize(applicationProperties);
+        if (msg.getContent().isVoid()) {
+            contentSize += qpid::amqp::MessageEncoder::getEncodedSizeForContent(msg.getBytes());
+        } else {
+            contentSize += qpid::amqp::MessageEncoder::getEncodedSizeForValue(msg.getContent()) + 3/*descriptor*/;
+        }
+        encoded.resize(contentSize);
         QPID_LOG(debug, "Sending message, buffer is " << encoded.getSize() << " bytes")
         qpid::amqp::MessageEncoder encoder(encoded.getData(), encoded.getSize());
         //write header:
@@ -451,7 +477,12 @@ void SenderContext::Delivery::encode(con
         //write application-properties
         encoder.writeApplicationProperties(applicationProperties);
         //write body
-        if (msg.getBytes().size()) encoder.writeBinary(msg.getBytes(), &qpid::amqp::message::DATA);//structured content not yet directly supported
+        if (!msg.getContent().isVoid()) {
+            //write as AmqpValue
+            encoder.writeValue(msg.getContent(), &qpid::amqp::message::AMQP_VALUE);
+        } else if (msg.getBytes().size()) {
+            encoder.writeBinary(msg.getBytes(), &qpid::amqp::message::DATA);//structured content not yet directly supported
+        }
         if (encoder.getPosition() < encoded.getSize()) {
             QPID_LOG(debug, "Trimming buffer from " << encoded.getSize() << " to " << encoder.getPosition());
             encoded.trim(encoder.getPosition());
@@ -459,19 +490,27 @@ void SenderContext::Delivery::encode(con
         //write footer (no annotations yet supported)
     }
 }
-void SenderContext::Delivery::send(pn_link_t* sender)
+void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable)
 {
     pn_delivery_tag_t tag;
     tag.size = sizeof(id);
     tag.bytes = reinterpret_cast<const char*>(&id);
     token = pn_delivery(sender, tag);
     pn_link_send(sender, encoded.getData(), encoded.getSize());
+    if (unreliable) {
+        pn_delivery_settle(token);
+        presettled = true;
+    }
     pn_link_advance(sender);
 }
 
+bool SenderContext::Delivery::sent() const
+{
+    return presettled || token;
+}
 bool SenderContext::Delivery::delivered()
 {
-    if (pn_delivery_remote_state(token) || pn_delivery_settled(token)) {
+    if (presettled || (token && (pn_delivery_remote_state(token) || pn_delivery_settled(token)))) {
         //TODO: need a better means for signalling outcomes other than accepted
         if (rejected()) {
             QPID_LOG(warning, "delivery " << id << " was rejected by peer");
@@ -495,8 +534,19 @@ void SenderContext::Delivery::settle()
 {
     pn_delivery_settle(token);
 }
-void SenderContext::verify(pn_terminus_t* target)
+void SenderContext::verify()
 {
+    pn_terminus_t* target = pn_link_remote_target(sender);
+    if (!pn_terminus_get_address(target)) {
+        std::string msg("No such target : ");
+        msg += getTarget();
+        QPID_LOG(debug, msg);
+        throw qpid::messaging::NotFound(msg);
+    } else if (AddressImpl::isTemporary(address)) {
+        address.setName(pn_terminus_get_address(target));
+        QPID_LOG(debug, "Dynamic target name set to " << address.getName());
+    }
+
     helper.checkAssertion(target, AddressHelper::FOR_SENDER);
 }
 void SenderContext::configure()
@@ -505,12 +555,18 @@ void SenderContext::configure()
 }
 void SenderContext::configure(pn_terminus_t* target)
 {
-    helper.configure(target, AddressHelper::FOR_SENDER);
+    helper.configure(sender, target, AddressHelper::FOR_SENDER);
+    std::string option;
+    if (helper.getLinkSource(option)) {
+        pn_terminus_set_address(pn_link_source(sender), option.c_str());
+    } else {
+        pn_terminus_set_address(pn_link_source(sender), pn_terminus_get_address(pn_link_target(sender)));
+    }
 }
 
 bool SenderContext::settled()
 {
-    return processUnsettled() == 0;
+    return processUnsettled(false) == 0;
 }
 
 Address SenderContext::getAddress() const
@@ -518,4 +574,22 @@ Address SenderContext::getAddress() cons
     return address;
 }
 
+
+void SenderContext::reset(pn_session_t* session)
+{
+    sender = pn_sender(session, name.c_str());
+    configure();
+
+    for (Deliveries::iterator i = deliveries.begin(); i != deliveries.end(); ++i) {
+        i->reset();
+    }
+}
+
+void SenderContext::resend()
+{
+    for (Deliveries::iterator i = deliveries.begin(); i != deliveries.end() && pn_link_credit(sender) && !i->sent(); ++i) {
+        i->send(sender, false/*only resend reliable transfers*/);
+    }
+}
+
 }}} // namespace qpid::messaging::amqp

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h Fri Sep 20 18:59:30 2013
@@ -52,28 +52,32 @@ class SenderContext
       public:
         Delivery(int32_t id);
         void encode(const qpid::messaging::MessageImpl& message, const qpid::messaging::Address&);
-        void send(pn_link_t*);
+        void send(pn_link_t*, bool unreliable);
         bool delivered();
         bool accepted();
         bool rejected();
         void settle();
+        void reset();
+        bool sent() const;
       private:
         int32_t id;
         pn_delivery_t* token;
         EncodedMessage encoded;
+        bool presettled;
     };
 
     SenderContext(pn_session_t* session, const std::string& name, const qpid::messaging::Address& target);
     ~SenderContext();
+    void reset(pn_session_t* session);
     void close();
     void setCapacity(uint32_t);
     uint32_t getCapacity();
     uint32_t getUnsettled();
     const std::string& getName() const;
     const std::string& getTarget() const;
-    Delivery* send(const qpid::messaging::Message& message);
+    bool send(const qpid::messaging::Message& message, Delivery**);
     void configure();
-    void verify(pn_terminus_t*);
+    void verify();
     void check();
     bool settled();
     Address getAddress() const;
@@ -88,9 +92,11 @@ class SenderContext
     int32_t nextId;
     Deliveries deliveries;
     uint32_t capacity;
+    bool unreliable;
 
-    uint32_t processUnsettled();
+    uint32_t processUnsettled(bool silent);
     void configure(pn_terminus_t*);
+    void resend();
 };
 }}} // namespace qpid::messaging::amqp
 

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp Fri Sep 20 18:59:30 2013
@@ -44,7 +44,7 @@ void SenderHandle::send(const Message& m
 
 void SenderHandle::close()
 {
-    session->closeSender(getName());
+    connection->detach(session, sender);
 }
 
 void SenderHandle::setCapacity(uint32_t capacity)

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp Fri Sep 20 18:59:30 2013
@@ -79,14 +79,14 @@ boost::shared_ptr<ReceiverContext> Sessi
     }
 }
 
-void SessionContext::closeReceiver(const std::string&)
+void SessionContext::removeReceiver(const std::string& n)
 {
-
+    receivers.erase(n);
 }
 
-void SessionContext::closeSender(const std::string&)
+void SessionContext::removeSender(const std::string& n)
 {
-
+    senders.erase(n);
 }
 
 boost::shared_ptr<ReceiverContext> SessionContext::nextReceiver(qpid::messaging::Duration /*timeout*/)
@@ -153,4 +153,25 @@ bool SessionContext::settled()
     }
     return result;
 }
+
+void SessionContext::setName(const std::string& n)
+{
+    name = n;
+}
+std::string SessionContext::getName() const
+{
+    return name;
+}
+
+void SessionContext::reset(pn_connection_t* connection)
+{
+    session = pn_session(connection);
+    unacked.clear();
+    for (SessionContext::SenderMap::iterator i = senders.begin(); i != senders.end(); ++i) {
+        i->second->reset(session);
+    }
+    for (SessionContext::ReceiverMap::iterator i = receivers.begin(); i != receivers.end(); ++i) {
+        i->second->reset(session);
+    }
+}
 }}} // namespace qpid::messaging::amqp

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h Fri Sep 20 18:59:30 2013
@@ -50,16 +50,19 @@ class SessionContext
   public:
     SessionContext(pn_connection_t*);
     ~SessionContext();
+    void reset(pn_connection_t*);
     boost::shared_ptr<SenderContext> createSender(const qpid::messaging::Address& address);
     boost::shared_ptr<ReceiverContext> createReceiver(const qpid::messaging::Address& address);
     boost::shared_ptr<SenderContext> getSender(const std::string& name) const;
     boost::shared_ptr<ReceiverContext> getReceiver(const std::string& name) const;
-    void closeReceiver(const std::string&);
-    void closeSender(const std::string&);
+    void removeReceiver(const std::string&);
+    void removeSender(const std::string&);
     boost::shared_ptr<ReceiverContext> nextReceiver(qpid::messaging::Duration timeout);
     uint32_t getReceivable();
     uint32_t getUnsettledAcks();
     bool settled();
+    void setName(const std::string&);
+    std::string getName() const;
   private:
     friend class ConnectionContext;
     typedef std::map<std::string, boost::shared_ptr<SenderContext> > SenderMap;
@@ -70,6 +73,7 @@ class SessionContext
     ReceiverMap receivers;
     DeliveryMap unacked;
     qpid::framing::SequenceNumber next;
+    std::string name;
 
     qpid::framing::SequenceNumber record(pn_delivery_t*);
     void acknowledge();

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp Fri Sep 20 18:59:30 2013
@@ -84,15 +84,25 @@ void SessionHandle::sync(bool /*block*/)
 qpid::messaging::Sender SessionHandle::createSender(const qpid::messaging::Address& address)
 {
     boost::shared_ptr<SenderContext> sender = session->createSender(address);
-    connection->attach(session, sender);
-    return qpid::messaging::Sender(new SenderHandle(connection, session, sender));
+    try {
+        connection->attach(session, sender);
+        return qpid::messaging::Sender(new SenderHandle(connection, session, sender));
+    } catch (...) {
+        session->removeSender(sender->getName());
+        throw;
+    }
 }
 
 qpid::messaging::Receiver SessionHandle::createReceiver(const qpid::messaging::Address& address)
 {
     boost::shared_ptr<ReceiverContext> receiver = session->createReceiver(address);
-    connection->attach(session, receiver);
-    return qpid::messaging::Receiver(new ReceiverHandle(connection, session, receiver));
+    try {
+        connection->attach(session, receiver);
+        return qpid::messaging::Receiver(new ReceiverHandle(connection, session, receiver));
+    } catch (...) {
+        session->removeReceiver(receiver->getName());
+        throw;
+    }
 }
 
 bool SessionHandle::nextReceiver(Receiver& receiver, Duration timeout)

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SslTransport.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SslTransport.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SslTransport.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SslTransport.cpp Fri Sep 20 18:59:30 2013
@@ -157,4 +157,11 @@ void SslTransport::activateOutput()
     if (aio) aio->notifyPendingWrite();
 }
 
+const qpid::sys::SecuritySettings* SslTransport::getSecuritySettings()
+{
+    securitySettings.ssf = socket.getKeyLen();
+    securitySettings.authid = "dummy";//set to non-empty string to enable external authentication
+    return &securitySettings;
+}
+
 }}} // namespace qpid::messaging::amqp

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SslTransport.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SslTransport.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SslTransport.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SslTransport.h Fri Sep 20 18:59:30 2013
@@ -23,6 +23,7 @@
  */
 #include "qpid/messaging/amqp/Transport.h"
 #include "qpid/sys/Mutex.h"
+#include "qpid/sys/SecuritySettings.h"
 #include "qpid/sys/ssl/SslSocket.h"
 #include <boost/shared_ptr.hpp>
 
@@ -50,6 +51,7 @@ class SslTransport : public Transport
     void abort();
     void connectionEstablished() {};
     void close();
+    const qpid::sys::SecuritySettings* getSecuritySettings();
 
   private:
     qpid::sys::ssl::SslSocket socket;
@@ -59,6 +61,7 @@ class SslTransport : public Transport
     boost::shared_ptr<qpid::sys::Poller> poller;
     bool closed;
     std::string id;
+    qpid::sys::SecuritySettings securitySettings;
 
     void connected(const qpid::sys::Socket&);
     void failed(const std::string& msg);

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.cpp Fri Sep 20 18:59:30 2013
@@ -159,4 +159,8 @@ void TcpTransport::activateOutput()
     if (aio) aio->notifyPendingWrite();
 }
 
+const qpid::sys::SecuritySettings* TcpTransport::getSecuritySettings()
+{
+    return 0;
+}
 }}} // namespace qpid::messaging::amqp

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h Fri Sep 20 18:59:30 2013
@@ -50,6 +50,7 @@ class TcpTransport : public Transport
     void abort();
     void connectionEstablished() {};
     void close();
+    const qpid::sys::SecuritySettings* getSecuritySettings();
 
   private:
     boost::scoped_ptr<qpid::sys::Socket> socket;

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/Transport.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/Transport.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/Transport.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/Transport.h Fri Sep 20 18:59:30 2013
@@ -21,12 +21,14 @@
  * under the License.
  *
  */
+#include "qpid/CommonImportExport.h"
 #include "qpid/sys/OutputControl.h"
 #include <boost/shared_ptr.hpp>
 
 namespace qpid {
 namespace sys {
 class Poller;
+struct SecuritySettings;
 }
 namespace messaging {
 namespace amqp {
@@ -38,10 +40,11 @@ class Transport : public qpid::sys::Outp
     virtual ~Transport() {}
     virtual void connect(const std::string& host, const std::string& port) = 0;
     virtual void close() = 0;
+    virtual const qpid::sys::SecuritySettings* getSecuritySettings() = 0;
 
     typedef Transport* Factory(TransportContext&, boost::shared_ptr<qpid::sys::Poller>);
-    static Transport* create(const std::string& name, TransportContext&, boost::shared_ptr<qpid::sys::Poller>);
-    static void add(const std::string& name, Factory* factory);
+    QPID_COMMON_EXTERN static Transport* create(const std::string& name, TransportContext&, boost::shared_ptr<qpid::sys::Poller>);
+    QPID_COMMON_EXTERN static void add(const std::string& name, Factory* factory);
 };
 }}} // namespace qpid::messaging::amqp
 

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/store/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/store/CMakeLists.txt?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/store/CMakeLists.txt (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/store/CMakeLists.txt Fri Sep 20 18:59:30 2013
@@ -32,7 +32,7 @@ set (store_SOURCES
      MessageStorePlugin.cpp
     )
 add_library (store MODULE ${store_SOURCES})
-target_link_libraries (store qpidbroker ${Boost_PROGRAM_OPTIONS_LIBRARY})
+target_link_libraries (store qpidbroker qpidcommon ${Boost_PROGRAM_OPTIONS_LIBRARY})
 if (CMAKE_COMPILER_IS_GNUCXX)
   set (GCC_CATCH_UNDEFINED "-Wl,--no-undefined")
   # gcc on SunOS uses native linker whose "-z defs" is too fussy

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp Fri Sep 20 18:59:30 2013
@@ -34,7 +34,6 @@
 #include <qpid/store/StorageProvider.h>
 #include <qpid/sys/Mutex.h>
 #include <boost/foreach.hpp>
-#include <boost/make_shared.hpp>
 
 // From ms-sql...
 #include "BlobAdapter.h"
@@ -356,7 +355,7 @@ MSSqlClfsProvider::finalizeMe()
 MSSqlClfsProvider::MSSqlClfsProvider()
     : options("MS SQL/CLFS Provider options")
 {
-    transactions = boost::make_shared<TransactionLog>();
+    transactions.reset(new TransactionLog());
 }
 
 MSSqlClfsProvider::~MSSqlClfsProvider()

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/types/Uuid.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/types/Uuid.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/types/Uuid.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/types/Uuid.cpp Fri Sep 20 18:59:30 2013
@@ -52,6 +52,11 @@ Uuid::Uuid(const unsigned char* uuid)
     ::memcpy(bytes, uuid, Uuid::SIZE);
 }
 
+Uuid::Uuid(const char* uuid)
+{
+    ::memcpy(bytes, uuid, Uuid::SIZE);
+}
+
 Uuid& Uuid::operator=(const Uuid& other)
 {
     if (this == &other) return *this;

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/types/Variant.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/types/Variant.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/types/Variant.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/types/Variant.cpp Fri Sep 20 18:59:30 2013
@@ -36,6 +36,7 @@ const std::string PREFIX("invalid conver
 }
 
 InvalidConversion::InvalidConversion(const std::string& msg) : Exception(PREFIX + msg) {}
+InvalidConversion::~InvalidConversion() throw() {}
 
 class VariantImpl
 {

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/xml/XmlExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/xml/XmlExchange.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/xml/XmlExchange.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/xml/XmlExchange.cpp Fri Sep 20 18:59:30 2013
@@ -372,6 +372,8 @@ bool XmlExchange::isBound(Queue::shared_
 
 XmlExchange::~XmlExchange() 
 {
+    if (mgmtExchange != 0)
+        mgmtExchange->debugStats("destroying");
     bindingsMap.clear();
 }
 

Modified: qpid/branches/linearstore/qpid/cpp/src/rdma.cmake
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/rdma.cmake?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/rdma.cmake (original)
+++ qpid/branches/linearstore/qpid/cpp/src/rdma.cmake Fri Sep 20 18:59:30 2013
@@ -78,7 +78,7 @@ if (BUILD_RDMA)
            COMPONENT ${QPID_COMPONENT_COMMON})
 
   add_library (rdma MODULE qpid/sys/RdmaIOPlugin.cpp)
-  target_link_libraries (rdma qpidbroker rdmawrap)
+  target_link_libraries (rdma qpidbroker qpidcommon rdmawrap)
   set_target_properties (rdma PROPERTIES
                          COMPILE_DEFINITIONS _IN_QPID_BROKER
                          LINK_FLAGS "${CATCH_UNDEFINED}"
@@ -94,7 +94,7 @@ if (BUILD_RDMA)
            COMPONENT ${QPID_COMPONENT_BROKER})
 
   add_library (rdmaconnector MODULE qpid/client/RdmaConnector.cpp)
-  target_link_libraries (rdmaconnector qpidclient rdmawrap)
+  target_link_libraries (rdmaconnector qpidclient qpidcommon rdmawrap)
   set_target_properties (rdmaconnector PROPERTIES
                          LINK_FLAGS "${CATCH_UNDEFINED}"
                          PREFIX "")

Propchange: qpid/branches/linearstore/qpid/cpp/src/tests/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/cpp/src/tests:r1501885-1525056

Modified: qpid/branches/linearstore/qpid/cpp/src/tests/BrokerFixture.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/BrokerFixture.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/BrokerFixture.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/BrokerFixture.h Fri Sep 20 18:59:30 2013
@@ -42,11 +42,45 @@ namespace tests {
 struct  BrokerFixture : private boost::noncopyable {
     typedef qpid::broker::Broker Broker;
     typedef boost::intrusive_ptr<Broker> BrokerPtr;
+    typedef std::vector<std::string> Args;
 
     BrokerPtr broker;
+    uint16_t port;
     qpid::sys::Thread brokerThread;
 
-    BrokerFixture(Broker::Options opts=Broker::Options(), bool enableMgmt=false) {
+    BrokerFixture(const Args& args=Args(), const Broker::Options& opts=Broker::Options(),  // args are turned into an argv array and given to opts.parse() by init()
+                  bool isExternalPort_=false, uint16_t externalPort_=0)  // when the flag is true, init() records externalPort_ as the fixture's port
+    {
+        init(args, opts, isExternalPort_, externalPort_);  // all start-up work is delegated to init()
+    }
+
+    BrokerFixture(const Broker::Options& opts,  // overload for callers that supply only an options object
+                  bool isExternalPort_=false, uint16_t externalPort_=0)
+    {
+        init(Args(), opts, isExternalPort_, externalPort_);  // an empty argument list is passed in place of args
+    }
+
+    void shutdownBroker() {  // Shut the broker down and join its thread; does nothing when the broker pointer is null.
+        if (broker) {
+            broker->shutdown();
+            brokerThread.join();  // wait for the thread that was started on *broker in init()
+            broker = BrokerPtr();  // reset to a null pointer so a later call skips this branch
+        }
+    }
+
+    ~BrokerFixture() {  shutdownBroker(); }  // destruction shuts the broker down if that has not been done already
+
+    /** Open connection c to the broker via "localhost" on the port returned by getPort(). */
+    void open(qpid::client::Connection& c) {
+        c.open("localhost", getPort());
+    }
+
+    uint16_t getPort() { return port; }  // port stored by init(): the external port if one was given, otherwise the broker's TCP port
+
+  private:
+    void init(const Args& args, Broker::Options opts,
+              bool isExternalPort=false, uint16_t externalPort=0)
+    {
         // Keep the tests quiet unless logging env. vars have been set by user.
         if (!::getenv("QPID_LOG_ENABLE") && !::getenv("QPID_TRACE")) {
             qpid::log::Options logOpts;
@@ -55,38 +89,28 @@ struct  BrokerFixture : private boost::n
             logOpts.selectors.push_back("error+");
             qpid::log::Logger::instance().configure(logOpts);
         }
+        // Default options, may be over-ridden when we parse args.
         opts.port=0;
         opts.listenInterfaces.push_back("127.0.0.1");
-        // Management doesn't play well with multiple in-process brokers.
-        opts.enableMgmt=enableMgmt;
         opts.workerThreads=1;
         opts.dataDir="";
         opts.auth=false;
+
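+        // Argument parsing. NOTE(review): args is empty by default, and &argv[0] on an empty vector is undefined behaviour - confirm parse() tolerates this or guard the call.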
+        std::vector<const char*> argv(args.size());
+        std::transform(args.begin(), args.end(), argv.begin(),
+                       boost::bind(&std::string::c_str, _1));
+        Plugin::addOptions(opts);
+        opts.parse(argv.size(), &argv[0]);
         broker = Broker::create(opts);
         // TODO aconway 2007-12-05: At one point BrokerFixture
         // tests could hang in Connection ctor if the following
         // line is removed. This may not be an issue anymore.
         broker->accept();
-        broker->getPort(qpid::broker::Broker::TCP_TRANSPORT);
+        if (isExternalPort) port = externalPort;
+        else port = broker->getPort(qpid::broker::Broker::TCP_TRANSPORT);
         brokerThread = qpid::sys::Thread(*broker);
     };
-
-    void shutdownBroker() {
-        if (broker) {
-            broker->shutdown();
-            brokerThread.join();
-            broker = BrokerPtr();
-        }
-    }
-
-    ~BrokerFixture() {  shutdownBroker(); }
-
-    /** Open a connection to the broker. */
-    void open(qpid::client::Connection& c) {
-        c.open("localhost", broker->getPort(qpid::broker::Broker::TCP_TRANSPORT));
-    }
-
-    uint16_t getPort() { return broker->getPort(qpid::broker::Broker::TCP_TRANSPORT); }
 };
 
 /** Connection that opens in its constructor */
@@ -125,8 +149,8 @@ template <class ConnectionType, class Se
 struct  SessionFixtureT : BrokerFixture, ClientT<ConnectionType,SessionType> {
 
     SessionFixtureT(Broker::Options opts=Broker::Options()) :
-        BrokerFixture(opts),
-        ClientT<ConnectionType,SessionType>(broker->getPort(qpid::broker::Broker::TCP_TRANSPORT))
+        BrokerFixture(BrokerFixture::Args(), opts),
+        ClientT<ConnectionType,SessionType>(getPort())
     {}
 
 };

Modified: qpid/branches/linearstore/qpid/cpp/src/tests/BrokerMgmtAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/BrokerMgmtAgent.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/BrokerMgmtAgent.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/BrokerMgmtAgent.cpp Fri Sep 20 18:59:30 2013
@@ -63,6 +63,7 @@ class AgentFixture
                   qpid::broker::Broker::Options opts = qpid::broker::Broker::Options())
     {
         opts.enableMgmt=true;
+        opts.qmf1Support=!qmfV2;
         opts.qmf2Support=qmfV2;
         opts.mgmtPubInterval=pubInterval;
         mFix = new MessagingFixture(opts, true);

Modified: qpid/branches/linearstore/qpid/cpp/src/tests/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/CMakeLists.txt?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/CMakeLists.txt (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/CMakeLists.txt Fri Sep 20 18:59:30 2013
@@ -108,7 +108,6 @@ set(all_unit_tests
     ClientMessage
     ClientMessageTest
     ClientSessionTest
-    ConsoleTest
     DeliveryRecordTest
     DtxWorkRecordTest
     exception_test
@@ -147,6 +146,7 @@ set(all_unit_tests
     TimerTest
     TopicExchangeTest
     TxBufferTest
+    TransactionObserverTest
     Url
     Uuid
     Variant
@@ -171,7 +171,7 @@ add_executable (unit_test unit_test
                 ${actual_unit_tests} ${platform_test_additions})
 target_link_libraries (unit_test
                        ${qpid_test_boost_libs}
-                       qpidmessaging qpidbroker qmfconsole)
+                       qpidmessaging qpidtypes qpidbroker qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
 set_target_properties (unit_test PROPERTIES COMPILE_DEFINITIONS _IN_QPID_BROKER)
 remember_location(unit_test)
 
@@ -191,105 +191,104 @@ endif (BUILD_SASL)
 # Other test programs
 #
 add_executable (qpid-perftest qpid-perftest.cpp ${platform_test_additions})
-target_link_libraries (qpid-perftest qpidclient)
+target_link_libraries (qpid-perftest qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
 #qpid_perftest_SOURCES=qpid-perftest.cpp test_tools.h TestOptions.h ConnectionOptions.h
 remember_location(qpid-perftest)
 
 add_executable (qpid-txtest qpid-txtest.cpp ${platform_test_additions})
-target_link_libraries (qpid-txtest qpidclient)
+target_link_libraries (qpid-txtest qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
 #qpid_txtest_SOURCES=qpid-txtest.cpp  TestOptions.h ConnectionOptions.h
 remember_location(qpid-txtest)
 
 add_executable (qpid-latency-test qpid-latency-test.cpp ${platform_test_additions})
-target_link_libraries (qpid-latency-test qpidclient)
+target_link_libraries (qpid-latency-test qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
 #qpid_latencytest_SOURCES=qpid-latency-test.cpp TestOptions.h ConnectionOptions.h
 remember_location(qpid-latency-test)
 
 add_executable (echotest echotest.cpp ${platform_test_additions})
-target_link_libraries (echotest qpidclient)
+target_link_libraries (echotest qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
 #echotest_SOURCES=echotest.cpp TestOptions.h ConnectionOptions.h
 remember_location(echotest)
 
 add_executable (qpid-client-test qpid-client-test.cpp ${platform_test_additions})
-target_link_libraries (qpid-client-test qpidclient)
+target_link_libraries (qpid-client-test qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
 #qpid_client_test_SOURCES=qpid-client-test.cpp TestOptions.h ConnectionOptions.h
 remember_location(qpid-client-test)
 
 add_executable (qpid-topic-listener qpid-topic-listener.cpp ${platform_test_additions})
-target_link_libraries (qpid-topic-listener qpidclient)
+target_link_libraries (qpid-topic-listener qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
 #qpid_topic_listener_SOURCES=qpid-topic-listener.cpp TestOptions.h ConnectionOptions.h
 remember_location(qpid-topic-listener)
 
 add_executable (qpid-topic-publisher qpid-topic-publisher.cpp ${platform_test_additions})
-target_link_libraries (qpid-topic-publisher qpidclient)
+target_link_libraries (qpid-topic-publisher qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
 #qpid_topic_publisher_SOURCES=qpid-topic-publisher.cpp TestOptions.h ConnectionOptions.h
 remember_location(qpid-topic-publisher)
 
 add_executable (publish publish.cpp ${platform_test_additions})
-target_link_libraries (publish qpidclient)
+target_link_libraries (publish qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
 #publish_SOURCES=publish.cpp TestOptions.h ConnectionOptions.h
 remember_location(publish)
 
 add_executable (consume consume.cpp ${platform_test_additions})
-target_link_libraries (consume qpidclient)
+target_link_libraries (consume qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
 #consume_SOURCES=consume.cpp  TestOptions.h ConnectionOptions.h
 remember_location(consume)
 
 add_executable (header_test header_test.cpp ${platform_test_additions})
-target_link_libraries (header_test qpidclient)
+target_link_libraries (header_test qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
 #header_test_SOURCES=header_test.cpp TestOptions.h ConnectionOptions.h
 remember_location(header_test)
 
 add_executable (declare_queues declare_queues.cpp ${platform_test_additions})
-target_link_libraries (declare_queues qpidclient)
+target_link_libraries (declare_queues qpidclient qpidcommon)
 remember_location(declare_queues)
 
 add_executable (replaying_sender replaying_sender.cpp ${platform_test_additions})
-target_link_libraries (replaying_sender qpidclient)
+target_link_libraries (replaying_sender qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
 remember_location(replaying_sender)
 
 add_executable (resuming_receiver resuming_receiver.cpp ${platform_test_additions})
-target_link_libraries (resuming_receiver qpidclient)
+target_link_libraries (resuming_receiver qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
 remember_location(resuming_receiver)
 
 add_executable (txshift txshift.cpp ${platform_test_additions})
-target_link_libraries (txshift qpidclient)
+target_link_libraries (txshift qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
 #txshift_SOURCES=txshift.cpp  TestOptions.h ConnectionOptions.h
 remember_location(txshift)
 
 add_executable (txjob txjob.cpp ${platform_test_additions})
-target_link_libraries (txjob qpidclient)
+target_link_libraries (txjob qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
 #txjob_SOURCES=txjob.cpp  TestOptions.h ConnectionOptions.h
 remember_location(txjob)
 
 add_executable (receiver receiver.cpp ${platform_test_additions})
-target_link_libraries (receiver qpidclient)
-#receiver_SOURCES=receiver.cpp  TestOptions.h ConnectionOptions.h
+target_link_libraries (receiver qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
 remember_location(receiver)
 
+# This is bizarre - using both messaging and client libraries
 add_executable (sender sender.cpp Statistics.cpp ${platform_test_additions})
-target_link_libraries (sender qpidmessaging)
-#sender_SOURCES=sender.cpp  TestOptions.h ConnectionOptions.h
+target_link_libraries (sender qpidmessaging qpidtypes qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
 remember_location(sender)
 
 add_executable (qpid-receive qpid-receive.cpp Statistics.cpp ${platform_test_additions})
-target_link_libraries (qpid-receive qpidmessaging)
+target_link_libraries (qpid-receive qpidmessaging qpidtypes qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
 remember_location(qpid-receive)
 
 add_executable (qpid-send qpid-send.cpp Statistics.cpp ${platform_test_additions})
-target_link_libraries (qpid-send qpidmessaging)
+target_link_libraries (qpid-send qpidmessaging qpidtypes qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
 remember_location(qpid-send)
 
 add_executable (qpid-ping qpid-ping.cpp ${platform_test_additions})
-target_link_libraries (qpid-ping qpidclient)
+target_link_libraries (qpid-ping qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
 remember_location(qpid-ping)
 
 add_executable (datagen datagen.cpp ${platform_test_additions})
-target_link_libraries (datagen qpidclient)
+target_link_libraries (datagen qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
 remember_location(datagen)
 
 add_executable (msg_group_test msg_group_test.cpp ${platform_test_additions})
-target_link_libraries (msg_group_test qpidmessaging)
+target_link_libraries (msg_group_test qpidmessaging qpidtypes qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
 remember_location(msg_group_test)
 
 if (BUILD_SASL)
@@ -357,6 +356,7 @@ if (PYTHON_EXECUTABLE)
   if (BUILD_AMQP)
     add_test (interlink_tests ${test_wrap} ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/interlink_tests.py)
   endif (BUILD_AMQP)
+  add_test (swig_python_tests ${test_wrap} ${CMAKE_CURRENT_SOURCE_DIR}/swig_python_tests${test_script_suffix})
 endif (PYTHON_EXECUTABLE)
 add_test (ipv6_test ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/ipv6_test${test_script_suffix})
 add_test (federation_tests ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_federation_tests${test_script_suffix})
@@ -378,24 +378,7 @@ add_test (queue_redirect ${shell} ${CMAK
 
 add_library(test_store MODULE test_store.cpp)
 target_link_libraries (test_store qpidbroker qpidcommon)
-set_target_properties (test_store PROPERTIES
-                       COMPILE_DEFINITIONS _IN_QPID_BROKER
-                       PREFIX "")
+set_target_properties (test_store PROPERTIES PREFIX "" COMPILE_DEFINITIONS _IN_QPID_BROKER)
 
 
 add_library (dlclose_noop MODULE dlclose_noop.c)
-#libdlclose_noop_la_LDFLAGS = -module -rpath $(abs_builddir)
-
-#CLEANFILES+=valgrind.out *.log *.vglog* dummy_test $(unit_wrappers)
-#
-## Longer running stability tests, not run by default check: target.
-## Not run under valgrind, too slow
-#LONG_TESTS=fanout_perftest shared_perftest multiq_perftest topic_perftest
-#EXTRA_DIST+=$(LONG_TESTS) run_perftest
-#check-long:
-#	$(MAKE) check TESTS="start_broker $(LONG_TESTS) stop_broker" VALGRIND=
-
-#
-# legacystore
-#
-# add_subdirectory(legacystore)

Modified: qpid/branches/linearstore/qpid/cpp/src/tests/ClientSessionTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/ClientSessionTest.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/ClientSessionTest.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/ClientSessionTest.cpp Fri Sep 20 18:59:30 2013
@@ -37,6 +37,7 @@
 #include <boost/lexical_cast.hpp>
 #include <boost/bind.hpp>
 #include <boost/ptr_container/ptr_vector.hpp>
+#include <boost/format.hpp>
 
 #include <vector>
 

Modified: qpid/branches/linearstore/qpid/cpp/src/tests/ManagementTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/ManagementTest.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/ManagementTest.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/ManagementTest.cpp Fri Sep 20 18:59:30 2013
@@ -21,7 +21,6 @@
 
 #include "qpid/management/ManagementObject.h"
 #include "qpid/framing/Buffer.h"
-#include "qpid/console/ObjectId.h"
 #include "unit_test.h"
 
 namespace qpid {
@@ -93,32 +92,6 @@ QPID_AUTO_TEST_CASE(testObjectIdCreate) 
     BOOST_CHECK_EQUAL(oid.getV2Key(), "an-object-name");
 }
 
-QPID_AUTO_TEST_CASE(testConsoleObjectId) {
-    qpid::console::ObjectId oid1, oid2;
-
-    oid1.setValue(1, 2);
-    oid2.setValue(3, 4);
-
-    BOOST_CHECK(oid1 < oid2);
-    BOOST_CHECK(oid1 <= oid2);
-    BOOST_CHECK(oid2 > oid1);
-    BOOST_CHECK(oid2 >= oid1);
-    BOOST_CHECK(oid1 != oid2);
-    BOOST_CHECK(oid1 == oid1);
-
-    oid1.setValue(3, 6);
-    oid2.setValue(3, 4);
-
-    BOOST_CHECK(oid1 > oid2);
-    BOOST_CHECK(oid1 >= oid2);
-    BOOST_CHECK(oid2 < oid1);
-    BOOST_CHECK(oid2 <= oid1);
-    BOOST_CHECK(oid1 != oid2);
-
-    oid2.setValue(3, 6);
-    BOOST_CHECK(oid1 == oid2);
-}
-
 QPID_AUTO_TEST_SUITE_END()
 
 }} // namespace qpid::tests

Modified: qpid/branches/linearstore/qpid/cpp/src/tests/MessageReplayTracker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/MessageReplayTracker.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/MessageReplayTracker.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/MessageReplayTracker.cpp Fri Sep 20 18:59:30 2013
@@ -23,6 +23,8 @@
 #include "qpid/client/MessageReplayTracker.h"
 #include "qpid/sys/Time.h"
 
+#include <boost/format.hpp>
+
 namespace qpid {
 namespace tests {
 

Modified: qpid/branches/linearstore/qpid/cpp/src/tests/MessagingFixture.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/MessagingFixture.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/MessagingFixture.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/MessagingFixture.h Fri Sep 20 18:59:30 2013
@@ -35,6 +35,8 @@
 #include "qpid/messaging/Message.h"
 #include "qpid/types/Variant.h"
 
+#include <boost/format.hpp>
+
 namespace qpid {
 namespace tests {
 
@@ -115,6 +117,7 @@ struct MessagingFixture : public BrokerF
             (boost::format("amqp:tcp:localhost:%1%") % (port)).str());
         connection.open();
         return connection;
+
     }
 
     /** Open a connection to the broker. */
@@ -231,9 +234,10 @@ inline void receive(messaging::Receiver&
 class MethodInvoker
 {
   public:
-    MethodInvoker(messaging::Session& session) : replyTo("#; {create:always, node:{x-declare:{auto-delete:true}}}"),
-                                      sender(session.createSender("qmf.default.direct/broker")),
-                                      receiver(session.createReceiver(replyTo)) {}
+    MethodInvoker(messaging::Session session) :  // the session is now taken by value instead of by non-const reference
+        replyTo("#; {create:always, node:{x-declare:{auto-delete:true}}}"),
+        sender(session.createSender("qmf.default.direct/broker")),  // sender targets the qmf.default.direct/broker address
+        receiver(session.createReceiver(replyTo)) {}  // receiver is created on the replyTo address initialised above
 
     void createExchange(const std::string& name, const std::string& type, bool durable=false)
     {
@@ -292,11 +296,14 @@ class MethodInvoker
         methodRequest("delete", params);
     }
 
-    void methodRequest(const std::string& method, const Variant::Map& inParams, Variant::Map* outParams = 0)
+    void methodRequest(
+        const std::string& method,
+        const Variant::Map& inParams, Variant::Map* outParams = 0,
+        const std::string& objectName="org.apache.qpid.broker:broker:amqp-broker")
     {
         Variant::Map content;
         Variant::Map objectId;
-        objectId["_object_name"] = "org.apache.qpid.broker:broker:amqp-broker";
+        objectId["_object_name"] = objectName;;
         content["_object_id"] = objectId;
         content["_method_name"] = method;
         content["_arguments"] = inParams;

Modified: qpid/branches/linearstore/qpid/cpp/src/tests/MessagingSessionTests.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/MessagingSessionTests.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/MessagingSessionTests.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/MessagingSessionTests.cpp Fri Sep 20 18:59:30 2013
@@ -1289,6 +1289,95 @@ QPID_AUTO_TEST_CASE(testSimpleRequestRes
     BOOST_CHECK_EQUAL(m.getSubject(), original.getSubject());
 }
 
+QPID_AUTO_TEST_CASE(testSelfDestructQueue)
+{
+    MessagingFixture fix;
+    Session other = fix.connection.createSession();  // r1 gets its own session, separate from fix.session
+    Receiver r1 = other.createReceiver("amq.fanout; {link:{reliability:at-least-once, x-declare:{arguments:{qpid.max_count:10,qpid.policy_type:self-destruct}}}}");  // link declared with max_count 10 and the self-destruct policy
+    Receiver r2 = fix.session.createReceiver("amq.fanout");  // plain subscription with no limit arguments
+    //send 20 messages to amq.fanout, twice the max_count given for r1
+    Sender s = fix.session.createSender("amq.fanout");
+    for (uint i = 0; i < 20; ++i) {
+        s.send(Message((boost::format("MSG_%1%") % (i+1)).str()));  // contents are MSG_1 .. MSG_20
+    }
+    try {
+        ScopedSuppressLogging sl;
+        for (uint i = 0; i < 20; ++i) {
+            r1.fetch(Duration::SECOND);  // expected to throw before all 20 fetches complete
+        }
+        BOOST_FAIL("Expected exception.");  // reached only if every fetch on r1 succeeded
+    } catch (const qpid::messaging::MessagingException&) {
+    }
+
+    for (uint i = 0; i < 20; ++i) {
+        BOOST_CHECK_EQUAL(r2.fetch(Duration::SECOND).getContent(), (boost::format("MSG_%1%") % (i+1)).str());  // r2 must still see MSG_1 .. MSG_20 in order
+    }
+}
+
+QPID_AUTO_TEST_CASE(testReroutingRingQueue)
+{
+    MessagingFixture fix;
+    Receiver r1 = fix.session.createReceiver("my-queue; {create:always, node:{x-declare:{alternate-exchange:amq.fanout, auto-delete:True, arguments:{qpid.max_count:10,qpid.policy_type:ring}}}}");  // ring queue of max_count 10 whose alternate exchange is amq.fanout
+    Receiver r2 = fix.session.createReceiver("amq.fanout");  // subscribes to the exchange named as the alternate
+
+    Sender s = fix.session.createSender("my-queue");
+    for (uint i = 0; i < 20; ++i) {
+        s.send(Message((boost::format("MSG_%1%") % (i+1)).str()));  // 20 messages, twice the queue's max_count
+    }
+    for (uint i = 10; i < 20; ++i) {
+        BOOST_CHECK_EQUAL(r1.fetch(Duration::SECOND).getContent(), (boost::format("MSG_%1%") % (i+1)).str());  // the queue is expected to hold MSG_11 .. MSG_20
+    }
+    for (uint i = 0; i < 10; ++i) {
+        BOOST_CHECK_EQUAL(r2.fetch(Duration::SECOND).getContent(), (boost::format("MSG_%1%") % (i+1)).str());  // MSG_1 .. MSG_10 are expected via amq.fanout
+    }
+}
+
+QPID_AUTO_TEST_CASE(testReleaseOnPriorityQueue)
+{
+    MessagingFixture fix;
+    std::string queue("queue; {create:always, node:{x-declare:{auto-delete:True, arguments:{qpid.priorities:10}}}}");  // queue declared with qpid.priorities set to 10
+    std::string text("my message");
+    Sender sender = fix.session.createSender(queue);
+    sender.send(Message(text));  // exactly one message is sent
+    Receiver receiver = fix.session.createReceiver(queue);
+    Message msg;
+    for (uint i = 0; i < 10; ++i) {
+        if (receiver.fetch(msg, Duration::SECOND)) {
+            BOOST_CHECK_EQUAL(msg.getContent(), text);  // every fetch must return the same single message
+            fix.session.release(msg);  // release it so the next iteration can fetch it again
+        } else {
+            BOOST_FAIL("Released message not redelivered as expected.");
+        }
+    }
+    fix.session.acknowledge();
+}
+
+QPID_AUTO_TEST_CASE(testRollbackWithFullPrefetch)
+{
+    // Sends MSG_1..MSG_10, then fetches on a transactional session whose
+    // capacity is 9, rolling back after every fetch; each fetch must yield MSG_1.
+    QueueFixture fix;
+    Sender sender = fix.session.createSender(fix.queue);
+    for (uint i = 0; i < 10; ++i) {
+        sender.send(Message((boost::format("MSG_%1%") % (i+1)).str()));
+    }
+    Session txsession = fix.connection.createTransactionalSession();
+    Receiver receiver = txsession.createReceiver(fix.queue);
+    receiver.setCapacity(9);  // one less than the number of messages sent
+    Message msg;
+    for (uint i = 0; i < 10; ++i) {
+        if (receiver.fetch(msg, Duration::SECOND)) {
+            BOOST_CHECK_EQUAL(msg.getContent(), std::string("MSG_1"));
+            txsession.rollback();
+        } else {
+            BOOST_FAIL("Rolled back message not redelivered as expected.");
+            break;
+        }
+    }
+    txsession.acknowledge();
+    txsession.commit();
+}
+
 QPID_AUTO_TEST_SUITE_END()
 
 }} // namespace qpid::tests

Modified: qpid/branches/linearstore/qpid/cpp/src/tests/QueuePolicyTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/QueuePolicyTest.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/QueuePolicyTest.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/QueuePolicyTest.cpp Fri Sep 20 18:59:30 2013
@@ -28,6 +28,8 @@
 #include "qpid/framing/reply_exceptions.h"
 #include "BrokerFixture.h"
 
+#include <boost/format.hpp>
+
 using namespace qpid::broker;
 using namespace qpid::client;
 using namespace qpid::framing;

Modified: qpid/branches/linearstore/qpid/cpp/src/tests/Shlib.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/Shlib.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/Shlib.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/Shlib.cpp Fri Sep 20 18:59:30 2013
@@ -35,13 +35,7 @@ typedef void (*CallMe)(int*);
 
 
 QPID_AUTO_TEST_CASE(testShlib) {
-    // The CMake-based build passes in the shared lib suffix; if it's not
-    // there, this is a Linux/UNIX libtool-based build.
-#if defined (QPID_SHLIB_PREFIX) && defined (QPID_SHLIB_SUFFIX)
     Shlib sh("./" QPID_SHLIB_PREFIX "shlibtest" QPID_SHLIB_POSTFIX QPID_SHLIB_SUFFIX);
-#else
-    Shlib sh(".libs/libshlibtest.so");
-#endif
     // Double cast to avoid ISO warning.
     CallMe callMe=sh.getSymbol<CallMe>("callMe");
     BOOST_REQUIRE(callMe != 0);
@@ -59,11 +53,7 @@ QPID_AUTO_TEST_CASE(testShlib) {
 QPID_AUTO_TEST_CASE(testAutoShlib) {
     int unloaded = 0;
     {
-#if defined (QPID_SHLIB_PREFIX) && defined (QPID_SHLIB_SUFFIX)
         AutoShlib sh("./" QPID_SHLIB_PREFIX "shlibtest" QPID_SHLIB_POSTFIX QPID_SHLIB_SUFFIX);
-#else
-        AutoShlib sh(".libs/libshlibtest.so");
-#endif
         CallMe callMe=sh.getSymbol<CallMe>("callMe");
         BOOST_REQUIRE(callMe != 0);
         callMe(&unloaded);

Modified: qpid/branches/linearstore/qpid/cpp/src/tests/TxMocks.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/TxMocks.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/TxMocks.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/TxMocks.h Fri Sep 20 18:59:30 2013
@@ -103,6 +103,9 @@ public:
         if(!debugName.empty()) std::cout << std::endl << "MockTxOp[" << debugName << "]::rollback()" << std::endl;
         actual.push_back(ROLLBACK);
     }
+
+    void callObserver(const boost::shared_ptr<TransactionObserver>&) {}  // no-op in this mock: the observer argument is unused
+
     MockTxOp& expectPrepare(){
         expected.push_back(PREPARE);
         return *this;

Modified: qpid/branches/linearstore/qpid/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/brokertest.py?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/brokertest.py Fri Sep 20 18:59:30 2013
@@ -78,7 +78,7 @@ def error_line(filename, n=1):
     except: return ""
     return ":\n" + "".join(result)
 
-def retry(function, timeout=10, delay=.01, max_delay=1):
+def retry(function, timeout=10, delay=.001, max_delay=1):
     """Call function until it returns a true value or timeout expires.
     Double the delay for each retry up to max_delay.
     Returns what function returns if true, None if timeout expires."""
@@ -141,7 +141,7 @@ class Popen(subprocess.Popen):
         finally: f.close()
         log.debug("Started process %s: %s" % (self.pname, " ".join(self.cmd)))
 
-    def __str__(self): return "Popen<%s>"%(self.pname)
+    def __repr__(self): return "Popen<%s>"%(self.pname)  # identifies the process by its pname
 
     def outfile(self, ext): return "%s.%s" % (self.pname, ext)
 
@@ -242,16 +242,12 @@ class Broker(Popen):
     _broker_count = 0
     _log_count = 0
 
-    def __str__(self): return "Broker<%s %s :%d>"%(self.log, self.pname, self.port())
-
-    def find_log(self):
-        self.log = "%03d:%s.log" % (Broker._log_count, self.name)
-        Broker._log_count += 1
+    def __repr__(self): return "<Broker:%s:%d>"%(self.log, self.port())  # shows the broker's log file name and port
 
     def get_log(self):
         return os.path.abspath(self.log)
 
-    def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING, port=0, log_level=None, wait=None, show_cmd=False):
+    def __init__(self, test, args=[], test_store=False, name=None, expect=EXPECT_RUNNING, port=0, log_level=None, wait=None, show_cmd=False):
         """Start a broker daemon. name determines the data-dir and log
         file names."""
 
@@ -273,11 +269,18 @@ class Broker(Popen):
         else:
             self.name = "broker%d" % Broker._broker_count
             Broker._broker_count += 1
-        self.find_log()
+
+        self.log = "%03d:%s.log" % (Broker._log_count, self.name)
+        self.store_log = "%03d:%s.store.log" % (Broker._log_count, self.name)
+        Broker._log_count += 1
+
         cmd += ["--log-to-file", self.log]
         cmd += ["--log-to-stderr=no"]
         cmd += ["--log-enable=%s"%(log_level or "info+") ]
 
+        if test_store: cmd += ["--load-module", BrokerTest.test_store_lib,
+                               "--test-store-events", self.store_log]
+
         self.datadir = self.name
         cmd += ["--data-dir", self.datadir]
         if show_cmd: print cmd
@@ -285,7 +288,6 @@ class Broker(Popen):
         test.cleanup_stop(self)
         self._host = "127.0.0.1"
         log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log))
-        self._log_ready = False
 
     def startQmf(self, handler=None):
         self.qmf_session = qmf.console.Session(handler)
@@ -363,29 +365,21 @@ class Broker(Popen):
 
     def host_port(self): return "%s:%s" % (self.host(), self.port())
 
-    def log_contains(self, str, timeout=1):
-        """Wait for str to appear in the log file up to timeout. Return true if found"""
-        return retry(lambda: find_in_file(str, self.log), timeout)
-
-    def log_ready(self):
-        """Return true if the log file exists and contains a broker ready message"""
-        if not self._log_ready:
-            self._log_ready = find_in_file("notice Broker running", self.log)
-        return self._log_ready
-
     def ready(self, timeout=30, **kwargs):
         """Wait till broker is ready to serve clients"""
-        # First make sure the broker is listening by checking the log.
-        if not retry(self.log_ready, timeout=timeout):
-            raise Exception(
-                "Timed out waiting for broker %s%s"%(self.name, error_line(self.log,5)))
-        # Create a connection and a session.
-        try:
-            c = self.connect(**kwargs)
-            try: c.session()
-            finally: c.close()
-        except Exception,e: raise RethrownException(
-            "Broker %s not responding: (%s)%s"%(self.name,e,error_line(self.log, 5)))
+        deadline = time.time()+timeout  # absolute time after which a failed attempt is no longer retried
+        while True:  # attempts follow each other with no sleep in between
+            try:
+                c = self.connect(timeout=timeout, **kwargs)  # the same timeout value is also passed to connect()
+                try:
+                    c.session()  # creating a session must succeed too, not just the connection
+                    return      # All good
+                finally: c.close()  # the probe connection is closed on success and on failure
+            except Exception,e: # Retry up to timeout
+                if time.time() > deadline:
+                    raise RethrownException(
+                        "Broker %s not responding: (%s)%s"%(
+                            self.name,e,error_line(self.log, 5)))  # message includes lines from the broker log via error_line()
 
 def browse(session, queue, timeout=0, transform=lambda m: m.content):
     """Return a list with the contents of each message on queue."""
@@ -407,7 +401,7 @@ def assert_browse(session, queue, expect
     if msg: msg = "%s: %r != %r"%(msg, expect_contents, actual_contents)
     assert expect_contents == actual_contents, msg
 
-def assert_browse_retry(session, queue, expect_contents, timeout=1, delay=.01, transform=lambda m:m.content, msg="browse failed"):
+def assert_browse_retry(session, queue, expect_contents, timeout=1, delay=.001, transform=lambda m:m.content, msg="browse failed"):
     """Wait up to timeout for contents of queue to match expect_contents"""
     test = lambda: browse(session, queue, 0, transform=transform) == expect_contents
     retry(test, timeout, delay)
@@ -421,6 +415,10 @@ class BrokerTest(TestCase):
     Provides a well-known working directory for each test.
     """
 
+    def __init__(self, *args, **kwargs):
+        self.longMessage = True # Enable long messages for assert*(..., msg=xxx)
+        TestCase.__init__(self, *args, **kwargs)
+
     # Environment settings.
     qpidd_exec = os.path.abspath(checkenv("QPIDD_EXEC"))
     ha_lib = os.getenv("HA_LIB")
@@ -480,7 +478,7 @@ class BrokerTest(TestCase):
     def assert_browse(self, *args, **kwargs): assert_browse(*args, **kwargs)
     def assert_browse_retry(self, *args, **kwargs): assert_browse_retry(*args, **kwargs)
 
-def join(thread, timeout=10):
+def join(thread, timeout=30):
     thread.join(timeout)
     if thread.isAlive(): raise Exception("Timed out joining thread %s"%thread)
 

Modified: qpid/branches/linearstore/qpid/cpp/src/tests/cli_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/cli_tests.py?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/cli_tests.py (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/cli_tests.py Fri Sep 20 18:59:30 2013
@@ -69,47 +69,39 @@ class CliTests(TestBase010):
         self.startBrokerAccess()
         queue1 = self.makeQueue("test_queue_params1", "--limit-policy none")
         queue2 = self.makeQueue("test_queue_params2", "--limit-policy reject")
-        queue3 = self.makeQueue("test_queue_params3", "--limit-policy flow-to-disk")
-        queue4 = self.makeQueue("test_queue_params4", "--limit-policy ring")
-        queue5 = self.makeQueue("test_queue_params5", "--limit-policy ring-strict")
+        queue3 = self.makeQueue("test_queue_params3", "--limit-policy ring")
 
         LIMIT = "qpid.policy_type"
         assert LIMIT not in queue1.arguments
         self.assertEqual(queue2.arguments[LIMIT], "reject")
-        self.assertEqual(queue3.arguments[LIMIT], "flow_to_disk")
-        self.assertEqual(queue4.arguments[LIMIT], "ring")
-        self.assertEqual(queue5.arguments[LIMIT], "ring_strict")
+        self.assertEqual(queue3.arguments[LIMIT], "ring")
 
-        queue6 = self.makeQueue("test_queue_params6", "--lvq-key lkey")
+        queue4 = self.makeQueue("test_queue_params4", "--lvq-key lkey")
 
         LVQKEY = "qpid.last_value_queue_key"
 
-        assert LVQKEY not in queue5.arguments
-        assert LVQKEY     in queue6.arguments
-        assert queue6.arguments[LVQKEY] == "lkey"
+        assert LVQKEY not in queue3.arguments
+        assert LVQKEY     in queue4.arguments
+        assert queue4.arguments[LVQKEY] == "lkey"
 
     def test_queue_params_api(self):
         self.startBrokerAccess()
         queue1 = self.makeQueue("test_queue_params_api1", "--limit-policy none", True)
         queue2 = self.makeQueue("test_queue_params_api2", "--limit-policy reject", True)
-        queue3 = self.makeQueue("test_queue_params_api3", "--limit-policy flow-to-disk", True)
-        queue4 = self.makeQueue("test_queue_params_api4", "--limit-policy ring", True)
-        queue5 = self.makeQueue("test_queue_params_api5", "--limit-policy ring-strict", True)
+        queue3 = self.makeQueue("test_queue_params_api3", "--limit-policy ring", True)
 
         LIMIT = "qpid.policy_type"
         assert LIMIT not in queue1.arguments
         self.assertEqual(queue2.arguments[LIMIT], "reject")
-        self.assertEqual(queue3.arguments[LIMIT], "flow_to_disk")
-        self.assertEqual(queue4.arguments[LIMIT], "ring")
-        self.assertEqual(queue5.arguments[LIMIT], "ring_strict")
+        self.assertEqual(queue3.arguments[LIMIT], "ring")
 
-        queue6 = self.makeQueue("test_queue_params_api6", "--lvq-key lkey")
+        queue4 = self.makeQueue("test_queue_params_api4", "--lvq-key lkey")
 
         LVQKEY = "qpid.last_value_queue_key"
 
-        assert LVQKEY not in queue5.arguments
-        assert LVQKEY     in queue6.arguments
-        assert queue6.arguments[LVQKEY] == "lkey"
+        assert LVQKEY not in queue3.arguments
+        assert LVQKEY     in queue4.arguments
+        assert queue4.arguments[LVQKEY] == "lkey"
 
 
     def test_qpid_config(self):

Modified: qpid/branches/linearstore/qpid/cpp/src/tests/ha_test.py
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/ha_test.py?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/ha_test.py (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/ha_test.py Fri Sep 20 18:59:30 2013
@@ -48,6 +48,24 @@ class QmfAgent(object):
             address, client_properties={"qpid.ha-admin":1}, **kwargs)
         self._agent = BrokerAgent(self._connection)
 
+    def queues(self):
+        return [q.values['name'] for q in self._agent.getAllQueues()]
+
+    def repsub_queue(self, sub):
+        """If QMF subscription sub is a replicating subscription return
+        the name of the replicated queue, else return None"""
+        session_name = self.getSession(sub.sessionRef).name
+        m = re.search("qpid.ha-q:(.*)\.", session_name)
+        return m and m.group(1)
+
+    def repsub_queues(self):
+        """Return queue names for all replicating subscriptions"""
+        return filter(None, [self.repsub_queue(s) for s in self.getAllSubscriptions()])
+
+    def tx_queues(self):
+        """Return names of all tx-queues"""
+        return [q for q in self.queues() if q.startswith("qpid.ha-tx")]
+
     def __getattr__(self, name):
         a = getattr(self._agent, name)
         return a
@@ -101,6 +119,9 @@ class HaBroker(Broker):
     """Start a broker with HA enabled
     @param client_cred: (user, password, mechanism) for admin clients started by the HaBroker.
     """
+
+    heartbeat=2
+
     def __init__(self, test, ha_port=None, args=[], brokers_url=None, ha_cluster=True,
                  ha_replicate="all", client_credentials=None, **kwargs):
         assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
@@ -112,7 +133,7 @@ class HaBroker(Broker):
                  "--link-maintenance-interval=0.1",
                  # Heartbeat and negotiate time are needed so that a broker wont
                  # stall on an address that doesn't currently have a broker running.
-                 "--link-heartbeat-interval=1",
+                 "--link-heartbeat-interval=%s"%(HaBroker.heartbeat),
                  "--max-negotiate-time=1000",
                  "--ha-cluster=%s"%ha_cluster]
         if ha_replicate is not None:
@@ -139,15 +160,18 @@ acl allow all all
         self.client_credentials = client_credentials
         self.ha_port = ha_port
 
-    def __str__(self): return Broker.__str__(self)
+    def __repr__(self): return "<HaBroker:%s:%d>"%(self.log, self.port())
 
     def qpid_ha(self, args):
-        cred = self.client_credentials
-        url = self.host_port()
-        if cred:
-            url =cred.add_user(url)
-            args = args + ["--sasl-mechanism", cred.mechanism]
-        self.qpid_ha_script.main_except(["", "-b", url]+args)
+        try:
+            cred = self.client_credentials
+            url = self.host_port()
+            if cred:
+                url =cred.add_user(url)
+                args = args + ["--sasl-mechanism", cred.mechanism]
+            self.qpid_ha_script.main_except(["", "-b", url]+args)
+        except Exception, e:
+            raise Exception("Error in qpid_ha -b %s %s: %s"%(url, args,e))
 
     def promote(self): self.ready(); self.qpid_ha(["promote"])
     def set_public_url(self, url): self.qpid_ha(["set", "--public-url", url])
@@ -221,6 +245,12 @@ acl allow all all
 
     def wait_backup(self, address): self.wait_address(address)
 
+    def browse(self, queue, timeout=0, transform=lambda m: m.content):
+        c = self.connect_admin()
+        try:
+            return browse(c.session(), queue, timeout, transform)
+        finally: c.close()
+
     def assert_browse(self, queue, expected, **kwargs):
         """Verify queue contents by browsing."""
         bs = self.connect().session()
@@ -247,8 +277,10 @@ acl allow all all
         try: return self.connect()
         except ConnectionError: return None
 
-    def ready(self):
-        return Broker.ready(self, client_properties={"qpid.ha-admin":1})
+    def ready(self, *args, **kwargs):
+        if not 'client_properties' in kwargs: kwargs['client_properties'] = {}
+        kwargs['client_properties']['qpid.ha-admin'] = True
+        return Broker.ready(self, *args, **kwargs)
 
     def kill(self, final=True):
         if final: self.ha_port.stop()
@@ -259,16 +291,19 @@ acl allow all all
 class HaCluster(object):
     _cluster_count = 0
 
-    def __init__(self, test, n, promote=True, wait=True, args=[], **kwargs):
+    def __init__(self, test, n, promote=True, wait=True, args=[], s_args=[], **kwargs):
         """Start a cluster of n brokers.
 
         @test: The test being run
         @n: start n brokers
         @promote: promote self[0] to primary
         @wait: wait for primary active and backups ready. Ignored if promote=False
+        @args: args for all brokers in the cluster.
+        @s_args: args for specific brokers: s_args[i] for broker i.
         """
         self.test = test
-        self.args = args
+        self.args = copy(args)
+        self.s_args = copy(s_args)
         self.kwargs = kwargs
         self._ports = [HaPort(test) for i in xrange(n)]
         self._set_url()
@@ -288,10 +323,13 @@ class HaCluster(object):
         self.broker_id += 1
         return name
 
-    def _ha_broker(self, ha_port, name):
+    def _ha_broker(self, i, name):
+        args = self.args
+        if i < len(self.s_args): args += self.s_args[i]
+        ha_port = self._ports[i]
         b = HaBroker(ha_port.test, ha_port, brokers_url=self.url, name=name,
-                     args=self.args, **self.kwargs)
-        b.ready()
+                     args=args, **self.kwargs)
+        b.ready(timeout=5)
         return b
 
     def start(self):
@@ -302,7 +340,7 @@ class HaCluster(object):
             self._ports.append(HaPort(self.test))
             self._set_url()
             self._update_urls()
-        b = self._ha_broker(self._ports[i], self.next_name())
+        b = self._ha_broker(i, self.next_name())
         self._brokers.append(b)
         return b
 
@@ -328,7 +366,7 @@ class HaCluster(object):
         a separate log file: foo.n.log"""
         if self._ports[i].stopped: raise Exception("Restart after final kill: %s"%(self))
         b = self._brokers[i]
-        self._brokers[i] = self._ha_broker(self._ports[i], b.name)
+        self._brokers[i] = self._ha_broker(i, b.name)
         self._brokers[i].ready()
 
     def bounce(self, i, promote_next=True):

Modified: qpid/branches/linearstore/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/ha_tests.py?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/ha_tests.py Fri Sep 20 18:59:30 2013
@@ -20,7 +20,7 @@
 
 import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest
 import traceback
-from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty
+from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty, ServerError
 from qpid.datatypes import uuid4, UUID
 from brokertest import *
 from ha_test import *
@@ -37,6 +37,7 @@ def grep(filename, regexp):
 
 class HaBrokerTest(BrokerTest):
     """Base class for HA broker tests"""
+
     def assert_log_no_errors(self, broker):
         log = broker.get_log()
         if grep(log, re.compile("] error|] critical")):
@@ -219,7 +220,8 @@ class ReplicationTests(HaBrokerTest):
             backup.connect_admin().close()
 
             # Test discovery: should connect to primary after reject by backup
-            c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], reconnect=True)
+            c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()],
+                               reconnect=True)
             s = c.session()
             sender = s.sender("q;{create:always}")
             backup.wait_backup("q")
@@ -561,9 +563,9 @@ class ReplicationTests(HaBrokerTest):
             return
         acl=os.path.join(os.getcwd(), "policy.acl")
         aclf=file(acl,"w")
-        # Verify that replication works with auth=yes and HA user has at least the following
-        # privileges:
+        # Minimum set of privileges required for the HA user.
         aclf.write("""
+# HA user
 acl allow zag@QPID access queue
 acl allow zag@QPID create queue
 acl allow zag@QPID consume queue
@@ -575,6 +577,9 @@ acl allow zag@QPID publish exchange
 acl allow zag@QPID delete exchange
 acl allow zag@QPID access method
 acl allow zag@QPID create link
+# Normal user
+acl allow zig@QPID all all
+
 acl deny all all
  """)
         aclf.close()
@@ -585,14 +590,16 @@ acl deny all all
                   "--ha-username=zag", "--ha-password=zag", "--ha-mechanism=PLAIN"
                   ],
             client_credentials=Credentials("zag", "zag", "PLAIN"))
-        s0 = cluster[0].connect(username="zag", password="zag").session();
-        s0.receiver("q;{create:always}")
-        s0.receiver("ex;{create:always,node:{type:topic,x-declare:{type:'fanout'},x-bindings:[{exchange:'ex',queue:'q'}]}}")
-        cluster[1].wait_backup("q")
-        cluster[1].wait_backup("ex")
-        s1 = cluster[1].connect_admin().session(); # Uses Credentials above.
-        s1.sender("ex").send("foo");
-        self.assertEqual(s1.receiver("q").fetch().content, "foo")
+        c = cluster[0].connect(username="zig", password="zig")
+        s0 = c.session();
+        s0.sender("q;{create:always}")
+        s0.sender("ex;{create:always,node:{type:topic,x-declare:{type:'fanout'},x-bindings:[{exchange:'ex',queue:'q'}]}}")
+        s0.sender("ex").send("foo");
+        s1 = c.session(transactional=True)
+        s1.sender("ex").send("foo-tx");
+        cluster[1].assert_browse_backup("q", ["foo"])
+        s1.commit()
+        cluster[1].assert_browse_backup("q", ["foo", "foo-tx"])
 
     def test_alternate_exchange(self):
         """Verify that alternate-exchange on exchanges and queues is propagated
@@ -927,20 +934,22 @@ class LongTests(HaBrokerTest):
         if d: return float(d)*60
         else: return 3                  # Default is to be quick
 
-    # FIXME aconway 2013-06-27: skip this test pending a fix for
-    # https://issues.apache.org/jira/browse/QPID-4944
-    def skip_test_failover_send_receive(self):
+    def test_failover_send_receive(self):
         """Test failover with continuous send-receive"""
         brokers = HaCluster(self, 3)
 
         # Start sender and receiver threads
         n = 10
-        senders = [NumberedSender(brokers[0], url=brokers.url,
-                                  max_depth=1024, failover_updates=False,
-                                  queue="test%s"%(i)) for i in xrange(n)]
-        receivers = [NumberedReceiver(brokers[0], url=brokers.url, sender=senders[i],
-                                      failover_updates=False,
-                                      queue="test%s"%(i)) for i in xrange(n)]
+        senders = [
+            NumberedSender(
+                brokers[0], url=brokers.url,max_depth=50, failover_updates=False,
+                queue="test%s"%(i), args=["--capacity=10"]) for i in xrange(n)]
+
+        receivers = [
+            NumberedReceiver(
+                brokers[0], url=brokers.url, sender=senders[i],failover_updates=False,
+                queue="test%s"%(i), args=["--capacity=10"]) for i in xrange(n)]
+
         for r in receivers: r.start()
         for s in senders: s.start()
 
@@ -991,7 +1000,7 @@ class LongTests(HaBrokerTest):
         finally:
             for s in senders: s.stop()
             for r in receivers: r.stop()
-            dead = filter(lambda i: not brokers[i].is_running(), xrange(3))
+            dead = filter(lambda b: not b.is_running(), brokers)
             if dead: raise Exception("Brokers not running: %s"%dead)
 
     def test_qmf_order(self):
@@ -1200,7 +1209,7 @@ class ConfigurationTests(HaBrokerTest):
         cluster[0].set_brokers_url(cluster.url+",xxx:1234")
         self.assertRaises(Empty, r.fetch, 0) # Not updated for brokers URL
 
-class StoreTests(BrokerTest):
+class StoreTests(HaBrokerTest):
     """Test for HA with persistence."""
 
     def check_skip(self):
@@ -1248,7 +1257,7 @@ class StoreTests(BrokerTest):
         doing catch-up from the primary."""
         if self.check_skip(): return
         cluster = HaCluster(self, 2)
-        sn = cluster[0].connect(heartbeat=1).session()
+        sn = cluster[0].connect(heartbeat=HaBroker.heartbeat).session()
         s1 = sn.sender("q1;{create:always,node:{durable:true}}")
         for m in ["foo","bar"]: s1.send(Message(m, durable=True))
         s2 = sn.sender("q2;{create:always,node:{durable:true}}")
@@ -1259,7 +1268,7 @@ class StoreTests(BrokerTest):
         cluster[1].assert_browse_backup("q2", ["hello"])
         # Make changes that the backup doesn't see
         cluster.kill(1, promote_next=False, final=False)
-        r1 = cluster[0].connect(heartbeat=1).session().receiver("q1")
+        r1 = cluster[0].connect(heartbeat=HaBroker.heartbeat).session().receiver("q1")
         for m in ["foo", "bar"]: self.assertEqual(r1.fetch().content, m)
         r1.session.acknowledge()
         for m in ["x","y","z"]: s1.send(Message(m, durable=True))
@@ -1278,7 +1287,7 @@ class StoreTests(BrokerTest):
         cluster[0].assert_browse("q1",  ["x","y","z"])
         cluster[1].assert_browse_backup("q1",  ["x","y","z"])
 
-        sn = cluster[0].connect(heartbeat=1).session() # FIXME aconway 2012-09-25: should fail over!
+        sn = cluster[0].connect(heartbeat=HaBroker.heartbeat).session()
         sn.sender("ex/k1").send("boo")
         cluster[0].assert_browse_backup("q1", ["x","y","z", "boo"])
         cluster[1].assert_browse_backup("q1", ["x","y","z", "boo"])
@@ -1287,6 +1296,185 @@ class StoreTests(BrokerTest):
         cluster[0].assert_browse("q2", ["hello", "end"])
         cluster[1].assert_browse_backup("q2", ["hello", "end"])
 
+def open_read(name):
+    try:
+        f = open(name)
+        return f.read()
+    finally: f.close()
+
+class TransactionTests(HaBrokerTest):
+
+    load_store=["--load-module", BrokerTest.test_store_lib]
+
+    def tx_simple_setup(self, broker):
+        """Start a transaction, remove messages from queue a, add messages to queue b"""
+        c = broker.connect()
+        # Send messages to a, no transaction.
+        sa = c.session().sender("a;{create:always,node:{durable:true}}")
+        tx_msgs =  ["x","y","z"]
+        for m in tx_msgs: sa.send(Message(content=m, durable=True))
+
+        # Receive messages from a, in transaction.
+        tx = c.session(transactional=True)
+        txr = tx.receiver("a")
+        tx_msgs2 = [txr.fetch(1).content for i in xrange(3)]
+        self.assertEqual(tx_msgs, tx_msgs2)
+
+        # Send messages to b, transactional, mixed with non-transactional.
+        sb = c.session().sender("b;{create:always,node:{durable:true}}")
+        txs = tx.sender("b")
+        msgs = [str(i) for i in xrange(3)]
+        for tx_m,m in zip(tx_msgs2, msgs):
+            txs.send(tx_m);
+            sb.send(m)
+        return tx
+
+    def tx_subscriptions(self, broker):
+        """Return list of queue names for tx subscriptions"""
+        return [q for q in broker.agent().repsub_queues()
+                    if q.startswith("qpid.ha-tx")]
+
+    def test_tx_simple_commit(self):
+        cluster = HaCluster(self, 2, test_store=True)
+        tx = self.tx_simple_setup(cluster[0])
+        tx.sync()
+        tx_queues = cluster[0].agent().tx_queues()
+
+        # NOTE: backup does not process transactional dequeues until prepare
+        cluster[1].assert_browse_backup("a", ["x","y","z"])
+        cluster[1].assert_browse_backup("b", ['0', '1', '2'])
+
+        tx.acknowledge()
+        tx.commit()
+        tx.sync()
+
+        for b in cluster: self.assert_simple_commit_outcome(b, tx_queues)
+
+    def assert_tx_cleanup(self, b, tx_queues):
+        """Verify that there are no transaction artifacts
+        (exchanges, queues, subscriptions) on b."""
+
+        self.assertEqual(0, len(b.agent().tx_queues()), msg=b)
+        self.assertEqual(0, len(self.tx_subscriptions(b)), msg=b)
+
+        # TX exchanges don't show up in management so test for existence by name.
+        s = b.connect_admin().session()
+        try:
+            for q in tx_queues:
+                try:
+                    s.sender("%s;{node:{type:topic}}"%q)
+                    self.fail("Found tx exchange %s on %s "%(q,b))
+                except NotFound: pass
+        finally: s.connection.close()
+
+    def assert_simple_commit_outcome(self, b, tx_queues):
+        b.assert_browse_backup("a", [], msg=b)
+        b.assert_browse_backup("b", ['0', '1', '2', 'x', 'y', 'z'], msg=b)
+        # Check for expected actions on the store
+        expect = """<enqueue a x>
+<enqueue a y>
+<enqueue a z>
+<begin tx 1>
+<dequeue a x tx=1>
+<dequeue a y tx=1>
+<dequeue a z tx=1>
+<commit tx=1>
+"""
+        self.assertEqual(expect, open_read(b.store_log), msg=b)
+        self.assert_tx_cleanup(b, tx_queues)
+
+    def test_tx_simple_rollback(self):
+        cluster = HaCluster(self, 2, test_store=True)
+        tx = self.tx_simple_setup(cluster[0])
+        tx.sync()
+        tx_queues = cluster[0].agent().tx_queues()
+        tx.acknowledge()
+        tx.rollback()
+        for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
+
+    def assert_simple_rollback_outcome(self, b, tx_queues):
+        b.assert_browse_backup("a", ["x","y","z"], msg=b)
+        b.assert_browse_backup("b", ['0', '1', '2'], msg=b)
+        # Check for expected actions on the store
+        expect = """<enqueue a x>
+<enqueue a y>
+<enqueue a z>
+"""
+        self.assertEqual(open_read(b.store_log), expect, msg=b)
+        self.assert_tx_cleanup(b, tx_queues)
+
+    def test_tx_simple_failover(self):
+        cluster = HaCluster(self, 3, test_store=True)
+        tx = self.tx_simple_setup(cluster[0])
+        tx.sync()
+        tx_queues = cluster[0].agent().tx_queues()
+        tx.acknowledge()
+        cluster.bounce(0)       # Should cause roll-back
+        cluster[0].wait_status("ready") # Restarted.
+        cluster[1].wait_status("active") # Promoted.
+        cluster[2].wait_status("ready")  # Failed over.
+        for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
+
+    def test_tx_no_backups(self):
+        """Test the special case of a TX where there are no backups"""
+
+        # Test commit
+        cluster = HaCluster(self, 1, test_store=True)
+        tx = self.tx_simple_setup(cluster[0])
+        tx.acknowledge()
+        tx.commit()
+        tx.sync()
+        tx_queues = cluster[0].agent().tx_queues()
+        self.assert_simple_commit_outcome(cluster[0], tx_queues)
+
+        # Test rollback
+        cluster = HaCluster(self, 1, test_store=True)
+        tx = self.tx_simple_setup(cluster[0])
+        tx.sync()
+        tx_queues = cluster[0].agent().tx_queues()
+        tx.acknowledge()
+        tx.rollback()
+        tx.sync()
+        self.assert_simple_rollback_outcome(cluster[0], tx_queues)
+
+
+    def test_tx_backup_fail(self):
+        cluster = HaCluster(
+            self, 2, test_store=True, s_args=[[],["--test-store-throw=bang"]])
+        c = cluster[0].connect()
+        tx = c.session(transactional=True)
+        s = tx.sender("q;{create:always,node:{durable:true}}")
+        for m in ["foo","bang","bar"]: s.send(Message(m, durable=True))
+        self.assertRaises(ServerError, tx.commit)
+        for b in cluster: b.assert_browse_backup("q", [])
+        self.assertEqual(open_read(cluster[0].store_log), "<begin tx 1>\n<abort tx=1>\n")
+        self.assertEqual(open_read(cluster[1].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q bang tx=1>\n<abort tx=1>\n")
+
+    def test_tx_join_leave(self):
+        """Test cluster members joining/leaving cluster.
+        Also check that tx-queues are cleaned up at end of transaction."""
+
+        cluster = HaCluster(self, 3)
+
+        # Leaving
+        tx = cluster[0].connect().session(transactional=True)
+        s = tx.sender("q;{create:always}")
+        s.send("a", sync=True)
+        self.assertEqual([1,1,1], [len(b.agent().tx_queues()) for b in cluster])
+        cluster[1].kill(final=False)
+        s.send("b")
+        self.assertRaises(ServerError, tx.commit)
+        self.assertEqual([[],[]], [b.agent().tx_queues() for b in [cluster[0],cluster[2]]])
+
+        # Joining
+        tx = cluster[0].connect().session(transactional=True)
+        s = tx.sender("q;{create:always}")
+        s.send("foo")
+        cluster.restart(1)
+        tx.commit()
+        # The new member is not in the tx but receives the results via normal replication.
+        for b in cluster: b.assert_browse_backup("q", ["foo"], msg=b)
+
 if __name__ == "__main__":
     outdir = "ha_tests.tmp"
     shutil.rmtree(outdir, True)



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org