You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/09/20 20:59:50 UTC
svn commit: r1525101 [8/21] - in /qpid/branches/linearstore/qpid: ./ bin/
cpp/ cpp/bindings/ cpp/bindings/qmf/ cpp/bindings/qmf/python/
cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/
cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/...
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h Fri Sep 20 18:59:30 2013
@@ -46,6 +46,7 @@ class ReceiverContext
public:
ReceiverContext(pn_session_t* session, const std::string& name, const qpid::messaging::Address& source);
~ReceiverContext();
+ void reset(pn_session_t* session);
void setCapacity(uint32_t);
uint32_t getCapacity();
uint32_t getAvailable();
@@ -56,7 +57,7 @@ class ReceiverContext
const std::string& getSource() const;
bool isClosed() const;
void configure();
- void verify(pn_terminus_t*);
+ void verify();
Address getAddress() const;
private:
friend class ConnectionContext;
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.cpp Fri Sep 20 18:59:30 2013
@@ -84,7 +84,7 @@ uint32_t ReceiverHandle::getUnsettled()
void ReceiverHandle::close()
{
- session->closeReceiver(getName());
+ connection->detach(session, receiver);
}
const std::string& ReceiverHandle::getName() const
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/Sasl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/Sasl.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/Sasl.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/Sasl.cpp Fri Sep 20 18:59:30 2013
@@ -93,7 +93,7 @@ void Sasl::mechanisms(const std::string&
mechanisms = offered;
}
- if (sasl->start(mechanisms, response)) {
+ if (sasl->start(mechanisms, response, context.getTransportSecuritySettings())) {
init(sasl->getMechanism(), &response, hostname.size() ? &hostname : 0);
} else {
init(sasl->getMechanism(), 0, hostname.size() ? &hostname : 0);
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp Fri Sep 20 18:59:30 2013
@@ -42,7 +42,7 @@ SenderContext::SenderContext(pn_session_
: name(n),
address(a),
helper(address),
- sender(pn_sender(session, n.c_str())), capacity(1000) {}
+ sender(pn_sender(session, n.c_str())), capacity(1000), unreliable(helper.isUnreliable()) {}
SenderContext::~SenderContext()
{
@@ -67,7 +67,7 @@ uint32_t SenderContext::getCapacity()
uint32_t SenderContext::getUnsettled()
{
- return processUnsettled();
+ return processUnsettled(true/*always allow retrieval of unsettled count, even if link has failed*/);
}
const std::string& SenderContext::getName() const
@@ -80,16 +80,26 @@ const std::string& SenderContext::getTar
return address.getName();
}
-SenderContext::Delivery* SenderContext::send(const qpid::messaging::Message& message)
+bool SenderContext::send(const qpid::messaging::Message& message, SenderContext::Delivery** out)
{
- if (processUnsettled() < capacity && pn_link_credit(sender)) {
- deliveries.push_back(Delivery(nextId++));
- Delivery& delivery = deliveries.back();
- delivery.encode(MessageImplAccess::get(message), address);
- delivery.send(sender);
- return &delivery;
+ resend();//if there are any messages needing to be resent at the front of the queue, send them first
+ if (processUnsettled(false) < capacity && pn_link_credit(sender)) {
+ if (unreliable) {
+ Delivery delivery(nextId++);
+ delivery.encode(MessageImplAccess::get(message), address);
+ delivery.send(sender, unreliable);
+ *out = 0;
+ return true;
+ } else {
+ deliveries.push_back(Delivery(nextId++));
+ Delivery& delivery = deliveries.back();
+ delivery.encode(MessageImplAccess::get(message), address);
+ delivery.send(sender, unreliable);
+ *out = &delivery;
+ return true;
+ }
} else {
- return 0;
+ return false;
}
}
@@ -108,11 +118,13 @@ void SenderContext::check()
}
}
-uint32_t SenderContext::processUnsettled()
+uint32_t SenderContext::processUnsettled(bool silent)
{
- check();
+ if (!silent) {
+ check();
+ }
//remove messages from front of deque once peer has confirmed receipt
- while (!deliveries.empty() && deliveries.front().delivered()) {
+ while (!deliveries.empty() && deliveries.front().delivered() && !(pn_link_state(sender) & PN_REMOTE_CLOSED)) {
deliveries.front().settle();
deliveries.pop_front();
}
@@ -122,6 +134,7 @@ namespace {
const std::string X_AMQP("x-amqp-");
const std::string X_AMQP_FIRST_ACQUIRER("x-amqp-first-acquirer");
const std::string X_AMQP_DELIVERY_COUNT("x-amqp-delivery-count");
+const std::string X_AMQP_0_10_APP_ID("x-amqp-0-10.app-id");
class HeaderAdapter : public qpid::amqp::MessageEncoder::Header
{
@@ -341,7 +354,7 @@ class ApplicationPropertiesAdapter : pub
{
for (qpid::types::Variant::Map::const_iterator i = headers.begin(); i != headers.end(); ++i) {
//strip out values with special keys as they are sent in standard fields
- if (!startsWith(i->first, X_AMQP)) {
+ if (!startsWith(i->first, X_AMQP) || i->first == X_AMQP_0_10_APP_ID) {
qpid::amqp::CharSequence key(convert(i->first));
switch (i->second.getType()) {
case qpid::types::VAR_VOID:
@@ -413,7 +426,12 @@ bool changedSubject(const qpid::messagin
}
-SenderContext::Delivery::Delivery(int32_t i) : id(i), token(0) {}
+SenderContext::Delivery::Delivery(int32_t i) : id(i), token(0), presettled(false) {}
+
+void SenderContext::Delivery::reset()
+{
+ token = 0;
+}
void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg, const qpid::messaging::Address& address)
{
@@ -440,7 +458,15 @@ void SenderContext::Delivery::encode(con
PropertiesAdapter properties(msg, address.getSubject());
ApplicationPropertiesAdapter applicationProperties(msg.getHeaders());
//compute size:
- encoded.resize(qpid::amqp::MessageEncoder::getEncodedSize(header, properties, applicationProperties, msg.getBytes()));
+ size_t contentSize = qpid::amqp::MessageEncoder::getEncodedSize(header)
+ + qpid::amqp::MessageEncoder::getEncodedSize(properties)
+ + qpid::amqp::MessageEncoder::getEncodedSize(applicationProperties);
+ if (msg.getContent().isVoid()) {
+ contentSize += qpid::amqp::MessageEncoder::getEncodedSizeForContent(msg.getBytes());
+ } else {
+ contentSize += qpid::amqp::MessageEncoder::getEncodedSizeForValue(msg.getContent()) + 3/*descriptor*/;
+ }
+ encoded.resize(contentSize);
QPID_LOG(debug, "Sending message, buffer is " << encoded.getSize() << " bytes")
qpid::amqp::MessageEncoder encoder(encoded.getData(), encoded.getSize());
//write header:
@@ -451,7 +477,12 @@ void SenderContext::Delivery::encode(con
//write application-properties
encoder.writeApplicationProperties(applicationProperties);
//write body
- if (msg.getBytes().size()) encoder.writeBinary(msg.getBytes(), &qpid::amqp::message::DATA);//structured content not yet directly supported
+ if (!msg.getContent().isVoid()) {
+ //write as AmqpValue
+ encoder.writeValue(msg.getContent(), &qpid::amqp::message::AMQP_VALUE);
+ } else if (msg.getBytes().size()) {
+ encoder.writeBinary(msg.getBytes(), &qpid::amqp::message::DATA);//structured content not yet directly supported
+ }
if (encoder.getPosition() < encoded.getSize()) {
QPID_LOG(debug, "Trimming buffer from " << encoded.getSize() << " to " << encoder.getPosition());
encoded.trim(encoder.getPosition());
@@ -459,19 +490,27 @@ void SenderContext::Delivery::encode(con
//write footer (no annotations yet supported)
}
}
-void SenderContext::Delivery::send(pn_link_t* sender)
+void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable)
{
pn_delivery_tag_t tag;
tag.size = sizeof(id);
tag.bytes = reinterpret_cast<const char*>(&id);
token = pn_delivery(sender, tag);
pn_link_send(sender, encoded.getData(), encoded.getSize());
+ if (unreliable) {
+ pn_delivery_settle(token);
+ presettled = true;
+ }
pn_link_advance(sender);
}
+bool SenderContext::Delivery::sent() const
+{
+ return presettled || token;
+}
bool SenderContext::Delivery::delivered()
{
- if (pn_delivery_remote_state(token) || pn_delivery_settled(token)) {
+ if (presettled || (token && (pn_delivery_remote_state(token) || pn_delivery_settled(token)))) {
//TODO: need a better means for signalling outcomes other than accepted
if (rejected()) {
QPID_LOG(warning, "delivery " << id << " was rejected by peer");
@@ -495,8 +534,19 @@ void SenderContext::Delivery::settle()
{
pn_delivery_settle(token);
}
-void SenderContext::verify(pn_terminus_t* target)
+void SenderContext::verify()
{
+ pn_terminus_t* target = pn_link_remote_target(sender);
+ if (!pn_terminus_get_address(target)) {
+ std::string msg("No such target : ");
+ msg += getTarget();
+ QPID_LOG(debug, msg);
+ throw qpid::messaging::NotFound(msg);
+ } else if (AddressImpl::isTemporary(address)) {
+ address.setName(pn_terminus_get_address(target));
+ QPID_LOG(debug, "Dynamic target name set to " << address.getName());
+ }
+
helper.checkAssertion(target, AddressHelper::FOR_SENDER);
}
void SenderContext::configure()
@@ -505,12 +555,18 @@ void SenderContext::configure()
}
void SenderContext::configure(pn_terminus_t* target)
{
- helper.configure(target, AddressHelper::FOR_SENDER);
+ helper.configure(sender, target, AddressHelper::FOR_SENDER);
+ std::string option;
+ if (helper.getLinkSource(option)) {
+ pn_terminus_set_address(pn_link_source(sender), option.c_str());
+ } else {
+ pn_terminus_set_address(pn_link_source(sender), pn_terminus_get_address(pn_link_target(sender)));
+ }
}
bool SenderContext::settled()
{
- return processUnsettled() == 0;
+ return processUnsettled(false) == 0;
}
Address SenderContext::getAddress() const
@@ -518,4 +574,22 @@ Address SenderContext::getAddress() cons
return address;
}
+
+void SenderContext::reset(pn_session_t* session)
+{
+ sender = pn_sender(session, name.c_str());
+ configure();
+
+ for (Deliveries::iterator i = deliveries.begin(); i != deliveries.end(); ++i) {
+ i->reset();
+ }
+}
+
+void SenderContext::resend()
+{
+ for (Deliveries::iterator i = deliveries.begin(); i != deliveries.end() && pn_link_credit(sender) && !i->sent(); ++i) {
+ i->send(sender, false/*only resend reliable transfers*/);
+ }
+}
+
}}} // namespace qpid::messaging::amqp
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h Fri Sep 20 18:59:30 2013
@@ -52,28 +52,32 @@ class SenderContext
public:
Delivery(int32_t id);
void encode(const qpid::messaging::MessageImpl& message, const qpid::messaging::Address&);
- void send(pn_link_t*);
+ void send(pn_link_t*, bool unreliable);
bool delivered();
bool accepted();
bool rejected();
void settle();
+ void reset();
+ bool sent() const;
private:
int32_t id;
pn_delivery_t* token;
EncodedMessage encoded;
+ bool presettled;
};
SenderContext(pn_session_t* session, const std::string& name, const qpid::messaging::Address& target);
~SenderContext();
+ void reset(pn_session_t* session);
void close();
void setCapacity(uint32_t);
uint32_t getCapacity();
uint32_t getUnsettled();
const std::string& getName() const;
const std::string& getTarget() const;
- Delivery* send(const qpid::messaging::Message& message);
+ bool send(const qpid::messaging::Message& message, Delivery**);
void configure();
- void verify(pn_terminus_t*);
+ void verify();
void check();
bool settled();
Address getAddress() const;
@@ -88,9 +92,11 @@ class SenderContext
int32_t nextId;
Deliveries deliveries;
uint32_t capacity;
+ bool unreliable;
- uint32_t processUnsettled();
+ uint32_t processUnsettled(bool silent);
void configure(pn_terminus_t*);
+ void resend();
};
}}} // namespace qpid::messaging::amqp
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp Fri Sep 20 18:59:30 2013
@@ -44,7 +44,7 @@ void SenderHandle::send(const Message& m
void SenderHandle::close()
{
- session->closeSender(getName());
+ connection->detach(session, sender);
}
void SenderHandle::setCapacity(uint32_t capacity)
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp Fri Sep 20 18:59:30 2013
@@ -79,14 +79,14 @@ boost::shared_ptr<ReceiverContext> Sessi
}
}
-void SessionContext::closeReceiver(const std::string&)
+void SessionContext::removeReceiver(const std::string& n)
{
-
+ receivers.erase(n);
}
-void SessionContext::closeSender(const std::string&)
+void SessionContext::removeSender(const std::string& n)
{
-
+ senders.erase(n);
}
boost::shared_ptr<ReceiverContext> SessionContext::nextReceiver(qpid::messaging::Duration /*timeout*/)
@@ -153,4 +153,25 @@ bool SessionContext::settled()
}
return result;
}
+
+void SessionContext::setName(const std::string& n)
+{
+ name = n;
+}
+std::string SessionContext::getName() const
+{
+ return name;
+}
+
+void SessionContext::reset(pn_connection_t* connection)
+{
+ session = pn_session(connection);
+ unacked.clear();
+ for (SessionContext::SenderMap::iterator i = senders.begin(); i != senders.end(); ++i) {
+ i->second->reset(session);
+ }
+ for (SessionContext::ReceiverMap::iterator i = receivers.begin(); i != receivers.end(); ++i) {
+ i->second->reset(session);
+ }
+}
}}} // namespace qpid::messaging::amqp
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h Fri Sep 20 18:59:30 2013
@@ -50,16 +50,19 @@ class SessionContext
public:
SessionContext(pn_connection_t*);
~SessionContext();
+ void reset(pn_connection_t*);
boost::shared_ptr<SenderContext> createSender(const qpid::messaging::Address& address);
boost::shared_ptr<ReceiverContext> createReceiver(const qpid::messaging::Address& address);
boost::shared_ptr<SenderContext> getSender(const std::string& name) const;
boost::shared_ptr<ReceiverContext> getReceiver(const std::string& name) const;
- void closeReceiver(const std::string&);
- void closeSender(const std::string&);
+ void removeReceiver(const std::string&);
+ void removeSender(const std::string&);
boost::shared_ptr<ReceiverContext> nextReceiver(qpid::messaging::Duration timeout);
uint32_t getReceivable();
uint32_t getUnsettledAcks();
bool settled();
+ void setName(const std::string&);
+ std::string getName() const;
private:
friend class ConnectionContext;
typedef std::map<std::string, boost::shared_ptr<SenderContext> > SenderMap;
@@ -70,6 +73,7 @@ class SessionContext
ReceiverMap receivers;
DeliveryMap unacked;
qpid::framing::SequenceNumber next;
+ std::string name;
qpid::framing::SequenceNumber record(pn_delivery_t*);
void acknowledge();
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp Fri Sep 20 18:59:30 2013
@@ -84,15 +84,25 @@ void SessionHandle::sync(bool /*block*/)
qpid::messaging::Sender SessionHandle::createSender(const qpid::messaging::Address& address)
{
boost::shared_ptr<SenderContext> sender = session->createSender(address);
- connection->attach(session, sender);
- return qpid::messaging::Sender(new SenderHandle(connection, session, sender));
+ try {
+ connection->attach(session, sender);
+ return qpid::messaging::Sender(new SenderHandle(connection, session, sender));
+ } catch (...) {
+ session->removeSender(sender->getName());
+ throw;
+ }
}
qpid::messaging::Receiver SessionHandle::createReceiver(const qpid::messaging::Address& address)
{
boost::shared_ptr<ReceiverContext> receiver = session->createReceiver(address);
- connection->attach(session, receiver);
- return qpid::messaging::Receiver(new ReceiverHandle(connection, session, receiver));
+ try {
+ connection->attach(session, receiver);
+ return qpid::messaging::Receiver(new ReceiverHandle(connection, session, receiver));
+ } catch (...) {
+ session->removeReceiver(receiver->getName());
+ throw;
+ }
}
bool SessionHandle::nextReceiver(Receiver& receiver, Duration timeout)
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SslTransport.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SslTransport.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SslTransport.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SslTransport.cpp Fri Sep 20 18:59:30 2013
@@ -157,4 +157,11 @@ void SslTransport::activateOutput()
if (aio) aio->notifyPendingWrite();
}
+const qpid::sys::SecuritySettings* SslTransport::getSecuritySettings()
+{
+ securitySettings.ssf = socket.getKeyLen();
+ securitySettings.authid = "dummy";//set to non-empty string to enable external authentication
+ return &securitySettings;
+}
+
}}} // namespace qpid::messaging::amqp
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SslTransport.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SslTransport.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SslTransport.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SslTransport.h Fri Sep 20 18:59:30 2013
@@ -23,6 +23,7 @@
*/
#include "qpid/messaging/amqp/Transport.h"
#include "qpid/sys/Mutex.h"
+#include "qpid/sys/SecuritySettings.h"
#include "qpid/sys/ssl/SslSocket.h"
#include <boost/shared_ptr.hpp>
@@ -50,6 +51,7 @@ class SslTransport : public Transport
void abort();
void connectionEstablished() {};
void close();
+ const qpid::sys::SecuritySettings* getSecuritySettings();
private:
qpid::sys::ssl::SslSocket socket;
@@ -59,6 +61,7 @@ class SslTransport : public Transport
boost::shared_ptr<qpid::sys::Poller> poller;
bool closed;
std::string id;
+ qpid::sys::SecuritySettings securitySettings;
void connected(const qpid::sys::Socket&);
void failed(const std::string& msg);
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.cpp Fri Sep 20 18:59:30 2013
@@ -159,4 +159,8 @@ void TcpTransport::activateOutput()
if (aio) aio->notifyPendingWrite();
}
+const qpid::sys::SecuritySettings* TcpTransport::getSecuritySettings()
+{
+ return 0;
+}
}}} // namespace qpid::messaging::amqp
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h Fri Sep 20 18:59:30 2013
@@ -50,6 +50,7 @@ class TcpTransport : public Transport
void abort();
void connectionEstablished() {};
void close();
+ const qpid::sys::SecuritySettings* getSecuritySettings();
private:
boost::scoped_ptr<qpid::sys::Socket> socket;
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/Transport.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/Transport.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/Transport.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/Transport.h Fri Sep 20 18:59:30 2013
@@ -21,12 +21,14 @@
* under the License.
*
*/
+#include "qpid/CommonImportExport.h"
#include "qpid/sys/OutputControl.h"
#include <boost/shared_ptr.hpp>
namespace qpid {
namespace sys {
class Poller;
+struct SecuritySettings;
}
namespace messaging {
namespace amqp {
@@ -38,10 +40,11 @@ class Transport : public qpid::sys::Outp
virtual ~Transport() {}
virtual void connect(const std::string& host, const std::string& port) = 0;
virtual void close() = 0;
+ virtual const qpid::sys::SecuritySettings* getSecuritySettings() = 0;
typedef Transport* Factory(TransportContext&, boost::shared_ptr<qpid::sys::Poller>);
- static Transport* create(const std::string& name, TransportContext&, boost::shared_ptr<qpid::sys::Poller>);
- static void add(const std::string& name, Factory* factory);
+ QPID_COMMON_EXTERN static Transport* create(const std::string& name, TransportContext&, boost::shared_ptr<qpid::sys::Poller>);
+ QPID_COMMON_EXTERN static void add(const std::string& name, Factory* factory);
};
}}} // namespace qpid::messaging::amqp
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/store/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/store/CMakeLists.txt?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/store/CMakeLists.txt (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/store/CMakeLists.txt Fri Sep 20 18:59:30 2013
@@ -32,7 +32,7 @@ set (store_SOURCES
MessageStorePlugin.cpp
)
add_library (store MODULE ${store_SOURCES})
-target_link_libraries (store qpidbroker ${Boost_PROGRAM_OPTIONS_LIBRARY})
+target_link_libraries (store qpidbroker qpidcommon ${Boost_PROGRAM_OPTIONS_LIBRARY})
if (CMAKE_COMPILER_IS_GNUCXX)
set (GCC_CATCH_UNDEFINED "-Wl,--no-undefined")
# gcc on SunOS uses native linker whose "-z defs" is too fussy
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp Fri Sep 20 18:59:30 2013
@@ -34,7 +34,6 @@
#include <qpid/store/StorageProvider.h>
#include <qpid/sys/Mutex.h>
#include <boost/foreach.hpp>
-#include <boost/make_shared.hpp>
// From ms-sql...
#include "BlobAdapter.h"
@@ -356,7 +355,7 @@ MSSqlClfsProvider::finalizeMe()
MSSqlClfsProvider::MSSqlClfsProvider()
: options("MS SQL/CLFS Provider options")
{
- transactions = boost::make_shared<TransactionLog>();
+ transactions.reset(new TransactionLog());
}
MSSqlClfsProvider::~MSSqlClfsProvider()
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/types/Uuid.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/types/Uuid.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/types/Uuid.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/types/Uuid.cpp Fri Sep 20 18:59:30 2013
@@ -52,6 +52,11 @@ Uuid::Uuid(const unsigned char* uuid)
::memcpy(bytes, uuid, Uuid::SIZE);
}
+Uuid::Uuid(const char* uuid)
+{
+ ::memcpy(bytes, uuid, Uuid::SIZE);
+}
+
Uuid& Uuid::operator=(const Uuid& other)
{
if (this == &other) return *this;
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/types/Variant.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/types/Variant.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/types/Variant.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/types/Variant.cpp Fri Sep 20 18:59:30 2013
@@ -36,6 +36,7 @@ const std::string PREFIX("invalid conver
}
InvalidConversion::InvalidConversion(const std::string& msg) : Exception(PREFIX + msg) {}
+InvalidConversion::~InvalidConversion() throw() {}
class VariantImpl
{
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/xml/XmlExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/xml/XmlExchange.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/xml/XmlExchange.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/xml/XmlExchange.cpp Fri Sep 20 18:59:30 2013
@@ -372,6 +372,8 @@ bool XmlExchange::isBound(Queue::shared_
XmlExchange::~XmlExchange()
{
+ if (mgmtExchange != 0)
+ mgmtExchange->debugStats("destroying");
bindingsMap.clear();
}
Modified: qpid/branches/linearstore/qpid/cpp/src/rdma.cmake
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/rdma.cmake?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/rdma.cmake (original)
+++ qpid/branches/linearstore/qpid/cpp/src/rdma.cmake Fri Sep 20 18:59:30 2013
@@ -78,7 +78,7 @@ if (BUILD_RDMA)
COMPONENT ${QPID_COMPONENT_COMMON})
add_library (rdma MODULE qpid/sys/RdmaIOPlugin.cpp)
- target_link_libraries (rdma qpidbroker rdmawrap)
+ target_link_libraries (rdma qpidbroker qpidcommon rdmawrap)
set_target_properties (rdma PROPERTIES
COMPILE_DEFINITIONS _IN_QPID_BROKER
LINK_FLAGS "${CATCH_UNDEFINED}"
@@ -94,7 +94,7 @@ if (BUILD_RDMA)
COMPONENT ${QPID_COMPONENT_BROKER})
add_library (rdmaconnector MODULE qpid/client/RdmaConnector.cpp)
- target_link_libraries (rdmaconnector qpidclient rdmawrap)
+ target_link_libraries (rdmaconnector qpidclient qpidcommon rdmawrap)
set_target_properties (rdmaconnector PROPERTIES
LINK_FLAGS "${CATCH_UNDEFINED}"
PREFIX "")
Propchange: qpid/branches/linearstore/qpid/cpp/src/tests/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/cpp/src/tests:r1501885-1525056
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/BrokerFixture.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/BrokerFixture.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/BrokerFixture.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/BrokerFixture.h Fri Sep 20 18:59:30 2013
@@ -42,11 +42,45 @@ namespace tests {
struct BrokerFixture : private boost::noncopyable {
typedef qpid::broker::Broker Broker;
typedef boost::intrusive_ptr<Broker> BrokerPtr;
+ typedef std::vector<std::string> Args;
BrokerPtr broker;
+ uint16_t port;
qpid::sys::Thread brokerThread;
- BrokerFixture(Broker::Options opts=Broker::Options(), bool enableMgmt=false) {
+ BrokerFixture(const Args& args=Args(), const Broker::Options& opts=Broker::Options(),
+ bool isExternalPort_=false, uint16_t externalPort_=0)
+ {
+ init(args, opts, isExternalPort_, externalPort_);
+ }
+
+ BrokerFixture(const Broker::Options& opts,
+ bool isExternalPort_=false, uint16_t externalPort_=0)
+ {
+ init(Args(), opts, isExternalPort_, externalPort_);
+ }
+
+ void shutdownBroker() {
+ if (broker) {
+ broker->shutdown();
+ brokerThread.join();
+ broker = BrokerPtr();
+ }
+ }
+
+ ~BrokerFixture() { shutdownBroker(); }
+
+ /** Open a connection to the broker. */
+ void open(qpid::client::Connection& c) {
+ c.open("localhost", getPort());
+ }
+
+ uint16_t getPort() { return port; }
+
+ private:
+ void init(const Args& args, Broker::Options opts,
+ bool isExternalPort=false, uint16_t externalPort=0)
+ {
// Keep the tests quiet unless logging env. vars have been set by user.
if (!::getenv("QPID_LOG_ENABLE") && !::getenv("QPID_TRACE")) {
qpid::log::Options logOpts;
@@ -55,38 +89,28 @@ struct BrokerFixture : private boost::n
logOpts.selectors.push_back("error+");
qpid::log::Logger::instance().configure(logOpts);
}
+ // Default options, may be over-ridden when we parse args.
opts.port=0;
opts.listenInterfaces.push_back("127.0.0.1");
- // Management doesn't play well with multiple in-process brokers.
- opts.enableMgmt=enableMgmt;
opts.workerThreads=1;
opts.dataDir="";
opts.auth=false;
+
+ // Argument parsing
+ std::vector<const char*> argv(args.size());
+ std::transform(args.begin(), args.end(), argv.begin(),
+ boost::bind(&std::string::c_str, _1));
+ Plugin::addOptions(opts);
+ opts.parse(argv.size(), &argv[0]);
broker = Broker::create(opts);
// TODO aconway 2007-12-05: At one point BrokerFixture
// tests could hang in Connection ctor if the following
// line is removed. This may not be an issue anymore.
broker->accept();
- broker->getPort(qpid::broker::Broker::TCP_TRANSPORT);
+ if (isExternalPort) port = externalPort;
+ else port = broker->getPort(qpid::broker::Broker::TCP_TRANSPORT);
brokerThread = qpid::sys::Thread(*broker);
};
-
- void shutdownBroker() {
- if (broker) {
- broker->shutdown();
- brokerThread.join();
- broker = BrokerPtr();
- }
- }
-
- ~BrokerFixture() { shutdownBroker(); }
-
- /** Open a connection to the broker. */
- void open(qpid::client::Connection& c) {
- c.open("localhost", broker->getPort(qpid::broker::Broker::TCP_TRANSPORT));
- }
-
- uint16_t getPort() { return broker->getPort(qpid::broker::Broker::TCP_TRANSPORT); }
};
/** Connection that opens in its constructor */
@@ -125,8 +149,8 @@ template <class ConnectionType, class Se
struct SessionFixtureT : BrokerFixture, ClientT<ConnectionType,SessionType> {
SessionFixtureT(Broker::Options opts=Broker::Options()) :
- BrokerFixture(opts),
- ClientT<ConnectionType,SessionType>(broker->getPort(qpid::broker::Broker::TCP_TRANSPORT))
+ BrokerFixture(BrokerFixture::Args(), opts),
+ ClientT<ConnectionType,SessionType>(getPort())
{}
};
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/BrokerMgmtAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/BrokerMgmtAgent.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/BrokerMgmtAgent.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/BrokerMgmtAgent.cpp Fri Sep 20 18:59:30 2013
@@ -63,6 +63,7 @@ class AgentFixture
qpid::broker::Broker::Options opts = qpid::broker::Broker::Options())
{
opts.enableMgmt=true;
+ opts.qmf1Support=!qmfV2;
opts.qmf2Support=qmfV2;
opts.mgmtPubInterval=pubInterval;
mFix = new MessagingFixture(opts, true);
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/CMakeLists.txt?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/CMakeLists.txt (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/CMakeLists.txt Fri Sep 20 18:59:30 2013
@@ -108,7 +108,6 @@ set(all_unit_tests
ClientMessage
ClientMessageTest
ClientSessionTest
- ConsoleTest
DeliveryRecordTest
DtxWorkRecordTest
exception_test
@@ -147,6 +146,7 @@ set(all_unit_tests
TimerTest
TopicExchangeTest
TxBufferTest
+ TransactionObserverTest
Url
Uuid
Variant
@@ -171,7 +171,7 @@ add_executable (unit_test unit_test
${actual_unit_tests} ${platform_test_additions})
target_link_libraries (unit_test
${qpid_test_boost_libs}
- qpidmessaging qpidbroker qmfconsole)
+ qpidmessaging qpidtypes qpidbroker qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
set_target_properties (unit_test PROPERTIES COMPILE_DEFINITIONS _IN_QPID_BROKER)
remember_location(unit_test)
@@ -191,105 +191,104 @@ endif (BUILD_SASL)
# Other test programs
#
add_executable (qpid-perftest qpid-perftest.cpp ${platform_test_additions})
-target_link_libraries (qpid-perftest qpidclient)
+target_link_libraries (qpid-perftest qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
#qpid_perftest_SOURCES=qpid-perftest.cpp test_tools.h TestOptions.h ConnectionOptions.h
remember_location(qpid-perftest)
add_executable (qpid-txtest qpid-txtest.cpp ${platform_test_additions})
-target_link_libraries (qpid-txtest qpidclient)
+target_link_libraries (qpid-txtest qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
#qpid_txtest_SOURCES=qpid-txtest.cpp TestOptions.h ConnectionOptions.h
remember_location(qpid-txtest)
add_executable (qpid-latency-test qpid-latency-test.cpp ${platform_test_additions})
-target_link_libraries (qpid-latency-test qpidclient)
+target_link_libraries (qpid-latency-test qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
#qpid_latencytest_SOURCES=qpid-latency-test.cpp TestOptions.h ConnectionOptions.h
remember_location(qpid-latency-test)
add_executable (echotest echotest.cpp ${platform_test_additions})
-target_link_libraries (echotest qpidclient)
+target_link_libraries (echotest qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
#echotest_SOURCES=echotest.cpp TestOptions.h ConnectionOptions.h
remember_location(echotest)
add_executable (qpid-client-test qpid-client-test.cpp ${platform_test_additions})
-target_link_libraries (qpid-client-test qpidclient)
+target_link_libraries (qpid-client-test qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
#qpid_client_test_SOURCES=qpid-client-test.cpp TestOptions.h ConnectionOptions.h
remember_location(qpid-client-test)
add_executable (qpid-topic-listener qpid-topic-listener.cpp ${platform_test_additions})
-target_link_libraries (qpid-topic-listener qpidclient)
+target_link_libraries (qpid-topic-listener qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
#qpid_topic_listener_SOURCES=qpid-topic-listener.cpp TestOptions.h ConnectionOptions.h
remember_location(qpid-topic-listener)
add_executable (qpid-topic-publisher qpid-topic-publisher.cpp ${platform_test_additions})
-target_link_libraries (qpid-topic-publisher qpidclient)
+target_link_libraries (qpid-topic-publisher qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
#qpid_topic_publisher_SOURCES=qpid-topic-publisher.cpp TestOptions.h ConnectionOptions.h
remember_location(qpid-topic-publisher)
add_executable (publish publish.cpp ${platform_test_additions})
-target_link_libraries (publish qpidclient)
+target_link_libraries (publish qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
#publish_SOURCES=publish.cpp TestOptions.h ConnectionOptions.h
remember_location(publish)
add_executable (consume consume.cpp ${platform_test_additions})
-target_link_libraries (consume qpidclient)
+target_link_libraries (consume qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
#consume_SOURCES=consume.cpp TestOptions.h ConnectionOptions.h
remember_location(consume)
add_executable (header_test header_test.cpp ${platform_test_additions})
-target_link_libraries (header_test qpidclient)
+target_link_libraries (header_test qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
#header_test_SOURCES=header_test.cpp TestOptions.h ConnectionOptions.h
remember_location(header_test)
add_executable (declare_queues declare_queues.cpp ${platform_test_additions})
-target_link_libraries (declare_queues qpidclient)
+target_link_libraries (declare_queues qpidclient qpidcommon)
remember_location(declare_queues)
add_executable (replaying_sender replaying_sender.cpp ${platform_test_additions})
-target_link_libraries (replaying_sender qpidclient)
+target_link_libraries (replaying_sender qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
remember_location(replaying_sender)
add_executable (resuming_receiver resuming_receiver.cpp ${platform_test_additions})
-target_link_libraries (resuming_receiver qpidclient)
+target_link_libraries (resuming_receiver qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
remember_location(resuming_receiver)
add_executable (txshift txshift.cpp ${platform_test_additions})
-target_link_libraries (txshift qpidclient)
+target_link_libraries (txshift qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
#txshift_SOURCES=txshift.cpp TestOptions.h ConnectionOptions.h
remember_location(txshift)
add_executable (txjob txjob.cpp ${platform_test_additions})
-target_link_libraries (txjob qpidclient)
+target_link_libraries (txjob qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
#txjob_SOURCES=txjob.cpp TestOptions.h ConnectionOptions.h
remember_location(txjob)
add_executable (receiver receiver.cpp ${platform_test_additions})
-target_link_libraries (receiver qpidclient)
-#receiver_SOURCES=receiver.cpp TestOptions.h ConnectionOptions.h
+target_link_libraries (receiver qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
remember_location(receiver)
+# This is bizarre - using both messaging and client libraries
add_executable (sender sender.cpp Statistics.cpp ${platform_test_additions})
-target_link_libraries (sender qpidmessaging)
-#sender_SOURCES=sender.cpp TestOptions.h ConnectionOptions.h
+target_link_libraries (sender qpidmessaging qpidtypes qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
remember_location(sender)
add_executable (qpid-receive qpid-receive.cpp Statistics.cpp ${platform_test_additions})
-target_link_libraries (qpid-receive qpidmessaging)
+target_link_libraries (qpid-receive qpidmessaging qpidtypes qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
remember_location(qpid-receive)
add_executable (qpid-send qpid-send.cpp Statistics.cpp ${platform_test_additions})
-target_link_libraries (qpid-send qpidmessaging)
+target_link_libraries (qpid-send qpidmessaging qpidtypes qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
remember_location(qpid-send)
add_executable (qpid-ping qpid-ping.cpp ${platform_test_additions})
-target_link_libraries (qpid-ping qpidclient)
+target_link_libraries (qpid-ping qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
remember_location(qpid-ping)
add_executable (datagen datagen.cpp ${platform_test_additions})
-target_link_libraries (datagen qpidclient)
+target_link_libraries (datagen qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
remember_location(datagen)
add_executable (msg_group_test msg_group_test.cpp ${platform_test_additions})
-target_link_libraries (msg_group_test qpidmessaging)
+target_link_libraries (msg_group_test qpidmessaging qpidtypes qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
remember_location(msg_group_test)
if (BUILD_SASL)
@@ -357,6 +356,7 @@ if (PYTHON_EXECUTABLE)
if (BUILD_AMQP)
add_test (interlink_tests ${test_wrap} ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/interlink_tests.py)
endif (BUILD_AMQP)
+ add_test (swig_python_tests ${test_wrap} ${CMAKE_CURRENT_SOURCE_DIR}/swig_python_tests${test_script_suffix})
endif (PYTHON_EXECUTABLE)
add_test (ipv6_test ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/ipv6_test${test_script_suffix})
add_test (federation_tests ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_federation_tests${test_script_suffix})
@@ -378,24 +378,7 @@ add_test (queue_redirect ${shell} ${CMAK
add_library(test_store MODULE test_store.cpp)
target_link_libraries (test_store qpidbroker qpidcommon)
-set_target_properties (test_store PROPERTIES
- COMPILE_DEFINITIONS _IN_QPID_BROKER
- PREFIX "")
+set_target_properties (test_store PROPERTIES PREFIX "" COMPILE_DEFINITIONS _IN_QPID_BROKER)
add_library (dlclose_noop MODULE dlclose_noop.c)
-#libdlclose_noop_la_LDFLAGS = -module -rpath $(abs_builddir)
-
-#CLEANFILES+=valgrind.out *.log *.vglog* dummy_test $(unit_wrappers)
-#
-## Longer running stability tests, not run by default check: target.
-## Not run under valgrind, too slow
-#LONG_TESTS=fanout_perftest shared_perftest multiq_perftest topic_perftest
-#EXTRA_DIST+=$(LONG_TESTS) run_perftest
-#check-long:
-# $(MAKE) check TESTS="start_broker $(LONG_TESTS) stop_broker" VALGRIND=
-
-#
-# legacystore
-#
-# add_subdirectory(legacystore)
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/ClientSessionTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/ClientSessionTest.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/ClientSessionTest.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/ClientSessionTest.cpp Fri Sep 20 18:59:30 2013
@@ -37,6 +37,7 @@
#include <boost/lexical_cast.hpp>
#include <boost/bind.hpp>
#include <boost/ptr_container/ptr_vector.hpp>
+#include <boost/format.hpp>
#include <vector>
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/ManagementTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/ManagementTest.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/ManagementTest.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/ManagementTest.cpp Fri Sep 20 18:59:30 2013
@@ -21,7 +21,6 @@
#include "qpid/management/ManagementObject.h"
#include "qpid/framing/Buffer.h"
-#include "qpid/console/ObjectId.h"
#include "unit_test.h"
namespace qpid {
@@ -93,32 +92,6 @@ QPID_AUTO_TEST_CASE(testObjectIdCreate)
BOOST_CHECK_EQUAL(oid.getV2Key(), "an-object-name");
}
-QPID_AUTO_TEST_CASE(testConsoleObjectId) {
- qpid::console::ObjectId oid1, oid2;
-
- oid1.setValue(1, 2);
- oid2.setValue(3, 4);
-
- BOOST_CHECK(oid1 < oid2);
- BOOST_CHECK(oid1 <= oid2);
- BOOST_CHECK(oid2 > oid1);
- BOOST_CHECK(oid2 >= oid1);
- BOOST_CHECK(oid1 != oid2);
- BOOST_CHECK(oid1 == oid1);
-
- oid1.setValue(3, 6);
- oid2.setValue(3, 4);
-
- BOOST_CHECK(oid1 > oid2);
- BOOST_CHECK(oid1 >= oid2);
- BOOST_CHECK(oid2 < oid1);
- BOOST_CHECK(oid2 <= oid1);
- BOOST_CHECK(oid1 != oid2);
-
- oid2.setValue(3, 6);
- BOOST_CHECK(oid1 == oid2);
-}
-
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/MessageReplayTracker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/MessageReplayTracker.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/MessageReplayTracker.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/MessageReplayTracker.cpp Fri Sep 20 18:59:30 2013
@@ -23,6 +23,8 @@
#include "qpid/client/MessageReplayTracker.h"
#include "qpid/sys/Time.h"
+#include <boost/format.hpp>
+
namespace qpid {
namespace tests {
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/MessagingFixture.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/MessagingFixture.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/MessagingFixture.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/MessagingFixture.h Fri Sep 20 18:59:30 2013
@@ -35,6 +35,8 @@
#include "qpid/messaging/Message.h"
#include "qpid/types/Variant.h"
+#include <boost/format.hpp>
+
namespace qpid {
namespace tests {
@@ -115,6 +117,7 @@ struct MessagingFixture : public BrokerF
(boost::format("amqp:tcp:localhost:%1%") % (port)).str());
connection.open();
return connection;
+
}
/** Open a connection to the broker. */
@@ -231,9 +234,10 @@ inline void receive(messaging::Receiver&
class MethodInvoker
{
public:
- MethodInvoker(messaging::Session& session) : replyTo("#; {create:always, node:{x-declare:{auto-delete:true}}}"),
- sender(session.createSender("qmf.default.direct/broker")),
- receiver(session.createReceiver(replyTo)) {}
+ MethodInvoker(messaging::Session session) :
+ replyTo("#; {create:always, node:{x-declare:{auto-delete:true}}}"),
+ sender(session.createSender("qmf.default.direct/broker")),
+ receiver(session.createReceiver(replyTo)) {}
void createExchange(const std::string& name, const std::string& type, bool durable=false)
{
@@ -292,11 +296,14 @@ class MethodInvoker
methodRequest("delete", params);
}
- void methodRequest(const std::string& method, const Variant::Map& inParams, Variant::Map* outParams = 0)
+ void methodRequest(
+ const std::string& method,
+ const Variant::Map& inParams, Variant::Map* outParams = 0,
+ const std::string& objectName="org.apache.qpid.broker:broker:amqp-broker")
{
Variant::Map content;
Variant::Map objectId;
- objectId["_object_name"] = "org.apache.qpid.broker:broker:amqp-broker";
+ objectId["_object_name"] = objectName;;
content["_object_id"] = objectId;
content["_method_name"] = method;
content["_arguments"] = inParams;
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/MessagingSessionTests.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/MessagingSessionTests.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/MessagingSessionTests.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/MessagingSessionTests.cpp Fri Sep 20 18:59:30 2013
@@ -1289,6 +1289,95 @@ QPID_AUTO_TEST_CASE(testSimpleRequestRes
BOOST_CHECK_EQUAL(m.getSubject(), original.getSubject());
}
+QPID_AUTO_TEST_CASE(testSelfDestructQueue)
+{
+ MessagingFixture fix;
+ Session other = fix.connection.createSession();
+ Receiver r1 = other.createReceiver("amq.fanout; {link:{reliability:at-least-once, x-declare:{arguments:{qpid.max_count:10,qpid.policy_type:self-destruct}}}}");
+ Receiver r2 = fix.session.createReceiver("amq.fanout");
+ //send request
+ Sender s = fix.session.createSender("amq.fanout");
+ for (uint i = 0; i < 20; ++i) {
+ s.send(Message((boost::format("MSG_%1%") % (i+1)).str()));
+ }
+ try {
+ ScopedSuppressLogging sl;
+ for (uint i = 0; i < 20; ++i) {
+ r1.fetch(Duration::SECOND);
+ }
+ BOOST_FAIL("Expected exception.");
+ } catch (const qpid::messaging::MessagingException&) {
+ }
+
+ for (uint i = 0; i < 20; ++i) {
+ BOOST_CHECK_EQUAL(r2.fetch(Duration::SECOND).getContent(), (boost::format("MSG_%1%") % (i+1)).str());
+ }
+}
+
+QPID_AUTO_TEST_CASE(testReroutingRingQueue)
+{
+ MessagingFixture fix;
+ Receiver r1 = fix.session.createReceiver("my-queue; {create:always, node:{x-declare:{alternate-exchange:amq.fanout, auto-delete:True, arguments:{qpid.max_count:10,qpid.policy_type:ring}}}}");
+ Receiver r2 = fix.session.createReceiver("amq.fanout");
+
+ Sender s = fix.session.createSender("my-queue");
+ for (uint i = 0; i < 20; ++i) {
+ s.send(Message((boost::format("MSG_%1%") % (i+1)).str()));
+ }
+ for (uint i = 10; i < 20; ++i) {
+ BOOST_CHECK_EQUAL(r1.fetch(Duration::SECOND).getContent(), (boost::format("MSG_%1%") % (i+1)).str());
+ }
+ for (uint i = 0; i < 10; ++i) {
+ BOOST_CHECK_EQUAL(r2.fetch(Duration::SECOND).getContent(), (boost::format("MSG_%1%") % (i+1)).str());
+ }
+}
+
+QPID_AUTO_TEST_CASE(testReleaseOnPriorityQueue)
+{
+ MessagingFixture fix;
+ std::string queue("queue; {create:always, node:{x-declare:{auto-delete:True, arguments:{qpid.priorities:10}}}}");
+ std::string text("my message");
+ Sender sender = fix.session.createSender(queue);
+ sender.send(Message(text));
+ Receiver receiver = fix.session.createReceiver(queue);
+ Message msg;
+ for (uint i = 0; i < 10; ++i) {
+ if (receiver.fetch(msg, Duration::SECOND)) {
+ BOOST_CHECK_EQUAL(msg.getContent(), text);
+ fix.session.release(msg);
+ } else {
+ BOOST_FAIL("Released message not redelivered as expected.");
+ }
+ }
+ fix.session.acknowledge();
+}
+
+QPID_AUTO_TEST_CASE(testRollbackWithFullPrefetch)
+{
+ QueueFixture fix;
+ std::string first("first");
+ std::string second("second");
+ Sender sender = fix.session.createSender(fix.queue);
+ for (uint i = 0; i < 10; ++i) {
+ sender.send(Message((boost::format("MSG_%1%") % (i+1)).str()));
+ }
+ Session txsession = fix.connection.createTransactionalSession();
+ Receiver receiver = txsession.createReceiver(fix.queue);
+ receiver.setCapacity(9);
+ Message msg;
+ for (uint i = 0; i < 10; ++i) {
+ if (receiver.fetch(msg, Duration::SECOND)) {
+ BOOST_CHECK_EQUAL(msg.getContent(), std::string("MSG_1"));
+ txsession.rollback();
+ } else {
+ BOOST_FAIL("Released message not redelivered as expected.");
+ break;
+ }
+ }
+ txsession.acknowledge();
+ txsession.commit();
+}
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/QueuePolicyTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/QueuePolicyTest.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/QueuePolicyTest.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/QueuePolicyTest.cpp Fri Sep 20 18:59:30 2013
@@ -28,6 +28,8 @@
#include "qpid/framing/reply_exceptions.h"
#include "BrokerFixture.h"
+#include <boost/format.hpp>
+
using namespace qpid::broker;
using namespace qpid::client;
using namespace qpid::framing;
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/Shlib.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/Shlib.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/Shlib.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/Shlib.cpp Fri Sep 20 18:59:30 2013
@@ -35,13 +35,7 @@ typedef void (*CallMe)(int*);
QPID_AUTO_TEST_CASE(testShlib) {
- // The CMake-based build passes in the shared lib suffix; if it's not
- // there, this is a Linux/UNIX libtool-based build.
-#if defined (QPID_SHLIB_PREFIX) && defined (QPID_SHLIB_SUFFIX)
Shlib sh("./" QPID_SHLIB_PREFIX "shlibtest" QPID_SHLIB_POSTFIX QPID_SHLIB_SUFFIX);
-#else
- Shlib sh(".libs/libshlibtest.so");
-#endif
// Double cast to avoid ISO warning.
CallMe callMe=sh.getSymbol<CallMe>("callMe");
BOOST_REQUIRE(callMe != 0);
@@ -59,11 +53,7 @@ QPID_AUTO_TEST_CASE(testShlib) {
QPID_AUTO_TEST_CASE(testAutoShlib) {
int unloaded = 0;
{
-#if defined (QPID_SHLIB_PREFIX) && defined (QPID_SHLIB_SUFFIX)
AutoShlib sh("./" QPID_SHLIB_PREFIX "shlibtest" QPID_SHLIB_POSTFIX QPID_SHLIB_SUFFIX);
-#else
- AutoShlib sh(".libs/libshlibtest.so");
-#endif
CallMe callMe=sh.getSymbol<CallMe>("callMe");
BOOST_REQUIRE(callMe != 0);
callMe(&unloaded);
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/TxMocks.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/TxMocks.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/TxMocks.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/TxMocks.h Fri Sep 20 18:59:30 2013
@@ -103,6 +103,9 @@ public:
if(!debugName.empty()) std::cout << std::endl << "MockTxOp[" << debugName << "]::rollback()" << std::endl;
actual.push_back(ROLLBACK);
}
+
+ void callObserver(const boost::shared_ptr<TransactionObserver>&) {}
+
MockTxOp& expectPrepare(){
expected.push_back(PREPARE);
return *this;
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/brokertest.py?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/brokertest.py Fri Sep 20 18:59:30 2013
@@ -78,7 +78,7 @@ def error_line(filename, n=1):
except: return ""
return ":\n" + "".join(result)
-def retry(function, timeout=10, delay=.01, max_delay=1):
+def retry(function, timeout=10, delay=.001, max_delay=1):
"""Call function until it returns a true value or timeout expires.
Double the delay for each retry up to max_delay.
Returns what function returns if true, None if timeout expires."""
@@ -141,7 +141,7 @@ class Popen(subprocess.Popen):
finally: f.close()
log.debug("Started process %s: %s" % (self.pname, " ".join(self.cmd)))
- def __str__(self): return "Popen<%s>"%(self.pname)
+ def __repr__(self): return "Popen<%s>"%(self.pname)
def outfile(self, ext): return "%s.%s" % (self.pname, ext)
@@ -242,16 +242,12 @@ class Broker(Popen):
_broker_count = 0
_log_count = 0
- def __str__(self): return "Broker<%s %s :%d>"%(self.log, self.pname, self.port())
-
- def find_log(self):
- self.log = "%03d:%s.log" % (Broker._log_count, self.name)
- Broker._log_count += 1
+ def __repr__(self): return "<Broker:%s:%d>"%(self.log, self.port())
def get_log(self):
return os.path.abspath(self.log)
- def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING, port=0, log_level=None, wait=None, show_cmd=False):
+ def __init__(self, test, args=[], test_store=False, name=None, expect=EXPECT_RUNNING, port=0, log_level=None, wait=None, show_cmd=False):
"""Start a broker daemon. name determines the data-dir and log
file names."""
@@ -273,11 +269,18 @@ class Broker(Popen):
else:
self.name = "broker%d" % Broker._broker_count
Broker._broker_count += 1
- self.find_log()
+
+ self.log = "%03d:%s.log" % (Broker._log_count, self.name)
+ self.store_log = "%03d:%s.store.log" % (Broker._log_count, self.name)
+ Broker._log_count += 1
+
cmd += ["--log-to-file", self.log]
cmd += ["--log-to-stderr=no"]
cmd += ["--log-enable=%s"%(log_level or "info+") ]
+ if test_store: cmd += ["--load-module", BrokerTest.test_store_lib,
+ "--test-store-events", self.store_log]
+
self.datadir = self.name
cmd += ["--data-dir", self.datadir]
if show_cmd: print cmd
@@ -285,7 +288,6 @@ class Broker(Popen):
test.cleanup_stop(self)
self._host = "127.0.0.1"
log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log))
- self._log_ready = False
def startQmf(self, handler=None):
self.qmf_session = qmf.console.Session(handler)
@@ -363,29 +365,21 @@ class Broker(Popen):
def host_port(self): return "%s:%s" % (self.host(), self.port())
- def log_contains(self, str, timeout=1):
- """Wait for str to appear in the log file up to timeout. Return true if found"""
- return retry(lambda: find_in_file(str, self.log), timeout)
-
- def log_ready(self):
- """Return true if the log file exists and contains a broker ready message"""
- if not self._log_ready:
- self._log_ready = find_in_file("notice Broker running", self.log)
- return self._log_ready
-
def ready(self, timeout=30, **kwargs):
"""Wait till broker is ready to serve clients"""
- # First make sure the broker is listening by checking the log.
- if not retry(self.log_ready, timeout=timeout):
- raise Exception(
- "Timed out waiting for broker %s%s"%(self.name, error_line(self.log,5)))
- # Create a connection and a session.
- try:
- c = self.connect(**kwargs)
- try: c.session()
- finally: c.close()
- except Exception,e: raise RethrownException(
- "Broker %s not responding: (%s)%s"%(self.name,e,error_line(self.log, 5)))
+ deadline = time.time()+timeout
+ while True:
+ try:
+ c = self.connect(timeout=timeout, **kwargs)
+ try:
+ c.session()
+ return # All good
+ finally: c.close()
+ except Exception,e: # Retry up to timeout
+ if time.time() > deadline:
+ raise RethrownException(
+ "Broker %s not responding: (%s)%s"%(
+ self.name,e,error_line(self.log, 5)))
def browse(session, queue, timeout=0, transform=lambda m: m.content):
"""Return a list with the contents of each message on queue."""
@@ -407,7 +401,7 @@ def assert_browse(session, queue, expect
if msg: msg = "%s: %r != %r"%(msg, expect_contents, actual_contents)
assert expect_contents == actual_contents, msg
-def assert_browse_retry(session, queue, expect_contents, timeout=1, delay=.01, transform=lambda m:m.content, msg="browse failed"):
+def assert_browse_retry(session, queue, expect_contents, timeout=1, delay=.001, transform=lambda m:m.content, msg="browse failed"):
"""Wait up to timeout for contents of queue to match expect_contents"""
test = lambda: browse(session, queue, 0, transform=transform) == expect_contents
retry(test, timeout, delay)
@@ -421,6 +415,10 @@ class BrokerTest(TestCase):
Provides a well-known working directory for each test.
"""
+ def __init__(self, *args, **kwargs):
+ self.longMessage = True # Enable long messages for assert*(..., msg=xxx)
+ TestCase.__init__(self, *args, **kwargs)
+
# Environment settings.
qpidd_exec = os.path.abspath(checkenv("QPIDD_EXEC"))
ha_lib = os.getenv("HA_LIB")
@@ -480,7 +478,7 @@ class BrokerTest(TestCase):
def assert_browse(self, *args, **kwargs): assert_browse(*args, **kwargs)
def assert_browse_retry(self, *args, **kwargs): assert_browse_retry(*args, **kwargs)
-def join(thread, timeout=10):
+def join(thread, timeout=30):
thread.join(timeout)
if thread.isAlive(): raise Exception("Timed out joining thread %s"%thread)
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/cli_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/cli_tests.py?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/cli_tests.py (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/cli_tests.py Fri Sep 20 18:59:30 2013
@@ -69,47 +69,39 @@ class CliTests(TestBase010):
self.startBrokerAccess()
queue1 = self.makeQueue("test_queue_params1", "--limit-policy none")
queue2 = self.makeQueue("test_queue_params2", "--limit-policy reject")
- queue3 = self.makeQueue("test_queue_params3", "--limit-policy flow-to-disk")
- queue4 = self.makeQueue("test_queue_params4", "--limit-policy ring")
- queue5 = self.makeQueue("test_queue_params5", "--limit-policy ring-strict")
+ queue3 = self.makeQueue("test_queue_params3", "--limit-policy ring")
LIMIT = "qpid.policy_type"
assert LIMIT not in queue1.arguments
self.assertEqual(queue2.arguments[LIMIT], "reject")
- self.assertEqual(queue3.arguments[LIMIT], "flow_to_disk")
- self.assertEqual(queue4.arguments[LIMIT], "ring")
- self.assertEqual(queue5.arguments[LIMIT], "ring_strict")
+ self.assertEqual(queue3.arguments[LIMIT], "ring")
- queue6 = self.makeQueue("test_queue_params6", "--lvq-key lkey")
+ queue4 = self.makeQueue("test_queue_params4", "--lvq-key lkey")
LVQKEY = "qpid.last_value_queue_key"
- assert LVQKEY not in queue5.arguments
- assert LVQKEY in queue6.arguments
- assert queue6.arguments[LVQKEY] == "lkey"
+ assert LVQKEY not in queue3.arguments
+ assert LVQKEY in queue4.arguments
+ assert queue4.arguments[LVQKEY] == "lkey"
def test_queue_params_api(self):
self.startBrokerAccess()
queue1 = self.makeQueue("test_queue_params_api1", "--limit-policy none", True)
queue2 = self.makeQueue("test_queue_params_api2", "--limit-policy reject", True)
- queue3 = self.makeQueue("test_queue_params_api3", "--limit-policy flow-to-disk", True)
- queue4 = self.makeQueue("test_queue_params_api4", "--limit-policy ring", True)
- queue5 = self.makeQueue("test_queue_params_api5", "--limit-policy ring-strict", True)
+ queue3 = self.makeQueue("test_queue_params_api3", "--limit-policy ring", True)
LIMIT = "qpid.policy_type"
assert LIMIT not in queue1.arguments
self.assertEqual(queue2.arguments[LIMIT], "reject")
- self.assertEqual(queue3.arguments[LIMIT], "flow_to_disk")
- self.assertEqual(queue4.arguments[LIMIT], "ring")
- self.assertEqual(queue5.arguments[LIMIT], "ring_strict")
+ self.assertEqual(queue3.arguments[LIMIT], "ring")
- queue6 = self.makeQueue("test_queue_params_api6", "--lvq-key lkey")
+ queue4 = self.makeQueue("test_queue_params_api4", "--lvq-key lkey")
LVQKEY = "qpid.last_value_queue_key"
- assert LVQKEY not in queue5.arguments
- assert LVQKEY in queue6.arguments
- assert queue6.arguments[LVQKEY] == "lkey"
+ assert LVQKEY not in queue3.arguments
+ assert LVQKEY in queue4.arguments
+ assert queue4.arguments[LVQKEY] == "lkey"
def test_qpid_config(self):
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/ha_test.py
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/ha_test.py?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/ha_test.py (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/ha_test.py Fri Sep 20 18:59:30 2013
@@ -48,6 +48,24 @@ class QmfAgent(object):
address, client_properties={"qpid.ha-admin":1}, **kwargs)
self._agent = BrokerAgent(self._connection)
+ def queues(self):
+ return [q.values['name'] for q in self._agent.getAllQueues()]
+
+ def repsub_queue(self, sub):
+ """If QMF subscription sub is a replicating subscription return
+ the name of the replicated queue, else return None"""
+ session_name = self.getSession(sub.sessionRef).name
+ m = re.search("qpid.ha-q:(.*)\.", session_name)
+ return m and m.group(1)
+
+ def repsub_queues(self):
+ """Return queue names for all replicating subscriptions"""
+ return filter(None, [self.repsub_queue(s) for s in self.getAllSubscriptions()])
+
+ def tx_queues(self):
+ """Return names of all tx-queues"""
+ return [q for q in self.queues() if q.startswith("qpid.ha-tx")]
+
def __getattr__(self, name):
a = getattr(self._agent, name)
return a
@@ -101,6 +119,9 @@ class HaBroker(Broker):
"""Start a broker with HA enabled
@param client_cred: (user, password, mechanism) for admin clients started by the HaBroker.
"""
+
+ heartbeat=2
+
def __init__(self, test, ha_port=None, args=[], brokers_url=None, ha_cluster=True,
ha_replicate="all", client_credentials=None, **kwargs):
assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
@@ -112,7 +133,7 @@ class HaBroker(Broker):
"--link-maintenance-interval=0.1",
# Heartbeat and negotiate time are needed so that a broker wont
# stall on an address that doesn't currently have a broker running.
- "--link-heartbeat-interval=1",
+ "--link-heartbeat-interval=%s"%(HaBroker.heartbeat),
"--max-negotiate-time=1000",
"--ha-cluster=%s"%ha_cluster]
if ha_replicate is not None:
@@ -139,15 +160,18 @@ acl allow all all
self.client_credentials = client_credentials
self.ha_port = ha_port
- def __str__(self): return Broker.__str__(self)
+ def __repr__(self): return "<HaBroker:%s:%d>"%(self.log, self.port())
def qpid_ha(self, args):
- cred = self.client_credentials
- url = self.host_port()
- if cred:
- url =cred.add_user(url)
- args = args + ["--sasl-mechanism", cred.mechanism]
- self.qpid_ha_script.main_except(["", "-b", url]+args)
+ try:
+ cred = self.client_credentials
+ url = self.host_port()
+ if cred:
+ url =cred.add_user(url)
+ args = args + ["--sasl-mechanism", cred.mechanism]
+ self.qpid_ha_script.main_except(["", "-b", url]+args)
+ except Exception, e:
+ raise Exception("Error in qpid_ha -b %s %s: %s"%(url, args,e))
def promote(self): self.ready(); self.qpid_ha(["promote"])
def set_public_url(self, url): self.qpid_ha(["set", "--public-url", url])
@@ -221,6 +245,12 @@ acl allow all all
def wait_backup(self, address): self.wait_address(address)
+ def browse(self, queue, timeout=0, transform=lambda m: m.content):
+ c = self.connect_admin()
+ try:
+ return browse(c.session(), queue, timeout, transform)
+ finally: c.close()
+
def assert_browse(self, queue, expected, **kwargs):
"""Verify queue contents by browsing."""
bs = self.connect().session()
@@ -247,8 +277,10 @@ acl allow all all
try: return self.connect()
except ConnectionError: return None
- def ready(self):
- return Broker.ready(self, client_properties={"qpid.ha-admin":1})
+ def ready(self, *args, **kwargs):
+ if not 'client_properties' in kwargs: kwargs['client_properties'] = {}
+ kwargs['client_properties']['qpid.ha-admin'] = True
+ return Broker.ready(self, *args, **kwargs)
def kill(self, final=True):
if final: self.ha_port.stop()
@@ -259,16 +291,19 @@ acl allow all all
class HaCluster(object):
_cluster_count = 0
- def __init__(self, test, n, promote=True, wait=True, args=[], **kwargs):
+ def __init__(self, test, n, promote=True, wait=True, args=[], s_args=[], **kwargs):
"""Start a cluster of n brokers.
@test: The test being run
@n: start n brokers
@promote: promote self[0] to primary
@wait: wait for primary active and backups ready. Ignored if promote=False
+ @args: args for all brokers in the cluster.
+ @s_args: args for specific brokers: s_args[i] for broker i.
"""
self.test = test
- self.args = args
+ self.args = copy(args)
+ self.s_args = copy(s_args)
self.kwargs = kwargs
self._ports = [HaPort(test) for i in xrange(n)]
self._set_url()
@@ -288,10 +323,13 @@ class HaCluster(object):
self.broker_id += 1
return name
- def _ha_broker(self, ha_port, name):
+ def _ha_broker(self, i, name):
+ args = self.args
+ if i < len(self.s_args): args += self.s_args[i]
+ ha_port = self._ports[i]
b = HaBroker(ha_port.test, ha_port, brokers_url=self.url, name=name,
- args=self.args, **self.kwargs)
- b.ready()
+ args=args, **self.kwargs)
+ b.ready(timeout=5)
return b
def start(self):
@@ -302,7 +340,7 @@ class HaCluster(object):
self._ports.append(HaPort(self.test))
self._set_url()
self._update_urls()
- b = self._ha_broker(self._ports[i], self.next_name())
+ b = self._ha_broker(i, self.next_name())
self._brokers.append(b)
return b
@@ -328,7 +366,7 @@ class HaCluster(object):
a separate log file: foo.n.log"""
if self._ports[i].stopped: raise Exception("Restart after final kill: %s"%(self))
b = self._brokers[i]
- self._brokers[i] = self._ha_broker(self._ports[i], b.name)
+ self._brokers[i] = self._ha_broker(i, b.name)
self._brokers[i].ready()
def bounce(self, i, promote_next=True):
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/ha_tests.py?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/ha_tests.py Fri Sep 20 18:59:30 2013
@@ -20,7 +20,7 @@
import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest
import traceback
-from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty
+from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty, ServerError
from qpid.datatypes import uuid4, UUID
from brokertest import *
from ha_test import *
@@ -37,6 +37,7 @@ def grep(filename, regexp):
class HaBrokerTest(BrokerTest):
"""Base class for HA broker tests"""
+
def assert_log_no_errors(self, broker):
log = broker.get_log()
if grep(log, re.compile("] error|] critical")):
@@ -219,7 +220,8 @@ class ReplicationTests(HaBrokerTest):
backup.connect_admin().close()
# Test discovery: should connect to primary after reject by backup
- c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], reconnect=True)
+ c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()],
+ reconnect=True)
s = c.session()
sender = s.sender("q;{create:always}")
backup.wait_backup("q")
@@ -561,9 +563,9 @@ class ReplicationTests(HaBrokerTest):
return
acl=os.path.join(os.getcwd(), "policy.acl")
aclf=file(acl,"w")
- # Verify that replication works with auth=yes and HA user has at least the following
- # privileges:
+ # Minimum set of privileges required for the HA user.
aclf.write("""
+# HA user
acl allow zag@QPID access queue
acl allow zag@QPID create queue
acl allow zag@QPID consume queue
@@ -575,6 +577,9 @@ acl allow zag@QPID publish exchange
acl allow zag@QPID delete exchange
acl allow zag@QPID access method
acl allow zag@QPID create link
+# Normal user
+acl allow zig@QPID all all
+
acl deny all all
""")
aclf.close()
@@ -585,14 +590,16 @@ acl deny all all
"--ha-username=zag", "--ha-password=zag", "--ha-mechanism=PLAIN"
],
client_credentials=Credentials("zag", "zag", "PLAIN"))
- s0 = cluster[0].connect(username="zag", password="zag").session();
- s0.receiver("q;{create:always}")
- s0.receiver("ex;{create:always,node:{type:topic,x-declare:{type:'fanout'},x-bindings:[{exchange:'ex',queue:'q'}]}}")
- cluster[1].wait_backup("q")
- cluster[1].wait_backup("ex")
- s1 = cluster[1].connect_admin().session(); # Uses Credentials above.
- s1.sender("ex").send("foo");
- self.assertEqual(s1.receiver("q").fetch().content, "foo")
+ c = cluster[0].connect(username="zig", password="zig")
+ s0 = c.session();
+ s0.sender("q;{create:always}")
+ s0.sender("ex;{create:always,node:{type:topic,x-declare:{type:'fanout'},x-bindings:[{exchange:'ex',queue:'q'}]}}")
+ s0.sender("ex").send("foo");
+ s1 = c.session(transactional=True)
+ s1.sender("ex").send("foo-tx");
+ cluster[1].assert_browse_backup("q", ["foo"])
+ s1.commit()
+ cluster[1].assert_browse_backup("q", ["foo", "foo-tx"])
def test_alternate_exchange(self):
"""Verify that alternate-exchange on exchanges and queues is propagated
@@ -927,20 +934,22 @@ class LongTests(HaBrokerTest):
if d: return float(d)*60
else: return 3 # Default is to be quick
- # FIXME aconway 2013-06-27: skip this test pending a fix for
- # https://issues.apache.org/jira/browse/QPID-4944
- def skip_test_failover_send_receive(self):
+ def test_failover_send_receive(self):
"""Test failover with continuous send-receive"""
brokers = HaCluster(self, 3)
# Start sender and receiver threads
n = 10
- senders = [NumberedSender(brokers[0], url=brokers.url,
- max_depth=1024, failover_updates=False,
- queue="test%s"%(i)) for i in xrange(n)]
- receivers = [NumberedReceiver(brokers[0], url=brokers.url, sender=senders[i],
- failover_updates=False,
- queue="test%s"%(i)) for i in xrange(n)]
+ senders = [
+ NumberedSender(
+ brokers[0], url=brokers.url,max_depth=50, failover_updates=False,
+ queue="test%s"%(i), args=["--capacity=10"]) for i in xrange(n)]
+
+ receivers = [
+ NumberedReceiver(
+ brokers[0], url=brokers.url, sender=senders[i],failover_updates=False,
+ queue="test%s"%(i), args=["--capacity=10"]) for i in xrange(n)]
+
for r in receivers: r.start()
for s in senders: s.start()
@@ -991,7 +1000,7 @@ class LongTests(HaBrokerTest):
finally:
for s in senders: s.stop()
for r in receivers: r.stop()
- dead = filter(lambda i: not brokers[i].is_running(), xrange(3))
+ dead = filter(lambda b: not b.is_running(), brokers)
if dead: raise Exception("Brokers not running: %s"%dead)
def test_qmf_order(self):
@@ -1200,7 +1209,7 @@ class ConfigurationTests(HaBrokerTest):
cluster[0].set_brokers_url(cluster.url+",xxx:1234")
self.assertRaises(Empty, r.fetch, 0) # Not updated for brokers URL
-class StoreTests(BrokerTest):
+class StoreTests(HaBrokerTest):
"""Test for HA with persistence."""
def check_skip(self):
@@ -1248,7 +1257,7 @@ class StoreTests(BrokerTest):
doing catch-up from the primary."""
if self.check_skip(): return
cluster = HaCluster(self, 2)
- sn = cluster[0].connect(heartbeat=1).session()
+ sn = cluster[0].connect(heartbeat=HaBroker.heartbeat).session()
s1 = sn.sender("q1;{create:always,node:{durable:true}}")
for m in ["foo","bar"]: s1.send(Message(m, durable=True))
s2 = sn.sender("q2;{create:always,node:{durable:true}}")
@@ -1259,7 +1268,7 @@ class StoreTests(BrokerTest):
cluster[1].assert_browse_backup("q2", ["hello"])
# Make changes that the backup doesn't see
cluster.kill(1, promote_next=False, final=False)
- r1 = cluster[0].connect(heartbeat=1).session().receiver("q1")
+ r1 = cluster[0].connect(heartbeat=HaBroker.heartbeat).session().receiver("q1")
for m in ["foo", "bar"]: self.assertEqual(r1.fetch().content, m)
r1.session.acknowledge()
for m in ["x","y","z"]: s1.send(Message(m, durable=True))
@@ -1278,7 +1287,7 @@ class StoreTests(BrokerTest):
cluster[0].assert_browse("q1", ["x","y","z"])
cluster[1].assert_browse_backup("q1", ["x","y","z"])
- sn = cluster[0].connect(heartbeat=1).session() # FIXME aconway 2012-09-25: should fail over!
+ sn = cluster[0].connect(heartbeat=HaBroker.heartbeat).session()
sn.sender("ex/k1").send("boo")
cluster[0].assert_browse_backup("q1", ["x","y","z", "boo"])
cluster[1].assert_browse_backup("q1", ["x","y","z", "boo"])
@@ -1287,6 +1296,185 @@ class StoreTests(BrokerTest):
cluster[0].assert_browse("q2", ["hello", "end"])
cluster[1].assert_browse_backup("q2", ["hello", "end"])
+def open_read(name):
+ try:
+ f = open(name)
+ return f.read()
+ finally: f.close()
+
+class TransactionTests(HaBrokerTest):
+
+ load_store=["--load-module", BrokerTest.test_store_lib]
+
+ def tx_simple_setup(self, broker):
+ """Start a transaction, remove messages from queue a, add messages to queue b"""
+ c = broker.connect()
+ # Send messages to a, no transaction.
+ sa = c.session().sender("a;{create:always,node:{durable:true}}")
+ tx_msgs = ["x","y","z"]
+ for m in tx_msgs: sa.send(Message(content=m, durable=True))
+
+ # Receive messages from a, in transaction.
+ tx = c.session(transactional=True)
+ txr = tx.receiver("a")
+ tx_msgs2 = [txr.fetch(1).content for i in xrange(3)]
+ self.assertEqual(tx_msgs, tx_msgs2)
+
+ # Send messages to b, transactional, mixed with non-transactional.
+ sb = c.session().sender("b;{create:always,node:{durable:true}}")
+ txs = tx.sender("b")
+ msgs = [str(i) for i in xrange(3)]
+ for tx_m,m in zip(tx_msgs2, msgs):
+ txs.send(tx_m);
+ sb.send(m)
+ return tx
+
+ def tx_subscriptions(self, broker):
+ """Return list of queue names for tx subscriptions"""
+ return [q for q in broker.agent().repsub_queues()
+ if q.startswith("qpid.ha-tx")]
+
+ def test_tx_simple_commit(self):
+ cluster = HaCluster(self, 2, test_store=True)
+ tx = self.tx_simple_setup(cluster[0])
+ tx.sync()
+ tx_queues = cluster[0].agent().tx_queues()
+
+ # NOTE: backup does not process transactional dequeues until prepare
+ cluster[1].assert_browse_backup("a", ["x","y","z"])
+ cluster[1].assert_browse_backup("b", ['0', '1', '2'])
+
+ tx.acknowledge()
+ tx.commit()
+ tx.sync()
+
+ for b in cluster: self.assert_simple_commit_outcome(b, tx_queues)
+
+ def assert_tx_cleanup(self, b, tx_queues):
+ """Verify that there are no transaction artifacts
+ (exchanges, queues, subscriptions) on b."""
+
+ self.assertEqual(0, len(b.agent().tx_queues()), msg=b)
+ self.assertEqual(0, len(self.tx_subscriptions(b)), msg=b)
+
+ # TX exchanges don't show up in management so test for existence by name.
+ s = b.connect_admin().session()
+ try:
+ for q in tx_queues:
+ try:
+ s.sender("%s;{node:{type:topic}}"%q)
+ self.fail("Found tx exchange %s on %s "%(q,b))
+ except NotFound: pass
+ finally: s.connection.close()
+
+ def assert_simple_commit_outcome(self, b, tx_queues):
+ b.assert_browse_backup("a", [], msg=b)
+ b.assert_browse_backup("b", ['0', '1', '2', 'x', 'y', 'z'], msg=b)
+ # Check for expected actions on the store
+ expect = """<enqueue a x>
+<enqueue a y>
+<enqueue a z>
+<begin tx 1>
+<dequeue a x tx=1>
+<dequeue a y tx=1>
+<dequeue a z tx=1>
+<commit tx=1>
+"""
+ self.assertEqual(expect, open_read(b.store_log), msg=b)
+ self.assert_tx_cleanup(b, tx_queues)
+
+ def test_tx_simple_rollback(self):
+ cluster = HaCluster(self, 2, test_store=True)
+ tx = self.tx_simple_setup(cluster[0])
+ tx.sync()
+ tx_queues = cluster[0].agent().tx_queues()
+ tx.acknowledge()
+ tx.rollback()
+ for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
+
+ def assert_simple_rollback_outcome(self, b, tx_queues):
+ b.assert_browse_backup("a", ["x","y","z"], msg=b)
+ b.assert_browse_backup("b", ['0', '1', '2'], msg=b)
+ # Check for expected actions on the store
+ expect = """<enqueue a x>
+<enqueue a y>
+<enqueue a z>
+"""
+ self.assertEqual(open_read(b.store_log), expect, msg=b)
+ self.assert_tx_cleanup(b, tx_queues)
+
+ def test_tx_simple_failover(self):
+ cluster = HaCluster(self, 3, test_store=True)
+ tx = self.tx_simple_setup(cluster[0])
+ tx.sync()
+ tx_queues = cluster[0].agent().tx_queues()
+ tx.acknowledge()
+ cluster.bounce(0) # Should cause roll-back
+ cluster[0].wait_status("ready") # Restarted.
+ cluster[1].wait_status("active") # Promoted.
+ cluster[2].wait_status("ready") # Failed over.
+ for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
+
+ def test_tx_no_backups(self):
+ """Test the special case of a TX where there are no backups"""
+
+ # Test commit
+ cluster = HaCluster(self, 1, test_store=True)
+ tx = self.tx_simple_setup(cluster[0])
+ tx.acknowledge()
+ tx.commit()
+ tx.sync()
+ tx_queues = cluster[0].agent().tx_queues()
+ self.assert_simple_commit_outcome(cluster[0], tx_queues)
+
+ # Test rollback
+ cluster = HaCluster(self, 1, test_store=True)
+ tx = self.tx_simple_setup(cluster[0])
+ tx.sync()
+ tx_queues = cluster[0].agent().tx_queues()
+ tx.acknowledge()
+ tx.rollback()
+ tx.sync()
+ self.assert_simple_rollback_outcome(cluster[0], tx_queues)
+
+
+ def test_tx_backup_fail(self):
+ cluster = HaCluster(
+ self, 2, test_store=True, s_args=[[],["--test-store-throw=bang"]])
+ c = cluster[0].connect()
+ tx = c.session(transactional=True)
+ s = tx.sender("q;{create:always,node:{durable:true}}")
+ for m in ["foo","bang","bar"]: s.send(Message(m, durable=True))
+ self.assertRaises(ServerError, tx.commit)
+ for b in cluster: b.assert_browse_backup("q", [])
+ self.assertEqual(open_read(cluster[0].store_log), "<begin tx 1>\n<abort tx=1>\n")
+ self.assertEqual(open_read(cluster[1].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q bang tx=1>\n<abort tx=1>\n")
+
+ def test_tx_join_leave(self):
+ """Test cluster members joining/leaving cluster.
+ Also check that tx-queues are cleaned up at end of transaction."""
+
+ cluster = HaCluster(self, 3)
+
+ # Leaving
+ tx = cluster[0].connect().session(transactional=True)
+ s = tx.sender("q;{create:always}")
+ s.send("a", sync=True)
+ self.assertEqual([1,1,1], [len(b.agent().tx_queues()) for b in cluster])
+ cluster[1].kill(final=False)
+ s.send("b")
+ self.assertRaises(ServerError, tx.commit)
+ self.assertEqual([[],[]], [b.agent().tx_queues() for b in [cluster[0],cluster[2]]])
+
+ # Joining
+ tx = cluster[0].connect().session(transactional=True)
+ s = tx.sender("q;{create:always}")
+ s.send("foo")
+ cluster.restart(1)
+ tx.commit()
+ # The new member is not in the tx but receives the results normal replication.
+ for b in cluster: b.assert_browse_backup("q", ["foo"], msg=b)
+
if __name__ == "__main__":
outdir = "ha_tests.tmp"
shutil.rmtree(outdir, True)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org