You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/09/20 20:59:50 UTC
svn commit: r1525101 [6/21] - in /qpid/branches/linearstore/qpid: ./ bin/
cpp/ cpp/bindings/ cpp/bindings/qmf/ cpp/bindings/qmf/python/
cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/
cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/...
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h Fri Sep 20 18:59:30 2013
@@ -53,6 +53,10 @@ class ConnectionImpl : public qpid::mess
void setOption(const std::string& name, const qpid::types::Variant& value);
bool backoff();
std::string getAuthenticatedUsername();
+ void reconnect(const std::string& url);
+ void reconnect();
+ std::string getUrl() const;
+ bool getAutoDecode() const;
private:
typedef std::map<std::string, qpid::messaging::Session> Sessions;
@@ -63,13 +67,14 @@ class ConnectionImpl : public qpid::mess
bool replaceUrls; // Replace rather than merging with reconnect-urls
std::vector<std::string> urls;
qpid::client::ConnectionSettings settings;
- bool reconnect;
+ bool autoReconnect;
double timeout;
int32_t limit;
double minReconnectInterval;
double maxReconnectInterval;
int32_t retries;
bool reconnectOnLimitExceeded;
+ bool disableAutoDecode;
void setOptions(const qpid::types::Variant::Map& options);
void connect(const qpid::sys::AbsTime& started);
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp Fri Sep 20 18:59:30 2013
@@ -399,7 +399,7 @@ void populate(qpid::messaging::Message&
//need to be able to link the message back to the transfer it was delivered by
//e.g. for rejecting.
MessageImplAccess::get(message).setInternalId(command.getId());
-
+
message.setContent(command.getContent());
populateHeaders(message, command.getHeaders());
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp Fri Sep 20 18:59:30 2013
@@ -21,6 +21,7 @@
#include "qpid/client/amqp0_10/OutgoingMessage.h"
#include "qpid/client/amqp0_10/AddressResolution.h"
#include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/types/encodings.h"
#include "qpid/types/Variant.h"
#include "qpid/messaging/Address.h"
#include "qpid/messaging/Message.h"
@@ -45,13 +46,30 @@ const std::string SUBJECT("qpid.subject"
const std::string X_APP_ID("x-amqp-0-10.app-id");
const std::string X_ROUTING_KEY("x-amqp-0-10.routing-key");
const std::string X_CONTENT_ENCODING("x-amqp-0-10.content-encoding");
+const std::string TEXT_PLAIN("text/plain");
}
void OutgoingMessage::convert(const qpid::messaging::Message& from)
{
//TODO: need to avoid copying as much as possible
- message.setData(from.getContent());
- message.getMessageProperties().setContentType(from.getContentType());
+ if (from.getContentObject().getType() == qpid::types::VAR_MAP) {
+ std::string content;
+ qpid::amqp_0_10::MapCodec::encode(from.getContentObject().asMap(), content);
+ message.getMessageProperties().setContentType(qpid::amqp_0_10::MapCodec::contentType);
+ message.setData(content);
+ } else if (from.getContentObject().getType() == qpid::types::VAR_LIST) {
+ std::string content;
+ qpid::amqp_0_10::ListCodec::encode(from.getContentObject().asList(), content);
+ message.getMessageProperties().setContentType(qpid::amqp_0_10::ListCodec::contentType);
+ message.setData(content);
+ } else if (from.getContentObject().getType() == qpid::types::VAR_STRING &&
+ (from.getContentObject().getEncoding() == qpid::types::encodings::UTF8 || from.getContentObject().getEncoding() == qpid::types::encodings::ASCII)) {
+ message.getMessageProperties().setContentType(TEXT_PLAIN);
+ message.setData(from.getContent());
+ } else {
+ message.setData(from.getContent());
+ message.getMessageProperties().setContentType(from.getContentType());
+ }
if ( !from.getCorrelationId().empty() )
message.getMessageProperties().setCorrelationId(from.getCorrelationId());
message.getMessageProperties().setUserId(from.getUserId());
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp Fri Sep 20 18:59:30 2013
@@ -25,6 +25,8 @@
#include "qpid/messaging/exceptions.h"
#include "qpid/messaging/Receiver.h"
#include "qpid/messaging/Session.h"
+#include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/types/encodings.h"
namespace qpid {
namespace client {
@@ -83,6 +85,7 @@ void ReceiverImpl::start()
if (state == STOPPED) {
state = STARTED;
startFlow(l);
+ session.sendCompletion();
}
}
@@ -148,18 +151,42 @@ qpid::messaging::Address ReceiverImpl::g
}
ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name,
- const qpid::messaging::Address& a) :
+ const qpid::messaging::Address& a, bool autoDecode_) :
- parent(&p), destination(name), address(a), byteCredit(0xFFFFFFFF),
+ parent(&p), destination(name), address(a), byteCredit(0xFFFFFFFF), autoDecode(autoDecode_),
state(UNRESOLVED), capacity(0), window(0) {}
+namespace {
+const std::string TEXT_PLAIN("text/plain");
+}
+
bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout)
{
{
sys::Mutex::ScopedLock l(lock);
if (state == CANCELLED) return false;
}
- return parent->get(*this, message, timeout);
+ if (parent->get(*this, message, timeout)) {
+ if (autoDecode) {
+ if (message.getContentType() == qpid::amqp_0_10::MapCodec::contentType) {
+ message.getContentObject() = qpid::types::Variant::Map();
+ decode(message, message.getContentObject().asMap());
+ } else if (message.getContentType() == qpid::amqp_0_10::ListCodec::contentType) {
+ message.getContentObject() = qpid::types::Variant::List();
+ decode(message, message.getContentObject().asList());
+ } else if (!message.getContentBytes().empty()) {
+ message.getContentObject() = message.getContentBytes();
+ if (message.getContentType() == TEXT_PLAIN) {
+ message.getContentObject().setEncoding(qpid::types::encodings::UTF8);
+ } else {
+ message.getContentObject().setEncoding(qpid::types::encodings::BINARY);
+ }
+ }
+ }
+ return true;
+ } else {
+ return false;
+ }
}
bool ReceiverImpl::fetchImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout)
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h Fri Sep 20 18:59:30 2013
@@ -48,7 +48,7 @@ class ReceiverImpl : public qpid::messag
enum State {UNRESOLVED, STOPPED, STARTED, CANCELLED};
ReceiverImpl(SessionImpl& parent, const std::string& name,
- const qpid::messaging::Address& address);
+ const qpid::messaging::Address& address, bool autoDecode);
void init(qpid::client::AsyncSession session, AddressResolution& resolver);
bool get(qpid::messaging::Message& message, qpid::messaging::Duration timeout);
@@ -74,6 +74,7 @@ class ReceiverImpl : public qpid::messag
const std::string destination;
const qpid::messaging::Address address;
const uint32_t byteCredit;
+ const bool autoDecode;
State state;
std::auto_ptr<MessageSource> source;
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp Fri Sep 20 18:59:30 2013
@@ -207,7 +207,7 @@ Receiver SessionImpl::createReceiverImpl
ScopedLock l(lock);
std::string name = address.getName();
getFreeKey(name, receivers);
- Receiver receiver(new ReceiverImpl(*this, name, address));
+ Receiver receiver(new ReceiverImpl(*this, name, address, connection->getAutoDecode()));
getImplPtr<Receiver, ReceiverImpl>(receiver)->init(session, resolver);
receivers[name] = receiver;
return receiver;
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/client/windows/SaslFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/client/windows/SaslFactory.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/client/windows/SaslFactory.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/client/windows/SaslFactory.cpp Fri Sep 20 18:59:30 2013
@@ -26,6 +26,7 @@
#include "qpid/sys/SecurityLayer.h"
#include "qpid/sys/SecuritySettings.h"
#include "qpid/log/Statement.h"
+#include "qpid/NullSaslServer.h"
#include "boost/tokenizer.hpp"
@@ -107,6 +108,12 @@ std::auto_ptr<Sasl> SaslFactory::create(
return sasl;
}
+std::auto_ptr<SaslServer> SaslFactory::createServer( const std::string& realm, bool /*encryptionRequired*/, const qpid::sys::SecuritySettings& )
+{
+ std::auto_ptr<SaslServer> server(new NullSaslServer(realm));
+ return server;
+}
+
namespace {
const std::string ANONYMOUS = "ANONYMOUS";
const std::string PLAIN = "PLAIN";
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/framing/Buffer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/framing/Buffer.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/framing/Buffer.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/framing/Buffer.cpp Fri Sep 20 18:59:30 2013
@@ -182,27 +182,27 @@ double Buffer::getDouble(){
}
template <>
-uint64_t Buffer::getUInt<1>() {
+QPID_COMMON_EXTERN uint64_t Buffer::getUInt<1>() {
return getOctet();
}
template <>
-uint64_t Buffer::getUInt<2>() {
+QPID_COMMON_EXTERN uint64_t Buffer::getUInt<2>() {
return getShort();
}
template <>
-uint64_t Buffer::getUInt<4>() {
+QPID_COMMON_EXTERN uint64_t Buffer::getUInt<4>() {
return getLong();
}
template <>
-uint64_t Buffer::getUInt<8>() {
+QPID_COMMON_EXTERN uint64_t Buffer::getUInt<8>() {
return getLongLong();
}
template <>
-void Buffer::putUInt<1>(uint64_t i) {
+QPID_COMMON_EXTERN void Buffer::putUInt<1>(uint64_t i) {
if (std::numeric_limits<uint8_t>::min() <= i && i <= std::numeric_limits<uint8_t>::max()) {
putOctet(i);
return;
@@ -211,7 +211,7 @@ void Buffer::putUInt<1>(uint64_t i) {
}
template <>
-void Buffer::putUInt<2>(uint64_t i) {
+QPID_COMMON_EXTERN void Buffer::putUInt<2>(uint64_t i) {
if (std::numeric_limits<uint16_t>::min() <= i && i <= std::numeric_limits<uint16_t>::max()) {
putShort(i);
return;
@@ -220,7 +220,7 @@ void Buffer::putUInt<2>(uint64_t i) {
}
template <>
-void Buffer::putUInt<4>(uint64_t i) {
+QPID_COMMON_EXTERN void Buffer::putUInt<4>(uint64_t i) {
if (std::numeric_limits<uint32_t>::min() <= i && i <= std::numeric_limits<uint32_t>::max()) {
putLong(i);
return;
@@ -229,7 +229,7 @@ void Buffer::putUInt<4>(uint64_t i) {
}
template <>
-void Buffer::putUInt<8>(uint64_t i) {
+QPID_COMMON_EXTERN void Buffer::putUInt<8>(uint64_t i) {
putLongLong(i);
}
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/framing/FrameSet.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/framing/FrameSet.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/framing/FrameSet.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/framing/FrameSet.h Fri Sep 20 18:59:30 2013
@@ -9,9 +9,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -44,6 +44,8 @@ class FrameSet
public:
typedef boost::shared_ptr<FrameSet> shared_ptr;
+ typedef Frames::iterator iterator;
+ typedef Frames::const_iterator const_iterator;
QPID_COMMON_EXTERN FrameSet(const SequenceNumber& id);
QPID_COMMON_EXTERN FrameSet(const FrameSet&);
@@ -62,7 +64,7 @@ public:
QPID_COMMON_EXTERN AMQMethodBody* getMethod();
QPID_COMMON_EXTERN const AMQHeaderBody* getHeaders() const;
QPID_COMMON_EXTERN AMQHeaderBody* getHeaders();
-
+
template <class T> bool isA() const {
const AMQMethodBody* method = getMethod();
return method && method->isA<T>();
@@ -71,12 +73,12 @@ public:
template <class T> const T* as() const {
const AMQMethodBody* method = getMethod();
return (method && method->isA<T>()) ? dynamic_cast<const T*>(method) : 0;
- }
+ }
template <class T> T* as() {
AMQMethodBody* method = getMethod();
return (method && method->isA<T>()) ? dynamic_cast<T*>(method) : 0;
- }
+ }
template <class T> const T* getHeaderProperties() const {
const AMQHeaderBody* header = getHeaders();
@@ -85,7 +87,7 @@ public:
Frames::const_iterator begin() const { return parts.begin(); }
Frames::const_iterator end() const { return parts.end(); }
-
+
const SequenceNumber& getId() const { return id; }
template <class P> void remove(P predicate) {
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h Fri Sep 20 18:59:30 2013
@@ -25,7 +25,6 @@
#include "qpid/log/Statement.h"
#include "qpid/broker/Exchange.h"
#include "qpid/broker/ExchangeRegistry.h"
-#include "boost/function.hpp"
#include <map>
namespace qpid {
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Backup.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Backup.cpp Fri Sep 20 18:59:30 2013
@@ -52,9 +52,7 @@ using sys::Mutex;
Backup::Backup(HaBroker& hb, const Settings& s) :
logPrefix("Backup: "), membership(hb.getMembership()), stopped(false),
haBroker(hb), broker(hb.getBroker()), settings(s),
- statusCheck(
- new StatusCheck(
- logPrefix, broker.getOptions().linkHeartbeatInterval, hb.getBrokerInfo()))
+ statusCheck(new StatusCheck(hb))
{}
void Backup::setBrokerUrl(const Url& brokers) {
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Backup.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Backup.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Backup.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Backup.h Fri Sep 20 18:59:30 2013
@@ -59,6 +59,8 @@ class Backup : public Role
Role* promote();
+ boost::shared_ptr<BrokerReplicator> getBrokerReplicator() { return replicator; }
+
private:
void stop(sys::Mutex::ScopedLock&);
Role* recover(sys::Mutex::ScopedLock&);
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/BrokerInfo.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/BrokerInfo.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/BrokerInfo.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/BrokerInfo.h Fri Sep 20 18:59:30 2013
@@ -23,6 +23,7 @@
*/
#include "types.h"
+#include "hash.h"
#include "qpid/Url.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/types/Uuid.h"
@@ -42,7 +43,7 @@ class BrokerInfo
{
public:
typedef std::set<BrokerInfo> Set;
- typedef qpid::sys::unordered_map<types::Uuid, BrokerInfo, types::Uuid::Hasher> Map;
+ typedef qpid::sys::unordered_map<types::Uuid, BrokerInfo, Hasher<types::Uuid> > Map;
BrokerInfo();
BrokerInfo(const types::Uuid& id, BrokerStatus, const Address& = Address());
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Fri Sep 20 18:59:30 2013
@@ -21,6 +21,7 @@
#include "BrokerReplicator.h"
#include "HaBroker.h"
#include "QueueReplicator.h"
+#include "TxReplicator.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/amqp_0_10/Connection.h"
#include "qpid/broker/ConnectionObserver.h"
@@ -129,7 +130,6 @@ const string COLON(":");
void sendQuery(const string& packageName, const string& className, const string& queueName,
SessionHandler& sessionHandler)
{
- framing::AMQP_ServerProxy peer(sessionHandler.out);
Variant::Map request;
request[WHAT] = OBJECT;
Variant::Map schema;
@@ -229,8 +229,8 @@ class BrokerReplicator::UpdateTracker {
typedef boost::function<void (const std::string&)> CleanFn;
UpdateTracker(const std::string& type_, // "queue" or "exchange"
- CleanFn f, const ReplicationTest& rt)
- : type(type_), cleanFn(f), repTest(rt) {}
+ CleanFn f)
+ : type(type_), cleanFn(f) {}
/** Destructor cleans up remaining initial queues. */
~UpdateTracker() {
@@ -245,16 +245,10 @@ class BrokerReplicator::UpdateTracker {
}
/** Add an exchange name */
- void addExchange(Exchange::shared_ptr ex) {
- if (repTest.getLevel(*ex))
- initial.insert(ex->getName());
- }
+ void addExchange(Exchange::shared_ptr ex) { initial.insert(ex->getName()); }
/** Add a queue name. */
- void addQueue(Queue::shared_ptr q) {
- if (repTest.getLevel(*q))
- initial.insert(q->getName());
- }
+ void addQueue(Queue::shared_ptr q) { initial.insert(q->getName()); }
/** Received an event for name */
void event(const std::string& name) {
@@ -275,13 +269,13 @@ class BrokerReplicator::UpdateTracker {
void clean(const std::string& name) {
QPID_LOG(info, "Backup: Deleted " << type << " " << name <<
": no longer exists on primary");
- cleanFn(name);
+ try { cleanFn(name); }
+ catch (const framing::NotFoundException&) {}
}
std::string type;
Names initial, events;
CleanFn cleanFn;
- ReplicationTest repTest;
};
namespace {
@@ -349,7 +343,8 @@ BrokerReplicator::~BrokerReplicator() {
namespace {
void collectQueueReplicators(
- const boost::shared_ptr<Exchange> ex, set<boost::shared_ptr<QueueReplicator> >& collect)
+ const boost::shared_ptr<Exchange>& ex,
+ set<boost::shared_ptr<QueueReplicator> >& collect)
{
boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex));
if (qr) collect.insert(qr);
@@ -390,16 +385,13 @@ void BrokerReplicator::connected(Bridge&
exchangeTracker.reset(
new UpdateTracker("exchange",
- boost::bind(&BrokerReplicator::deleteExchange, this, _1),
- replicationTest));
- exchanges.eachExchange(
- boost::bind(&UpdateTracker::addExchange, exchangeTracker.get(), _1));
+ boost::bind(&BrokerReplicator::deleteExchange, this, _1)));
+ exchanges.eachExchange(boost::bind(&BrokerReplicator::existingExchange, this, _1));
queueTracker.reset(
new UpdateTracker("queue",
- boost::bind(&BrokerReplicator::deleteQueue, this, _1, true),
- replicationTest));
- queues.eachQueue(boost::bind(&UpdateTracker::addQueue, queueTracker.get(), _1));
+ boost::bind(&BrokerReplicator::deleteQueue, this, _1, true)));
+ queues.eachQueue(boost::bind(&BrokerReplicator::existingQueue, this, _1));
framing::AMQP_ServerProxy peer(sessionHandler.out);
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
@@ -428,6 +420,21 @@ void BrokerReplicator::connected(Bridge&
sendQuery(ORG_APACHE_QPID_BROKER, BINDING, queueName, sessionHandler);
}
+// Called for each queue in existence when the backup connects to a primary.
+void BrokerReplicator::existingQueue(const boost::shared_ptr<Queue>& q) {
+ if (replicationTest.getLevel(*q)) {
+ QPID_LOG(debug, "Existing queue: " << q->getName());
+ queueTracker->addQueue(q);
+ }
+}
+
+void BrokerReplicator::existingExchange(const boost::shared_ptr<Exchange>& ex) {
+ if (replicationTest.getLevel(*ex)) {
+ QPID_LOG(debug, "Existing exchange: " << ex->getName());
+ exchangeTracker->addExchange(ex);
+ }
+}
+
void BrokerReplicator::route(Deliverable& msg) {
// We transition from JOINING->CATCHUP on the first message received from the primary.
// Until now we couldn't be sure if we had a good connection to the primary.
@@ -554,11 +561,7 @@ void BrokerReplicator::doEventExchangeDe
void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) {
string name = values[EXNAME].asString();
boost::shared_ptr<Exchange> exchange = exchanges.find(name);
- if (!exchange) {
- QPID_LOG(warning, logPrefix << "Exchange delete event, not found: " << name);
- } else if (!replicationTest.getLevel(*exchange)) {
- QPID_LOG(warning, logPrefix << "Exchange delete event, not replicated: " << name);
- } else {
+ if (exchange && replicationTest.getLevel(*exchange)) {
QPID_LOG(debug, logPrefix << "Exchange delete event:" << name);
if (exchangeTracker.get()) exchangeTracker->event(name);
deleteExchange(name);
@@ -651,8 +654,10 @@ void BrokerReplicator::doResponseQueue(V
if (!queueTracker.get())
throw Exception(QPID_MSG("Unexpected queue response: " << values));
if (!queueTracker->response(name)) return; // Response is out-of-date
+
QPID_LOG(debug, logPrefix << "Queue response: " << name);
boost::shared_ptr<Queue> queue = queues.find(name);
+
if (queue) { // Already exists
bool uuidOk = (getHaUuid(queue->getSettings().original) == getHaUuid(argsMap));
if (!uuidOk) QPID_LOG(debug, logPrefix << "UUID mismatch for queue: " << name);
@@ -660,6 +665,7 @@ void BrokerReplicator::doResponseQueue(V
QPID_LOG(debug, logPrefix << "Queue response replacing queue: " << name);
deleteQueue(name);
}
+
framing::FieldTable args;
qpid::amqp_0_10::translate(argsMap, args);
boost::shared_ptr<QueueReplicator> qr = replicateQueue(
@@ -770,8 +776,13 @@ boost::shared_ptr<QueueReplicator> Broke
const boost::shared_ptr<Queue>& queue)
{
if (replicationTest.getLevel(*queue) == ALL) {
- boost::shared_ptr<QueueReplicator> qr(
- new QueueReplicator(haBroker, queue, link));
+ boost::shared_ptr<QueueReplicator> qr;
+ if (TxReplicator::isTxQueue(queue->getName())){
+ qr.reset(new TxReplicator(haBroker, queue, link));
+ }
+ else {
+ qr.reset(new QueueReplicator(haBroker, queue, link));
+ }
qr->activate();
return qr;
}
@@ -785,7 +796,7 @@ void BrokerReplicator::deleteQueue(const
// messages. Any reroutes will be done at the primary and
// replicated as normal.
if (purge) queue->purge(0, boost::shared_ptr<Exchange>());
- broker.deleteQueue(name, userId, remoteHost);
+ haBroker.getBroker().deleteQueue(name, userId, remoteHost);
QPID_LOG(debug, logPrefix << "Queue deleted: " << name);
}
}
@@ -864,28 +875,35 @@ bool BrokerReplicator::isBound(boost::sh
string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; }
-void BrokerReplicator::autoDeleteCheck(boost::shared_ptr<Exchange> ex) {
+void BrokerReplicator::disconnectedExchange(boost::shared_ptr<Exchange> ex) {
boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex));
- if (!qr) return;
- assert(qr);
- if (qr->getQueue()->isAutoDelete() && qr->isSubscribed()) {
- if (qr->getQueue()->getSettings().autoDeleteDelay) {
- // Start the auto-delete timer
- qr->getQueue()->releaseFromUse();
- qr->getQueue()->scheduleAutoDelete();
+ if (qr) {
+ qr->disconnect();
+ if (TxReplicator::isTxQueue(qr->getQueue()->getName())) {
+ // Transactions are aborted on failover so clean up tx-queues
+ deleteQueue(qr->getQueue()->getName());
}
- else {
- // Delete immediately. Don't purge, the primary is gone so we need
- // to reroute the deleted messages.
- deleteQueue(qr->getQueue()->getName(), false);
+ else if (qr->getQueue()->isAutoDelete() && qr->isSubscribed()) {
+ if (qr->getQueue()->getSettings().autoDeleteDelay) {
+ // Start the auto-delete timer
+ qr->getQueue()->releaseFromUse();
+ qr->getQueue()->scheduleAutoDelete();
+ }
+ else {
+ // Delete immediately. Don't purge, the primary is gone so we need
+ // to reroute the deleted messages.
+ deleteQueue(qr->getQueue()->getName(), false);
+ }
}
}
}
+typedef vector<boost::shared_ptr<Exchange> > ExchangeVector;
+
// Callback function for accumulating exchange candidates
namespace {
- void exchangeAccumulatorCallback(vector<boost::shared_ptr<Exchange> >& c, const Exchange::shared_ptr& i) {
- c.push_back(i);
+ void exchangeAccumulatorCallback(ExchangeVector& ev, const Exchange::shared_ptr& i) {
+ ev.push_back(i);
}
}
@@ -893,13 +911,12 @@ namespace {
void BrokerReplicator::disconnected() {
QPID_LOG(info, logPrefix << "Disconnected from primary " << primary);
connection = 0;
- // Clean up auto-delete queues
- vector<boost::shared_ptr<Exchange> > collect;
- // Make a copy so we can work outside the ExchangeRegistry lock
- exchanges.eachExchange(
- boost::bind(&exchangeAccumulatorCallback, boost::ref(collect), _1));
- for_each(collect.begin(), collect.end(),
- boost::bind(&BrokerReplicator::autoDeleteCheck, this, _1));
+
+ // Make copy of exchanges so we can work outside the registry lock.
+ ExchangeVector exs;
+ exchanges.eachExchange(boost::bind(&exchangeAccumulatorCallback, boost::ref(exs), _1));
+ for_each(exs.begin(), exs.end(),
+ boost::bind(&BrokerReplicator::disconnectedExchange, this, _1));
}
void BrokerReplicator::setMembership(const Variant::List& brokers) {
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/BrokerReplicator.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/BrokerReplicator.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/BrokerReplicator.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/BrokerReplicator.h Fri Sep 20 18:59:30 2013
@@ -71,6 +71,8 @@ class BrokerReplicator : public broker::
public boost::enable_shared_from_this<BrokerReplicator>
{
public:
+ typedef boost::shared_ptr<QueueReplicator> QueueReplicatorPtr;
+
BrokerReplicator(HaBroker&, const boost::shared_ptr<broker::Link>&);
~BrokerReplicator();
@@ -84,8 +86,9 @@ class BrokerReplicator : public broker::
bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
void shutdown();
+ QueueReplicatorPtr findQueueReplicator(const std::string& qname);
+
private:
- typedef boost::shared_ptr<QueueReplicator> QueueReplicatorPtr;
typedef std::pair<boost::shared_ptr<broker::Queue>, bool> CreateQueueResult;
typedef std::pair<boost::shared_ptr<broker::Exchange>, bool> CreateExchangeResult;
@@ -99,6 +102,8 @@ class BrokerReplicator : public broker::
class ConnectionObserver;
void connected(broker::Bridge&, broker::SessionHandler&);
+ void existingQueue(const boost::shared_ptr<broker::Queue>&);
+ void existingExchange(const boost::shared_ptr<broker::Exchange>&);
void doEventQueueDeclare(types::Variant::Map& values);
void doEventQueueDelete(types::Variant::Map& values);
@@ -114,7 +119,6 @@ class BrokerReplicator : public broker::
void doResponseBind(types::Variant::Map& values);
void doResponseHaBroker(types::Variant::Map& values);
- QueueReplicatorPtr findQueueReplicator(const std::string& qname);
QueueReplicatorPtr startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
QueueReplicatorPtr replicateQueue(
@@ -135,8 +139,7 @@ class BrokerReplicator : public broker::
void deleteQueue(const std::string& name, bool purge=true);
void deleteExchange(const std::string& name);
- void autoDeleteCheck(boost::shared_ptr<broker::Exchange>);
-
+ void disconnectedExchange(boost::shared_ptr<broker::Exchange>);
void disconnected();
void setMembership(const types::Variant::List&); // Set membership from list.
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/FailoverExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/FailoverExchange.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/FailoverExchange.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/FailoverExchange.cpp Fri Sep 20 18:59:30 2013
@@ -19,7 +19,7 @@
*
*/
#include "FailoverExchange.h"
-#include "makeMessage.h"
+#include "Event.h"
#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/broker/Message.h"
#include "qpid/broker/DeliverableMessage.h"
@@ -117,7 +117,7 @@ void FailoverExchange::sendUpdate(const
if (urls.empty()) return;
framing::Array array = vectorToUrlArray(urls);
const ProtocolVersion v;
- broker::Message message(makeMessage(Buffer(), typeName));
+ broker::Message message(makeMessage(std::string(), typeName, typeName));
MessageTransfer& transfer = MessageTransfer::get(message);
MessageProperties* props =
transfer.getFrames().getHeaders()->get<framing::MessageProperties>(true);
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/HaBroker.cpp Fri Sep 20 18:59:30 2013
@@ -64,6 +64,7 @@ using boost::dynamic_pointer_cast;
HaBroker::HaBroker(broker::Broker& b, const Settings& s)
: systemId(b.getSystem()->getSystemId().data()),
settings(s),
+ userId(s.username+"@"+b.getOptions().realm),
broker(b),
observer(new ConnectionObserver(*this, systemId)),
role(new StandAlone),
@@ -75,14 +76,14 @@ HaBroker::HaBroker(broker::Broker& b, co
// otherwise there's a window for a client to connect before we get to
// initialize()
if (settings.cluster) {
- QPID_LOG(debug, "Broker startup, rejecting client connections.");
+ QPID_LOG(debug, "Backup starting, rejecting client connections.");
shared_ptr<broker::ConnectionObserver> excluder(new BackupConnectionExcluder);
observer->setObserver(excluder, "Backup: ");
broker.getConnectionObservers().add(observer);
broker.getExchanges().registerExchange(failoverExchange);
}
// QueueSnapshots are needed for standalone replication as well as cluster.
- broker.getConfigurationObservers().add(queueSnapshots);
+ broker.getBrokerObservers().add(queueSnapshots);
}
namespace {
@@ -94,7 +95,7 @@ bool isNone(const std::string& x) { retu
void HaBroker::initialize() {
if (settings.cluster) {
membership.setStatus(JOINING);
- QPID_LOG(notice, "Initializing HA broker: " << membership.getInfo());
+ QPID_LOG(notice, "Initializing HA broker: " << membership.getSelf());
}
// Set up the management object.
@@ -202,7 +203,7 @@ std::vector<Url> HaBroker::getKnownBroke
}
void HaBroker::shutdown(const std::string& message) {
- QPID_LOG(critical, message);
+ QPID_LOG(critical, "Shutting down: " << message);
broker.shutdown();
throw Exception(message);
}
@@ -213,7 +214,7 @@ BrokerStatus HaBroker::getStatus() const
void HaBroker::setAddress(const Address& a) {
QPID_LOG(info, role->getLogPrefix() << "Set self address to: " << a);
- membership.setAddress(a);
+ membership.setSelfAddress(a);
}
boost::shared_ptr<QueueReplicator> HaBroker::findQueueReplicator(const std::string& queueName) {
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/HaBroker.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/HaBroker.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/HaBroker.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/HaBroker.h Fri Sep 20 18:59:30 2013
@@ -84,6 +84,7 @@ class HaBroker : public management::Mana
broker::Broker& getBroker() { return broker; }
const Settings& getSettings() const { return settings; }
+ boost::shared_ptr<Role> getRole() const {return role; }
/** Shut down the broker because of a critical error. */
void shutdown(const std::string& message);
@@ -91,7 +92,7 @@ class HaBroker : public management::Mana
BrokerStatus getStatus() const;
boost::shared_ptr<ConnectionObserver> getObserver() { return observer; }
- BrokerInfo getBrokerInfo() const { return membership.getInfo(); }
+ BrokerInfo getBrokerInfo() const { return membership.getSelf(); }
Membership& getMembership() { return membership; }
types::Uuid getSystemId() const { return systemId; }
@@ -101,6 +102,9 @@ class HaBroker : public management::Mana
boost::shared_ptr<QueueReplicator> findQueueReplicator(const std::string& queueName);
+ /** Authenticated user ID for queue create/delete */
+ std::string getUserId() const { return userId; }
+
private:
void setPublicUrl(const Url&);
void setBrokerUrl(const Url&);
@@ -111,6 +115,7 @@ class HaBroker : public management::Mana
// Immutable members
const types::Uuid systemId;
const Settings settings;
+ const std::string userId;
// Member variables protected by lock
mutable sys::Mutex lock;
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Membership.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Membership.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Membership.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Membership.cpp Fri Sep 20 18:59:30 2013
@@ -107,6 +107,14 @@ BrokerInfo::Set Membership::otherBackups
return result;
}
+BrokerInfo::Set Membership::getBrokers() const {
+ Mutex::ScopedLock l(lock);
+ BrokerInfo::Set result;
+ transform(brokers.begin(), brokers.end(), inserter(result, result.begin()),
+ boost::bind(&BrokerInfo::Map::value_type::second, _1));
+ return result;
+}
+
bool Membership::get(const types::Uuid& id, BrokerInfo& result) const {
Mutex::ScopedLock l(lock);
BrokerInfo::Map::const_iterator i = brokers.find(id);
@@ -136,10 +144,9 @@ bool checkTransition(BrokerStatus from,
}
} // namespace
-
void Membership::update(Mutex::ScopedLock& l) {
QPID_LOG(info, "Membership: " << brokers);
- // Update managment and send update event.
+// Update managment and send update event.
BrokerStatus newStatus = getStatus(l);
Variant::List brokerList = asList(l);
if (mgmtObject) {
@@ -198,14 +205,14 @@ BrokerStatus Membership::getStatus(sys::
return i->second.getStatus();
}
-BrokerInfo Membership::getInfo() const {
+BrokerInfo Membership::getSelf() const {
Mutex::ScopedLock l(lock);
BrokerInfo::Map::const_iterator i = brokers.find(self);
assert(i != brokers.end());
return i->second;
}
-void Membership::setAddress(const Address& a) {
+void Membership::setSelfAddress(const Address& a) {
Mutex::ScopedLock l(lock);
brokers[self].setAddress(a);
update(l);
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Membership.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Membership.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Membership.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Membership.h Fri Sep 20 18:59:30 2013
@@ -26,6 +26,7 @@
#include "types.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Time.h"
#include "qpid/types/Variant.h"
#include <boost/function.hpp>
#include <set>
@@ -69,16 +70,19 @@ class Membership
/** Return IDs of all READY backups other than self */
BrokerInfo::Set otherBackups() const;
+ /** Return IDs of all brokers */
+ BrokerInfo::Set getBrokers() const;
+
void assign(const types::Variant::List&);
types::Variant::List asList() const;
bool get(const types::Uuid& id, BrokerInfo& result) const;
- types::Uuid getSelf() const { return self; }
- BrokerInfo getInfo() const;
+ BrokerInfo getSelf() const;
BrokerStatus getStatus() const;
void setStatus(BrokerStatus s);
- void setAddress(const Address&);
+
+ void setSelfAddress(const Address&);
private:
void update(sys::Mutex::ScopedLock&);
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Primary.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Primary.cpp Fri Sep 20 18:59:30 2013
@@ -27,17 +27,19 @@
#include "RemoteBackup.h"
#include "ConnectionObserver.h"
#include "QueueReplicator.h"
+#include "PrimaryTxObserver.h"
#include "qpid/assert.h"
#include "qpid/broker/Broker.h"
-#include "qpid/broker/ConfigurationObserver.h"
+#include "qpid/broker/BrokerObserver.h"
#include "qpid/broker/Connection.h"
#include "qpid/broker/Queue.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FieldValue.h"
-#include "qpid/framing/Uuid.h"
#include "qpid/log/Statement.h"
+#include "qpid/types/Uuid.h"
#include "qpid/sys/Timer.h"
#include <boost/bind.hpp>
+#include <boost/shared_ptr.hpp>
namespace qpid {
namespace ha {
@@ -59,15 +61,18 @@ class PrimaryConnectionObserver : public
Primary& primary;
};
-class PrimaryConfigurationObserver : public broker::ConfigurationObserver
+class PrimaryBrokerObserver : public broker::BrokerObserver
{
public:
- PrimaryConfigurationObserver(Primary& p) : primary(p) {}
+ PrimaryBrokerObserver(Primary& p) : primary(p) {}
void queueCreate(const Primary::QueuePtr& q) { primary.queueCreate(q); }
void queueDestroy(const Primary::QueuePtr& q) { primary.queueDestroy(q); }
void exchangeCreate(const Primary::ExchangePtr& q) { primary.exchangeCreate(q); }
void exchangeDestroy(const Primary::ExchangePtr& q) { primary.exchangeDestroy(q); }
- private:
+ void startTx(const shared_ptr<broker::TxBuffer>& tx) { primary.startTx(tx); }
+ void startDtx(const shared_ptr<broker::DtxBuffer>& dtx) { primary.startDtx(dtx); }
+
+ private:
Primary& primary;
};
@@ -82,8 +87,6 @@ class ExpectedBackupTimerTask : public s
} // namespace
-Primary* Primary::instance = 0;
-
Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
haBroker(hb), membership(hb.getMembership()),
logPrefix("Primary: "), active(false),
@@ -92,13 +95,11 @@ Primary::Primary(HaBroker& hb, const Bro
hb.getMembership().setStatus(RECOVERING);
broker::QueueRegistry& queues = hb.getBroker().getQueues();
queues.eachQueue(boost::bind(&Primary::initializeQueue, this, _1));
- assert(instance == 0);
- instance = this; // Let queue replicators find us.
if (expect.empty()) {
QPID_LOG(notice, logPrefix << "Promoted to primary. No expected backups.");
}
else {
- // NOTE: RemoteBackups must be created before we set the ConfigurationObserver
+ // NOTE: RemoteBackups must be created before we set the BrokerObserver
// or ConnectionObserver so that there is no client activity while
// the QueueGuards are created.
QPID_LOG(notice, logPrefix << "Promoted to primary. Expected backups: " << expect);
@@ -113,8 +114,8 @@ Primary::Primary(HaBroker& hb, const Bro
timerTask = new ExpectedBackupTimerTask(*this, deadline);
hb.getBroker().getTimer().add(timerTask);
}
- configurationObserver.reset(new PrimaryConfigurationObserver(*this));
- haBroker.getBroker().getConfigurationObservers().add(configurationObserver);
+ brokerObserver.reset(new PrimaryBrokerObserver(*this));
+ haBroker.getBroker().getBrokerObservers().add(brokerObserver);
checkReady(); // Outside lock
// Allow client connections
@@ -124,7 +125,7 @@ Primary::Primary(HaBroker& hb, const Bro
Primary::~Primary() {
if (timerTask) timerTask->cancel();
- haBroker.getBroker().getConfigurationObservers().remove(configurationObserver);
+ haBroker.getBroker().getBrokerObservers().remove(brokerObserver);
haBroker.getObserver()->reset();
}
@@ -212,14 +213,42 @@ void Primary::readyReplica(const Replica
if (backup) checkReady(backup);
}
+void Primary::addReplica(ReplicatingSubscription& rs) {
+ // Note this is called before the ReplicatingSubscription has been activated
+ // on the queue.
+ sys::Mutex::ScopedLock l(lock);
+ replicas[make_pair(rs.getBrokerInfo().getSystemId(), rs.getQueue())] = &rs;
+}
+
+void Primary::skip(
+ const types::Uuid& backup,
+ const boost::shared_ptr<broker::Queue>& queue,
+ const ReplicationIdSet& ids)
+{
+ sys::Mutex::ScopedLock l(lock);
+ ReplicaMap::const_iterator i = replicas.find(make_pair(backup, queue));
+ if (i != replicas.end()) i->second->addSkip(ids);
+}
+
+void Primary::removeReplica(const ReplicatingSubscription& rs) {
+ sys::Mutex::ScopedLock l(lock);
+ replicas.erase(make_pair(rs.getBrokerInfo().getSystemId(), rs.getQueue()));
+
+ TxMap::const_iterator i = txMap.find(rs.getQueue()->getName());
+ if (i != txMap.end()) {
+ boost::shared_ptr<PrimaryTxObserver> tx = i->second.lock();
+ if (tx) tx->cancel(rs);
+ }
+}
+
// NOTE: Called with queue registry lock held.
void Primary::queueCreate(const QueuePtr& q) {
// Set replication argument.
ReplicateLevel level = replicationTest.useLevel(*q);
- QPID_LOG(debug, logPrefix << "Created queue " << q->getName()
- << " replication: " << printable(level));
q->addArgument(QPID_REPLICATE, printable(level).str());
if (level) {
+ QPID_LOG(debug, logPrefix << "Created queue " << q->getName()
+ << " replication: " << printable(level));
initializeQueue(q);
// Give each queue a unique id. Used by backups to avoid confusion of
// same-named queues.
@@ -235,24 +264,26 @@ void Primary::queueCreate(const QueuePtr
// NOTE: Called with queue registry lock held.
void Primary::queueDestroy(const QueuePtr& q) {
- QPID_LOG(debug, logPrefix << "Destroyed queue " << q->getName());
- {
- Mutex::ScopedLock l(lock);
- for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i)
- i->second->queueDestroy(q);
+ if (replicationTest.useLevel(*q)) {
+ QPID_LOG(debug, logPrefix << "Destroyed queue " << q->getName());
+ {
+ Mutex::ScopedLock l(lock);
+ for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i)
+ i->second->queueDestroy(q);
+ }
+ checkReady(); // Outside lock
}
- checkReady(); // Outside lock
}
// NOTE: Called with exchange registry lock held.
void Primary::exchangeCreate(const ExchangePtr& ex) {
ReplicateLevel level = replicationTest.useLevel(*ex);
- QPID_LOG(debug, logPrefix << "Created exchange " << ex->getName()
- << " replication: " << printable(level));
FieldTable args = ex->getArgs();
args.setString(QPID_REPLICATE, printable(level).str()); // Set replication arg.
if (level) {
- // Give each exchange a unique id to avoid confusion of same-named exchanges.
+ QPID_LOG(debug, logPrefix << "Created exchange " << ex->getName()
+ << " replication: " << printable(level));
+ // Give each exchange a unique id to avoid confusion of same-named exchanges.
args.set(QPID_HA_UUID, FieldTable::ValuePtr(new UuidValue(&Uuid(true)[0])));
}
ex->setArgs(args);
@@ -260,8 +291,10 @@ void Primary::exchangeCreate(const Excha
// NOTE: Called with exchange registry lock held.
void Primary::exchangeDestroy(const ExchangePtr& ex) {
- QPID_LOG(debug, logPrefix << "Destroyed exchange " << ex->getName());
- // Do nothing
+ if (replicationTest.useLevel(*ex)) {
+ QPID_LOG(debug, logPrefix << "Destroyed exchange " << ex->getName());
+ // Do nothing
+ }
}
// New backup connected
@@ -280,6 +313,7 @@ void Primary::backupDisconnect(shared_pt
backup->cancel();
expectedBackups.erase(backup);
backups.erase(id);
+ membership.remove(id);
}
@@ -365,4 +399,19 @@ void Primary::setCatchupQueues(const Rem
backup->startCatchup();
}
+shared_ptr<PrimaryTxObserver> Primary::makeTxObserver() {
+ shared_ptr<PrimaryTxObserver> observer(new PrimaryTxObserver(haBroker));
+ observer->initialize();
+ txMap[observer->getTxQueue()->getName()] = observer;
+ return observer;
+}
+
+void Primary::startTx(const boost::shared_ptr<broker::TxBuffer>& tx) {
+ tx->setObserver(makeTxObserver());
+}
+
+void Primary::startDtx(const boost::shared_ptr<broker::DtxBuffer>& dtx) {
+ dtx->setObserver(makeTxObserver());
+}
+
}} // namespace qpid::ha
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Primary.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Primary.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Primary.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/Primary.h Fri Sep 20 18:59:30 2013
@@ -23,12 +23,14 @@
*/
#include "types.h"
+#include "hash.h"
#include "BrokerInfo.h"
#include "ReplicationTest.h"
#include "Role.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/unordered_map.h"
#include <boost/shared_ptr.hpp>
+#include <boost/weak_ptr.hpp>
#include <boost/intrusive_ptr.hpp>
#include <string>
@@ -38,7 +40,9 @@ namespace broker {
class Queue;
class Connection;
class ConnectionObserver;
-class ConfigurationObserver;
+class BrokerObserver;
+class TxBuffer;
+class DtxBuffer;
}
namespace sys {
@@ -51,6 +55,7 @@ class ReplicatingSubscription;
class RemoteBackup;
class QueueGuard;
class Membership;
+class PrimaryTxObserver;
/**
* State associated with a primary broker:
@@ -66,8 +71,6 @@ class Primary : public Role
typedef boost::shared_ptr<broker::Exchange> ExchangePtr;
typedef boost::shared_ptr<RemoteBackup> RemoteBackupPtr;
- static Primary* get() { return instance; }
-
Primary(HaBroker& hb, const BrokerInfo::Set& expectedBackups);
~Primary();
@@ -77,13 +80,21 @@ class Primary : public Role
void setBrokerUrl(const Url&) {}
void readyReplica(const ReplicatingSubscription&);
- void removeReplica(const std::string& q);
+ void addReplica(ReplicatingSubscription&);
+ void removeReplica(const ReplicatingSubscription&);
+
+ /** Skip replication of ids to queue on backup. */
+ void skip(const types::Uuid& backup,
+ const boost::shared_ptr<broker::Queue>& queue,
+ const ReplicationIdSet& ids);
- // Called via ConfigurationObserver
+ // Called via BrokerObserver
void queueCreate(const QueuePtr&);
void queueDestroy(const QueuePtr&);
void exchangeCreate(const ExchangePtr&);
void exchangeDestroy(const ExchangePtr&);
+ void startTx(const boost::shared_ptr<broker::TxBuffer>&);
+ void startDtx(const boost::shared_ptr<broker::DtxBuffer>&);
// Called via ConnectionObserver
void opened(broker::Connection& connection);
@@ -95,11 +106,18 @@ class Primary : public Role
void timeoutExpectedBackups();
private:
- typedef qpid::sys::unordered_map<
- types::Uuid, RemoteBackupPtr, types::Uuid::Hasher > BackupMap;
+ typedef sys::unordered_map<
+ types::Uuid, RemoteBackupPtr, Hasher<types::Uuid> > BackupMap;
typedef std::set<RemoteBackupPtr > BackupSet;
+ typedef std::pair<types::Uuid, boost::shared_ptr<broker::Queue> > UuidQueue;
+ typedef sys::unordered_map<UuidQueue, ReplicatingSubscription*,
+ Hasher<UuidQueue> > ReplicaMap;
+
+ // Map of PrimaryTxObservers by tx-queue name
+ typedef sys::unordered_map<std::string, boost::weak_ptr<PrimaryTxObserver> > TxMap;
+
RemoteBackupPtr backupConnect(const BrokerInfo&, broker::Connection&, sys::Mutex::ScopedLock&);
void backupDisconnect(RemoteBackupPtr, sys::Mutex::ScopedLock&);
@@ -107,8 +125,10 @@ class Primary : public Role
void checkReady();
void checkReady(RemoteBackupPtr);
void setCatchupQueues(const RemoteBackupPtr&, bool createGuards);
+ void deduplicate();
+ boost::shared_ptr<PrimaryTxObserver> makeTxObserver();
- sys::Mutex lock;
+ mutable sys::Mutex lock;
HaBroker& haBroker;
Membership& membership;
std::string logPrefix;
@@ -126,9 +146,10 @@ class Primary : public Role
*/
BackupMap backups;
boost::shared_ptr<broker::ConnectionObserver> connectionObserver;
- boost::shared_ptr<broker::ConfigurationObserver> configurationObserver;
+ boost::shared_ptr<broker::BrokerObserver> brokerObserver;
boost::intrusive_ptr<sys::TimerTask> timerTask;
- static Primary* instance;
+ ReplicaMap replicas;
+ TxMap txMap;
};
}} // namespace qpid::ha
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/QueueGuard.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/QueueGuard.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/QueueGuard.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/QueueGuard.h Fri Sep 20 18:59:30 2013
@@ -89,7 +89,7 @@ class QueueGuard {
class QueueObserver;
typedef qpid::sys::unordered_map<ReplicationId,
boost::intrusive_ptr<broker::AsyncCompletion>,
- TrivialHasher<ReplicationId> > Delayed;
+ Hasher<ReplicationId> > Delayed;
bool complete(ReplicationId, sys::Mutex::ScopedLock &);
void complete(Delayed::iterator, sys::Mutex::ScopedLock &);
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Fri Sep 20 18:59:30 2013
@@ -19,12 +19,13 @@
*
*/
-#include "makeMessage.h"
+#include "Event.h"
#include "HaBroker.h"
#include "QueueReplicator.h"
#include "QueueSnapshots.h"
#include "ReplicatingSubscription.h"
#include "Settings.h"
+#include "types.h"
#include "qpid/broker/Bridge.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Link.h"
@@ -38,36 +39,31 @@
#include "qpid/Msg.h"
#include "qpid/assert.h"
#include <boost/shared_ptr.hpp>
+#include <boost/bind.hpp>
-namespace {
-const std::string QPID_REPLICATOR_("qpid.replicator-");
-const std::string TYPE_NAME("qpid.queue-replicator");
-const std::string QPID_HA("qpid.ha-");
-}
namespace qpid {
namespace ha {
using namespace broker;
using namespace framing;
using namespace std;
+using std::exception;
using sys::Mutex;
-const std::string QueueReplicator::DEQUEUE_EVENT_KEY(QPID_HA+"dequeue");
-const std::string QueueReplicator::ID_EVENT_KEY(QPID_HA+"id");
const std::string QueueReplicator::QPID_SYNC_FREQUENCY("qpid.sync_frequency");
-std::string QueueReplicator::replicatorName(const std::string& queueName) {
- return QPID_REPLICATOR_ + queueName;
+namespace {
+const string QPID_HA(QPID_HA_PREFIX);
+const std::string TYPE_NAME(QPID_HA+"queue-replicator");
}
-bool QueueReplicator::isReplicatorName(const std::string& name) {
- return name.compare(0, QPID_REPLICATOR_.size(), QPID_REPLICATOR_) == 0;
+
+std::string QueueReplicator::replicatorName(const std::string& queueName) {
+ return QUEUE_REPLICATOR_PREFIX + queueName;
}
-bool QueueReplicator::isEventKey(const std::string key) {
- const std::string& prefix = QPID_HA;
- bool ret = key.size() > prefix.size() && key.compare(0, prefix.size(), prefix) == 0;
- return ret;
+bool QueueReplicator::isReplicatorName(const std::string& name) {
+ return startsWith(name, QUEUE_REPLICATOR_PREFIX);
}
class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener {
@@ -109,12 +105,15 @@ QueueReplicator::QueueReplicator(HaBroke
boost::shared_ptr<Link> l)
: Exchange(replicatorName(q->getName()), 0, q->getBroker()),
haBroker(hb),
+ brokerInfo(hb.getBrokerInfo()),
+ link(l),
+ queue(q),
+ sessionHandler(0),
logPrefix("Backup of "+q->getName()+": "),
- queue(q), link(l), brokerInfo(hb.getBrokerInfo()), subscribed(false),
- settings(hb.getSettings()), destroyed(false),
+ subscribed(false),
+ settings(hb.getSettings()),
nextId(0), maxId(0)
{
- QPID_LOG(debug, logPrefix << "Created");
args.setString(QPID_REPLICATE, printable(NONE).str());
Uuid uuid(true);
bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str();
@@ -122,12 +121,18 @@ QueueReplicator::QueueReplicator(HaBroke
args.setString(QPID_REPLICATE, printable(NONE).str());
setArgs(args);
if (q->isAutoDelete()) q->markInUse();
+
+ dispatch[DequeueEvent::KEY] =
+ boost::bind(&QueueReplicator::dequeueEvent, this, _1, _2);
+ dispatch[IdEvent::KEY] =
+ boost::bind(&QueueReplicator::idEvent, this, _1, _2);
}
// This must be called immediately after the constructor.
// It has to be separate so we can call shared_from_this().
void QueueReplicator::activate() {
Mutex::ScopedLock l(lock);
+ QPID_LOG(debug, logPrefix << "Created");
if (!queue) return; // Already destroyed
// Enable callback to route()
@@ -163,6 +168,11 @@ void QueueReplicator::activate() {
boost::shared_ptr<QueueObserver>(new QueueObserver(shared_from_this())));
}
+void QueueReplicator::disconnect() {
+ Mutex::ScopedLock l(lock);
+ sessionHandler = 0;
+}
+
QueueReplicator::~QueueReplicator() {}
// Called from Queue::destroyed()
@@ -170,25 +180,24 @@ void QueueReplicator::destroy() {
boost::shared_ptr<Bridge> bridge2; // To call outside of lock
{
Mutex::ScopedLock l(lock);
- if (destroyed) return;
- destroyed = true;
+ if (!queue) return; // Already destroyed
QPID_LOG(debug, logPrefix << "Destroyed");
+ bridge2 = bridge; // call close outside the lock.
// Need to drop shared pointers to avoid pointer cycles keeping this in memory.
queue.reset();
- link.reset();
bridge.reset();
getBroker()->getExchanges().destroy(getName());
- bridge2 = bridge;
}
if (bridge2) bridge2->close(); // Outside of lock, avoid deadlock.
}
// Called in a broker connection thread when the bridge is created.
// Note: called with the Link lock held.
-void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
+void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler_) {
Mutex::ScopedLock l(lock);
- if (destroyed) return; // Already destroyed
- AMQP_ServerProxy peer(sessionHandler.out);
+ if (!queue) return; // Already destroyed
+ sessionHandler = &sessionHandler_;
+ AMQP_ServerProxy peer(sessionHandler->out);
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
FieldTable arguments;
arguments.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1);
@@ -224,44 +233,57 @@ template <class T> T decodeContent(Messa
}
}
-void QueueReplicator::dequeue(const ReplicationIdSet& dequeues, Mutex::ScopedLock&) {
- QPID_LOG(trace, logPrefix << "Dequeue " << dequeues);
+void QueueReplicator::dequeueEvent(const string& data, Mutex::ScopedLock&) {
+ DequeueEvent e;
+ decodeStr(data, e);
+ QPID_LOG(trace, logPrefix << "Dequeue " << e.ids);
//TODO: should be able to optimise the following
- for (ReplicationIdSet::iterator i = dequeues.begin(); i != dequeues.end(); ++i) {
+ for (ReplicationIdSet::iterator i = e.ids.begin(); i != e.ids.end(); ++i) {
PositionMap::iterator j = positions.find(*i);
if (j != positions.end()) queue->dequeueMessageAt(j->second);
}
}
// Called in connection thread of the queues bridge to primary.
-void QueueReplicator::route(Deliverable& msg)
+void QueueReplicator::route(Deliverable& deliverable)
{
try {
Mutex::ScopedLock l(lock);
- if (destroyed) return;
- const std::string& key = msg.getMessage().getRoutingKey();
- if (!isEventKey(key)) { // Replicated message
+ if (!queue) return; // Already destroyed
+ broker::Message& message(deliverable.getMessage());
+ string key(message.getRoutingKey());
+ if (!isEventKey(message.getRoutingKey())) {
ReplicationId id = nextId++;
maxId = std::max(maxId, id);
- msg.getMessage().setReplicationId(id);
- msg.deliverTo(queue);
+ message.setReplicationId(id);
+ deliver(message);
QueuePosition position = queue->getPosition();
positions[id] = position;
QPID_LOG(trace, logPrefix << "Enqueued " << LogMessageId(*queue,position,id));
}
- else if (key == DEQUEUE_EVENT_KEY) {
- dequeue(decodeContent<ReplicationIdSet>(msg.getMessage()), l);
- }
- else if (key == ID_EVENT_KEY) {
- nextId = decodeContent<ReplicationId>(msg.getMessage());
+ else {
+ DispatchMap::iterator i = dispatch.find(key);
+ if (i == dispatch.end()) {
+ QPID_LOG(info, logPrefix << "Ignoring unknown event: " << key);
+ }
+ else {
+ (i->second)(message.getContent(), l);
+ }
}
- // Ignore unknown event keys, may be introduced in later versions.
}
catch (const std::exception& e) {
haBroker.shutdown(QPID_MSG(logPrefix << "Replication failed: " << e.what()));
}
}
+void QueueReplicator::deliver(const broker::Message& m) {
+ queue->deliver(m);
+}
+
+void QueueReplicator::idEvent(const string& data, Mutex::ScopedLock&) {
+ nextId = decodeStr<IdEvent>(data).id;
+}
+
ReplicationId QueueReplicator::getMaxId() {
Mutex::ScopedLock l(lock);
return maxId;
@@ -273,4 +295,5 @@ bool QueueReplicator::unbind(boost::shar
bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const FieldTable* const) { return false; }
std::string QueueReplicator::getType() const { return TYPE_NAME; }
+
}} // namespace qpid::broker
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/QueueReplicator.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/QueueReplicator.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/QueueReplicator.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/QueueReplicator.h Fri Sep 20 18:59:30 2013
@@ -26,6 +26,7 @@
#include "hash.h"
#include "qpid/broker/Exchange.h"
#include <boost/enable_shared_from_this.hpp>
+#include <boost/function.hpp>
#include <iosfwd>
namespace qpid {
@@ -56,30 +57,24 @@ class QueueReplicator : public broker::E
public boost::enable_shared_from_this<QueueReplicator>
{
public:
- static const std::string DEQUEUE_EVENT_KEY;
- static const std::string ID_EVENT_KEY;
static const std::string QPID_SYNC_FREQUENCY;
+ static const std::string REPLICATOR_PREFIX;
static std::string replicatorName(const std::string& queueName);
static bool isReplicatorName(const std::string&);
- /** Test if a string is an event key */
- static bool isEventKey(const std::string key);
-
QueueReplicator(HaBroker&,
boost::shared_ptr<broker::Queue> q,
boost::shared_ptr<broker::Link> l);
~QueueReplicator();
- void activate(); // Must be called immediately after constructor.
+ void activate(); // Must be called immediately after constructor.
+ void disconnect(); // Called when we are disconnected from the primary.
std::string getType() const;
- bool bind(boost::shared_ptr<broker::Queue
- >, const std::string&, const framing::FieldTable*);
- bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
+
void route(broker::Deliverable&);
- bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
// Set if the queue has ever been subscribed to, used for auto-delete cleanup.
void setSubscribed() { subscribed = true; }
@@ -89,27 +84,44 @@ class QueueReplicator : public broker::E
ReplicationId getMaxId();
- private:
- typedef qpid::sys::unordered_map<ReplicationId, QueuePosition, TrivialHasher<int32_t> > PositionMap;
+ // No-op unused Exchange virtual functions.
+ bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
+ bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
+ bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
+
+ protected:
+ typedef boost::function<void(const std::string&, sys::Mutex::ScopedLock&)> DispatchFn;
+ typedef qpid::sys::unordered_map<std::string, DispatchFn> DispatchMap;
+
+ virtual void deliver(const broker::Message&);
+ virtual void destroy(); // Called when the queue is destroyed.
+ sys::Mutex lock;
+ HaBroker& haBroker;
+ const BrokerInfo brokerInfo;
+ DispatchMap dispatch;
+ boost::shared_ptr<broker::Link> link;
+ boost::shared_ptr<broker::Bridge> bridge;
+ boost::shared_ptr<broker::Queue> queue;
+ broker::SessionHandler* sessionHandler;
+
+ private:
+ typedef qpid::sys::unordered_map<
+ ReplicationId, QueuePosition, Hasher<ReplicationId> > PositionMap;
class ErrorListener;
class QueueObserver;
void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler);
- void destroy(); // Called when the queue is destroyed.
- void dequeue(const ReplicationIdSet&, sys::Mutex::ScopedLock&);
- HaBroker& haBroker;
+ // Dispatch functions
+ void dequeueEvent(const std::string& data, sys::Mutex::ScopedLock&);
+ void idEvent(const std::string& data, sys::Mutex::ScopedLock&);
+
std::string logPrefix;
std::string bridgeName;
- sys::Mutex lock;
- boost::shared_ptr<broker::Queue> queue;
- boost::shared_ptr<broker::Link> link;
- boost::shared_ptr<broker::Bridge> bridge;
- BrokerInfo brokerInfo;
+
bool subscribed;
const Settings& settings;
- bool destroyed;
PositionMap positions;
ReplicationIdSet idSet; // Set of replicationIds on the queue.
ReplicationId nextId; // ID for next message to arrive.
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/QueueSnapshots.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/QueueSnapshots.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/QueueSnapshots.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/QueueSnapshots.h Fri Sep 20 18:59:30 2013
@@ -27,7 +27,7 @@
#include "hash.h"
#include "qpid/assert.h"
-#include "qpid/broker/ConfigurationObserver.h"
+#include "qpid/broker/BrokerObserver.h"
#include "qpid/broker/Queue.h"
#include "qpid/sys/Mutex.h"
@@ -37,10 +37,10 @@ namespace qpid {
namespace ha {
/**
- * ConfigurationObserver that maintains a map of the QueueSnapshot for each queue.
+ * BrokerObserver that maintains a map of the QueueSnapshot for each queue.
* THREAD SAFE.
*/
-class QueueSnapshots : public broker::ConfigurationObserver
+class QueueSnapshots : public broker::BrokerObserver
{
public:
boost::shared_ptr<QueueSnapshot> get(const boost::shared_ptr<broker::Queue>& q) const {
@@ -49,7 +49,7 @@ class QueueSnapshots : public broker::Co
return i != snapshots.end() ? i->second : boost::shared_ptr<QueueSnapshot>();
}
- // ConfigurationObserver overrides.
+ // BrokerObserver overrides.
void queueCreate(const boost::shared_ptr<broker::Queue>& q) {
sys::Mutex::ScopedLock l(lock);
boost::shared_ptr<QueueSnapshot> observer(new QueueSnapshot);
@@ -69,7 +69,7 @@ class QueueSnapshots : public broker::Co
private:
typedef qpid::sys::unordered_map<boost::shared_ptr<broker::Queue>,
boost::shared_ptr<QueueSnapshot>,
- SharedPtrHasher<broker::Queue>
+ Hasher<boost::shared_ptr<broker::Queue> >
> SnapshotMap;
SnapshotMap snapshots;
mutable sys::Mutex lock;
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/RemoteBackup.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/RemoteBackup.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/RemoteBackup.cpp Fri Sep 20 18:59:30 2013
@@ -20,6 +20,7 @@
*/
#include "RemoteBackup.h"
#include "QueueGuard.h"
+#include "TxReplicator.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Connection.h"
#include "qpid/broker/Queue.h"
@@ -108,13 +109,13 @@ void RemoteBackup::ready(const QueuePtr&
QPID_LOG(debug, logPrefix << "Caught up on queue: " << q->getName() );
}
-// Called via ConfigurationObserver::queueCreate and from catchupQueue
+// Called via BrokerObserver::queueCreate and from catchupQueue
void RemoteBackup::queueCreate(const QueuePtr& q) {
if (replicationTest.getLevel(*q) == ALL)
guards[q].reset(new QueueGuard(*q, brokerInfo));
}
-// Called via ConfigurationObserver
+// Called via BrokerObserver
void RemoteBackup::queueDestroy(const QueuePtr& q) {
catchupQueues.erase(q);
GuardMap::iterator i = guards.find(q);
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/RemoteBackup.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/RemoteBackup.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/RemoteBackup.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/RemoteBackup.h Fri Sep 20 18:59:30 2013
@@ -71,10 +71,10 @@ class RemoteBackup
*/
void ready(const QueuePtr& queue);
- /** Called via ConfigurationObserver */
+ /** Called via BrokerObserver */
void queueCreate(const QueuePtr&);
- /** Called via ConfigurationObserver. Note: may set isReady() */
+ /** Called via BrokerObserver. Note: may set isReady() */
void queueDestroy(const QueuePtr&);
/**@return true when all catch-up queues for this backup are ready. */
@@ -96,8 +96,10 @@ class RemoteBackup
void startCatchup() { started = true; }
private:
- typedef qpid::sys::unordered_map<QueuePtr, GuardPtr,
- SharedPtrHasher<broker::Queue> > GuardMap;
+ typedef qpid::sys::unordered_map<
+ QueuePtr, GuardPtr, Hasher<boost::shared_ptr<broker::Queue> >
+ > GuardMap;
+
typedef std::set<QueuePtr> QueueSet;
std::string logPrefix;
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Fri Sep 20 18:59:30 2013
@@ -19,7 +19,7 @@
*
*/
-#include "makeMessage.h"
+#include "Event.h"
#include "IdSetter.h"
#include "QueueGuard.h"
#include "QueueReplicator.h"
@@ -111,7 +111,8 @@ ReplicatingSubscription::ReplicatingSubs
) : ConsumerImpl(parent, name, queue, ack, REPLICATOR, exclusive, tag,
resumeId, resumeTtl, arguments),
position(0), ready(false), cancelled(false),
- haBroker(hb)
+ haBroker(hb),
+ primary(boost::dynamic_pointer_cast<Primary>(haBroker.getRole()))
{
try {
FieldTable ft;
@@ -137,7 +138,7 @@ ReplicatingSubscription::ReplicatingSubs
}
// If there's already a guard (we are in failover) use it, else create one.
- if (Primary::get()) guard = Primary::get()->getGuard(queue, info);
+ if (primary) guard = primary->getGuard(queue, info);
if (!guard) guard.reset(new QueueGuard(*queue, info));
// NOTE: Once the observer is attached we can have concurrent
@@ -148,20 +149,19 @@ ReplicatingSubscription::ReplicatingSubs
// between the snapshot and attaching the observer.
observer.reset(new QueueObserver(*this));
queue->addObserver(observer);
- ReplicationIdSet primary = haBroker.getQueueSnapshots()->get(queue)->snapshot();
+ ReplicationIdSet primaryIds = haBroker.getQueueSnapshots()->get(queue)->snapshot();
std::string backupStr = arguments.getAsString(ReplicatingSubscription::QPID_ID_SET);
- ReplicationIdSet backup;
- if (!backupStr.empty()) backup = decodeStr<ReplicationIdSet>(backupStr);
+ ReplicationIdSet backupIds;
+ if (!backupStr.empty()) backupIds = decodeStr<ReplicationIdSet>(backupStr);
// Initial dequeues are messages on backup but not on primary.
- ReplicationIdSet initDequeues = backup - primary;
+ ReplicationIdSet initDequeues = backupIds - primaryIds;
QueuePosition front,back;
queue->getRange(front, back, broker::REPLICATOR); // Outside lock, getRange locks queue
{
sys::Mutex::ScopedLock l(lock); // Concurrent calls to dequeued()
dequeues += initDequeues; // Messages on backup that are not on primary.
- skip = backup - initDequeues; // Messages already on the backup.
-
+ skip = backupIds - initDequeues; // Messages already on the backup.
// Queue front is moving but we know this subscriptions will start at a
// position >= front so if front is safe then position must be.
position = front;
@@ -189,6 +189,7 @@ ReplicatingSubscription::~ReplicatingSub
//
void ReplicatingSubscription::initialize() {
try {
+ if (primary) primary->addReplica(*this);
Mutex::ScopedLock l(lock); // Note dequeued() can be called concurrently.
// Send initial dequeues to the backup.
// There must be a shared_ptr(this) when sending.
@@ -216,9 +217,8 @@ bool ReplicatingSubscription::deliver(
try {
bool result = false;
if (skip.contains(id)) {
+ QPID_LOG(trace, logPrefix << "Skip " << LogMessageId(*getQueue(), m));
skip -= id;
- QPID_LOG(trace, logPrefix << "On backup, skip " <<
- LogMessageId(*getQueue(), m));
guard->complete(id); // This will never be acknowledged.
notify();
result = true;
@@ -238,16 +238,13 @@ bool ReplicatingSubscription::deliver(
}
}
-/**
- *@param position: must be <= last position seen by subscription.
- */
void ReplicatingSubscription::checkReady(sys::Mutex::ScopedLock& l) {
if (!ready && isGuarded(l) && unready.empty()) {
ready = true;
sys::Mutex::ScopedUnlock u(lock);
// Notify Primary that a subscription is ready.
QPID_LOG(debug, logPrefix << "Caught up");
- if (Primary::get()) Primary::get()->readyReplica(*this);
+ if (primary) primary->readyReplica(*this);
}
}
@@ -260,6 +257,7 @@ void ReplicatingSubscription::cancel()
cancelled = true;
}
QPID_LOG(debug, logPrefix << "Cancelled");
+ if (primary) primary->removeReplica(*this);
getQueue()->removeObserver(observer);
guard->cancel();
ConsumerImpl::cancel();
@@ -285,9 +283,7 @@ void ReplicatingSubscription::sendDequeu
{
if (dequeues.empty()) return;
QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues);
- string buffer = encodeStr(dequeues);
- dequeues.clear();
- sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer, l);
+ sendEvent(DequeueEvent(dequeues), l);
}
// Called after the message has been removed
@@ -307,23 +303,16 @@ void ReplicatingSubscription::dequeued(R
// Called with lock held. Called in subscription's connection thread.
void ReplicatingSubscription::sendIdEvent(ReplicationId pos, Mutex::ScopedLock& l)
{
- sendEvent(QueueReplicator::ID_EVENT_KEY, encodeStr(pos), l);
+ sendEvent(IdEvent(pos), l);
}
-void ReplicatingSubscription::sendEvent(const std::string& key,
- const std::string& buffer,
- Mutex::ScopedLock&)
+void ReplicatingSubscription::sendEvent(const Event& event, Mutex::ScopedLock&)
{
Mutex::ScopedUnlock u(lock);
- broker::Message message = makeMessage(buffer);
- MessageTransfer& transfer = MessageTransfer::get(message);
- DeliveryProperties* props =
- transfer.getFrames().getHeaders()->get<DeliveryProperties>(true);
- props->setRoutingKey(key);
// Send the event directly to the base consumer implementation. The dummy
// consumer prevents acknowledgements being handled, which is what we want
// for events
- ConsumerImpl::deliver(QueueCursor(), message, boost::shared_ptr<Consumer>());
+ ConsumerImpl::deliver(QueueCursor(), event.message(), boost::shared_ptr<Consumer>());
}
// Called in subscription's connection thread.
@@ -342,4 +331,9 @@ bool ReplicatingSubscription::doDispatch
}
}
+void ReplicatingSubscription::addSkip(const ReplicationIdSet& ids) {
+ Mutex::ScopedLock l(lock);
+ skip += ids;
+}
+
}} // namespace qpid::ha
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h Fri Sep 20 18:59:30 2013
@@ -25,7 +25,6 @@
#include "BrokerInfo.h"
#include "qpid/broker/SemanticState.h"
#include "qpid/broker/ConsumerFactory.h"
-#include "qpid/types/Uuid.h"
#include <iosfwd>
namespace qpid {
@@ -44,6 +43,8 @@ class Buffer;
namespace ha {
class QueueGuard;
class HaBroker;
+class Event;
+class Primary;
/**
* A susbcription that replicates to a remote backup.
@@ -118,6 +119,9 @@ class ReplicatingSubscription : public b
BrokerInfo getBrokerInfo() const { return info; }
+ /** Skip replicating enqueue of of ids. */
+ void addSkip(const ReplicationIdSet& ids);
+
protected:
bool doDispatch();
@@ -128,7 +132,7 @@ class ReplicatingSubscription : public b
std::string logPrefix;
QueuePosition position;
ReplicationIdSet dequeues; // Dequeues to be sent in next dequeue event.
- ReplicationIdSet skip; // Messages already on backup will be skipped.
+ ReplicationIdSet skip; // Skip enqueues: messages already on backup and tx enqueues.
ReplicationIdSet unready; // Unguarded, replicated and un-acknowledged.
bool ready;
bool cancelled;
@@ -136,12 +140,13 @@ class ReplicatingSubscription : public b
boost::shared_ptr<QueueGuard> guard;
HaBroker& haBroker;
boost::shared_ptr<QueueObserver> observer;
+ boost::shared_ptr<Primary> primary;
bool isGuarded(sys::Mutex::ScopedLock&);
void dequeued(ReplicationId);
void sendDequeueEvent(sys::Mutex::ScopedLock&);
void sendIdEvent(ReplicationId, sys::Mutex::ScopedLock&);
- void sendEvent(const std::string& key, const std::string& data, sys::Mutex::ScopedLock&);
+ void sendEvent(const Event&, sys::Mutex::ScopedLock&);
void checkReady(sys::Mutex::ScopedLock&);
friend class Factory;
};
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/ReplicationTest.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/ReplicationTest.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/ReplicationTest.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/ReplicationTest.h Fri Sep 20 18:59:30 2013
@@ -41,6 +41,13 @@ namespace ha {
/**
* Test whether something is replicated, taking into account the
* default replication level.
+ *
+ * The primary uses a ReplicationTest with default based on configuration
+ * settings, and marks objects to be replicated with an explict replication
+ * argument.
+ *
+ * The backup uses a default of NONE, so it always accepts what the primary has
+ * marked on the object.
*/
class ReplicationTest
{
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/StatusCheck.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/StatusCheck.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/StatusCheck.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/StatusCheck.cpp Fri Sep 20 18:59:30 2013
@@ -20,6 +20,8 @@
*/
#include "StatusCheck.h"
#include "ConnectionObserver.h"
+#include "HaBroker.h"
+#include "qpid/broker/Broker.h"
#include "qpid/log/Statement.h"
#include "qpid/messaging/Address.h"
#include "qpid/messaging/Connection.h"
@@ -41,27 +43,32 @@ const string HA_BROKER = "org.apache.qpi
class StatusCheckThread : public sys::Runnable {
public:
- StatusCheckThread(StatusCheck& sc, const qpid::Address& addr, const BrokerInfo& self)
- : url(addr), statusCheck(sc), brokerInfo(self) {}
+ StatusCheckThread(StatusCheck& sc, const qpid::Address& addr)
+ : url(addr), statusCheck(sc) {}
void run();
private:
Url url;
StatusCheck& statusCheck;
- BrokerInfo brokerInfo;
};
void StatusCheckThread::run() {
- QPID_LOG(debug, statusCheck.logPrefix << "Checking status of " << url);
+ string logPrefix("Status check " + url.str() + ": ");
Connection c;
try {
+ // Check for self connections
Variant::Map options, clientProperties;
- clientProperties = brokerInfo.asMap(); // Detect self connections.
clientProperties[ConnectionObserver::ADMIN_TAG] = 1; // Allow connection to backups.
clientProperties[ConnectionObserver::ADDRESS_TAG] = url.str();
- clientProperties[ConnectionObserver::BACKUP_TAG] = brokerInfo.asMap();
+ clientProperties[ConnectionObserver::BACKUP_TAG] = statusCheck.haBroker.getBrokerInfo().asMap();
+ // Set connection options
+ Settings settings(statusCheck.haBroker.getSettings());
+ if (settings.username.size()) options["username"] = settings.username;
+ if (settings.password.size()) options["password"] = settings.password;
+ if (settings.mechanism.size()) options["sasl_mechanisms"] = settings.mechanism;
options["client-properties"] = clientProperties;
- options["heartbeat"] = statusCheck.linkHeartbeatInterval/sys::TIME_SEC;
+ sys::Duration heartbeat(statusCheck.haBroker.getBroker().getOptions().linkHeartbeatInterval);
+ options["heartbeat"] = heartbeat/sys::TIME_SEC;
c = Connection(url.str(), options);
c.open();
@@ -81,7 +88,7 @@ void StatusCheckThread::run() {
content["_object_id"] = oid;
encode(content, request);
s.send(request);
- messaging::Duration timeout(statusCheck.linkHeartbeatInterval/sys::TIME_MSEC);
+ messaging::Duration timeout(heartbeat/sys::TIME_MSEC);
Message response = r.fetch(timeout);
session.acknowledge();
Variant::List contentIn;
@@ -89,29 +96,24 @@ void StatusCheckThread::run() {
if (contentIn.size() == 1) {
Variant::Map details = contentIn.front().asMap()["_values"].asMap();
string status = details["status"].getString();
+ QPID_LOG(debug, logPrefix << status);
if (status != "joining") {
statusCheck.setPromote(false);
- QPID_LOG(info, statusCheck.logPrefix << "Status of " << url << " is "
- << status << ", this broker will refuse promotion.");
+ QPID_LOG(info, logPrefix << "Joining established cluster");
}
- QPID_LOG(debug, statusCheck.logPrefix << "Status of " << url << ": " << status);
}
+ else
+ QPID_LOG(error, logPrefix << "Invalid response " << response.getContent())
} catch(const exception& error) {
- QPID_LOG(info, statusCheck.logPrefix << "Checking status of " << url << ": " << error.what());
- }
- try { c.close(); }
- catch(const exception&) {
- QPID_LOG(warning, statusCheck.logPrefix << "Error closing status check connection to " << url);
- }
- try { c.close(); }
- catch(const exception&) {
- QPID_LOG(warning, "Error closing status check connection to " << url);
+ // Its not an error to fail to connect to self.
+ if (statusCheck.haBroker.getBrokerInfo().getAddress() != url[0])
+ QPID_LOG(warning, logPrefix << error.what());
}
+ try { c.close(); } catch(...) {}
delete this;
}
-StatusCheck::StatusCheck(const string& lp, sys::Duration lh, const BrokerInfo& self)
- : logPrefix(lp), promote(true), linkHeartbeatInterval(lh), brokerInfo(self)
+StatusCheck::StatusCheck(HaBroker& hb) : promote(true), haBroker(hb)
{}
StatusCheck::~StatusCheck() {
@@ -122,7 +124,7 @@ StatusCheck::~StatusCheck() {
void StatusCheck::setUrl(const Url& url) {
Mutex::ScopedLock l(lock);
for (size_t i = 0; i < url.size(); ++i)
- threads.push_back(Thread(new StatusCheckThread(*this, url[i], brokerInfo)));
+ threads.push_back(Thread(new StatusCheckThread(*this, url[i])));
}
bool StatusCheck::canPromote() {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org