You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2009/03/11 00:11:10 UTC
svn commit: r752300 [2/12] - in /qpid/branches/qpid-1673/qpid: cpp/
cpp/examples/ cpp/examples/direct/ cpp/examples/failover/
cpp/examples/fanout/ cpp/examples/pub-sub/ cpp/examples/qmf-console/
cpp/examples/request-response/ cpp/examples/tradedemo/ cp...
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Exchange.cpp Tue Mar 10 23:10:57 2009
@@ -102,8 +102,8 @@
Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args,
Manageable* parent)
- : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0),
- sequence(false), sequenceNo(0), ive(false), mgmtExchange(0)
+ : name(_name), durable(_durable), alternateUsers(0), persistenceId(0),
+ args(_args), sequence(false), sequenceNo(0), ive(false), mgmtExchange(0)
{
if (parent != 0)
{
@@ -275,3 +275,7 @@
{
return b->queue == queue;
}
+
+void Exchange::setProperties(const boost::intrusive_ptr<Message>& msg) {
+ msg->getProperties<DeliveryProperties>()->setExchange(getName());
+}
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Exchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Exchange.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Exchange.h Tue Mar 10 23:10:57 2009
@@ -59,12 +59,12 @@
private:
const std::string name;
const bool durable;
- mutable qpid::framing::FieldTable args;
boost::shared_ptr<Exchange> alternate;
uint32_t alternateUsers;
mutable uint64_t persistenceId;
protected:
+ mutable qpid::framing::FieldTable args;
bool sequence;
mutable qpid::sys::Mutex sequenceLock;
int64_t sequenceNo;
@@ -140,13 +140,14 @@
virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0;
virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0;
virtual bool isBound(Queue::shared_ptr queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args) = 0;
+ virtual void setProperties(const boost::intrusive_ptr<Message>&);
virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0;
-
+
//PersistableExchange:
void setPersistenceId(uint64_t id) const;
uint64_t getPersistenceId() const { return persistenceId; }
uint32_t encodedSize() const;
- QPID_BROKER_EXTERN void encode(framing::Buffer& buffer) const;
+ QPID_BROKER_EXTERN virtual void encode(framing::Buffer& buffer) const;
static QPID_BROKER_EXTERN Exchange::shared_ptr decode(ExchangeRegistry& exchanges, framing::Buffer& buffer);
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/HeadersExchange.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/HeadersExchange.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/HeadersExchange.cpp Tue Mar 10 23:10:57 2009
@@ -105,33 +105,42 @@
void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args){
- if (!args) return;//can't match if there were no headers passed in
+ if (!args) {
+ //can't match if there were no headers passed in
+ if (mgmtExchange != 0) {
+ mgmtExchange->inc_msgReceives();
+ mgmtExchange->inc_byteReceives(msg.contentSize());
+ mgmtExchange->inc_msgDrops();
+ mgmtExchange->inc_byteDrops(msg.contentSize());
+ }
+ return;
+ }
+
PreRoute pr(msg, this);
uint32_t count(0);
Bindings::ConstPtr p = bindings.snapshot();
if (p.get()){
- for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i, count++) {
- if (match((*i)->args, *args)) msg.deliverTo((*i)->queue);
- if ((*i)->mgmtBinding != 0)
- (*i)->mgmtBinding->inc_msgMatched ();
+ for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i) {
+ if (match((*i)->args, *args)) {
+ msg.deliverTo((*i)->queue);
+ count++;
+ if ((*i)->mgmtBinding != 0)
+ (*i)->mgmtBinding->inc_msgMatched();
+ }
}
}
- if (mgmtExchange != 0)
- {
- mgmtExchange->inc_msgReceives ();
- mgmtExchange->inc_byteReceives (msg.contentSize ());
- if (count == 0)
- {
- mgmtExchange->inc_msgDrops ();
- mgmtExchange->inc_byteDrops (msg.contentSize ());
- }
- else
- {
- mgmtExchange->inc_msgRoutes (count);
- mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
+ if (mgmtExchange != 0) {
+ mgmtExchange->inc_msgReceives();
+ mgmtExchange->inc_byteReceives(msg.contentSize());
+ if (count == 0) {
+ mgmtExchange->inc_msgDrops();
+ mgmtExchange->inc_byteDrops(msg.contentSize());
+ } else {
+ mgmtExchange->inc_msgRoutes(count);
+ mgmtExchange->inc_byteRoutes(count * msg.contentSize());
}
}
}
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Link.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Link.cpp Tue Mar 10 23:10:57 2009
@@ -158,7 +158,7 @@
}
for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
- (*i)->cancel();
+ (*i)->closed();
created.push_back(*i);
}
active.clear();
@@ -217,21 +217,27 @@
void Link::cancel(Bridge::shared_ptr bridge)
{
- Mutex::ScopedLock mutex(lock);
-
- for (Bridges::iterator i = created.begin(); i != created.end(); i++) {
- if ((*i).get() == bridge.get()) {
- created.erase(i);
- break;
+ {
+ Mutex::ScopedLock mutex(lock);
+
+ for (Bridges::iterator i = created.begin(); i != created.end(); i++) {
+ if ((*i).get() == bridge.get()) {
+ created.erase(i);
+ break;
+ }
}
- }
- for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
- if ((*i).get() == bridge.get()) {
- bridge->cancel();
- active.erase(i);
- break;
+ for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
+ if ((*i).get() == bridge.get()) {
+ cancellations.push_back(bridge);
+ bridge->closed();
+ active.erase(i);
+ break;
+ }
}
}
+ if (!cancellations.empty()) {
+ connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+ }
}
void Link::ioThreadProcessing()
@@ -242,7 +248,7 @@
return;
QPID_LOG(debug, "Link::ioThreadProcessing()");
- //process any pending creates
+ //process any pending creates and/or cancellations
if (!created.empty()) {
for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
active.push_back(*i);
@@ -250,6 +256,13 @@
}
created.clear();
}
+ if (!cancellations.empty()) {
+ for (Bridges::iterator i = cancellations.begin(); i != cancellations.end(); ++i) {
+ active.push_back(*i);
+ (*i)->cancel(*connection);
+ }
+ cancellations.clear();
+ }
}
void Link::setConnection(Connection* c)
@@ -284,7 +297,7 @@
}
}
}
- else if (state == STATE_OPERATIONAL && !created.empty() && connection != 0)
+ else if (state == STATE_OPERATIONAL && (!created.empty() || !cancellations.empty()) && connection != 0)
connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
}
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Link.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Link.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Link.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Link.h Tue Mar 10 23:10:57 2009
@@ -67,6 +67,7 @@
typedef std::vector<Bridge::shared_ptr> Bridges;
Bridges created; // Bridges pending creation
Bridges active; // Bridges active
+ Bridges cancellations; // Bridges pending cancellation
uint channelCounter;
Connection* connection;
management::ManagementAgent* agent;
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/LinkRegistry.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/LinkRegistry.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/LinkRegistry.cpp Tue Mar 10 23:10:57 2009
@@ -19,19 +19,24 @@
*
*/
#include "LinkRegistry.h"
+#include "Connection.h"
#include "qpid/log/Statement.h"
#include <iostream>
+#include <boost/format.hpp>
using namespace qpid::broker;
using namespace qpid::sys;
using std::pair;
using std::stringstream;
using boost::intrusive_ptr;
+using boost::format;
+using boost::str;
namespace _qmf = qmf::org::apache::qpid::broker;
#define LINK_MAINT_INTERVAL 2
-LinkRegistry::LinkRegistry (Broker* _broker) : broker(_broker), parent(0), store(0), passive(false), passiveChanged(false)
+LinkRegistry::LinkRegistry (Broker* _broker) : broker(_broker), parent(0), store(0), passive(false), passiveChanged(false),
+ realm(broker ? broker->getOptions().realm : "")
{
timer.add (intrusive_ptr<TimerTask> (new Periodic(*this)));
}
@@ -241,6 +246,7 @@
{
l->second->established();
l->second->setConnection(c);
+ c->setUserId(str(format("%1%@%2%") % l->second->getUsername() % realm));
}
}
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/LinkRegistry.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/LinkRegistry.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/LinkRegistry.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/LinkRegistry.h Tue Mar 10 23:10:57 2009
@@ -66,6 +66,7 @@
MessageStore* store;
bool passive;
bool passiveChanged;
+ std::string realm;
void periodicMaintenance ();
bool updateAddress(const std::string& oldKey, const TcpAddress& newAddress);
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Message.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Message.cpp Tue Mar 10 23:10:57 2009
@@ -382,4 +382,9 @@
void Message::setDequeueCompleteCallback(MessageCallback& cb) { dequeueCallback = &cb; }
void Message::resetDequeueCompleteCallback() { dequeueCallback = 0; }
+framing::FieldTable& Message::getOrInsertHeaders()
+{
+ return getProperties<MessageProperties>()->getApplicationHeaders();
+}
+
}} // namespace qpid::broker
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Message.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Message.h Tue Mar 10 23:10:57 2009
@@ -73,6 +73,7 @@
QPID_BROKER_EXTERN std::string getExchangeName() const;
bool isImmediate() const;
QPID_BROKER_EXTERN const framing::FieldTable* getApplicationHeaders() const;
+ framing::FieldTable& getOrInsertHeaders();
QPID_BROKER_EXTERN bool isPersistent();
bool requiresAccept();
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Queue.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Queue.cpp Tue Mar 10 23:10:57 2009
@@ -93,7 +93,8 @@
policyExceeded(false),
mgmtObject(0),
eventMode(0),
- eventMgr(0)
+ eventMgr(0),
+ insertSeqNo(0)
{
if (parent != 0)
{
@@ -176,7 +177,7 @@
void Queue::recover(boost::intrusive_ptr<Message>& msg){
- push(msg);
+ push(msg, true);
msg->enqueueComplete(); // mark the message as enqueued
mgntEnqStats(msg);
@@ -545,12 +546,13 @@
++dequeueTracker;
}
-void Queue::push(boost::intrusive_ptr<Message>& msg){
+void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
QueueListeners::NotificationSet copy;
{
Mutex::ScopedLock locker(messageLock);
QueuedMessage qm(this, msg, ++sequence);
if (policy.get()) policy->tryEnqueue(qm);
+ if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence);
LVQ::iterator i;
const framing::FieldTable* ft = msg->getApplicationHeaders();
@@ -566,14 +568,21 @@
boost::intrusive_ptr<Message> old = i->second->getReplacementMessage(this);
if (!old) old = i->second;
i->second->setReplacementMessage(msg,this);
- dequeued(QueuedMessage(qm.queue, old, qm.position));
+ if (isRecovery) {
+ //can't issue new requests for the store until
+ //recovery is complete
+ pendingDequeues.push_back(QueuedMessage(qm.queue, old, qm.position));
+ } else {
+ dequeue(0, QueuedMessage(qm.queue, old, qm.position));
+ }
}
}else {
messages.push_back(qm);
listeners.populate(copy);
}
- if (eventMode && eventMgr) {
- eventMgr->enqueued(qm);
+ if (eventMode) {
+ if (eventMgr) eventMgr->enqueued(qm);
+ else QPID_LOG(warning, "Enqueue manager not set, events not generated for " << getName());
}
}
copy.notify();
@@ -664,7 +673,7 @@
msg->addTraceId(traceId);
}
- if (msg->isPersistent() && store && !lastValueQueue) {
+ if (msg->isPersistent() && store) {
msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg);
store->enqueue(ctxt, pmsg, *this);
@@ -676,14 +685,14 @@
// return true if store exists,
bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
{
- if (policy.get() && !policy->isEnqueued(msg)) return false;
{
Mutex::ScopedLock locker(messageLock);
+ if (policy.get() && !policy->isEnqueued(msg)) return false;
if (!ctxt) {
dequeued(msg);
}
}
- if (msg.payload->isPersistent() && store && !lastValueQueue) {
+ if (msg.payload->isPersistent() && store) {
msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload);
store->dequeue(ctxt, pmsg, *this);
@@ -976,3 +985,16 @@
{
eventMgr = &mgr;
}
+
+void Queue::recoveryComplete()
+{
+ //process any pending dequeues
+ for_each(pendingDequeues.begin(), pendingDequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
+ pendingDequeues.clear();
+}
+
+void Queue::insertSequenceNumbers(const std::string& key)
+{
+ seqNoKey = key;
+ insertSeqNo = !seqNoKey.empty();
+}
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Queue.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Queue.h Tue Mar 10 23:10:57 2009
@@ -87,6 +87,7 @@
std::vector<std::string> traceExclude;
QueueListeners listeners;
Messages messages;
+ Messages pendingDequeues;//used to avoid dequeuing during recovery
LVQ lvq;
mutable qpid::sys::Mutex consumerLock;
mutable qpid::sys::Mutex messageLock;
@@ -102,8 +103,10 @@
RateTracker dequeueTracker;
int eventMode;
QueueEvents* eventMgr;
+ bool insertSeqNo;
+ std::string seqNoKey;
- void push(boost::intrusive_ptr<Message>& msg);
+ void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
void setPolicy(std::auto_ptr<QueuePolicy> policy);
bool seek(QueuedMessage& msg, Consumer::shared_ptr position);
bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
@@ -298,6 +301,11 @@
void setPosition(framing::SequenceNumber pos);
int getEventMode();
void setQueueEventManager(QueueEvents&);
+ void insertSequenceNumbers(const std::string& key);
+ /**
+ * Notify queue that recovery has completed.
+ */
+ void recoveryComplete();
};
}
}
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/QueueEvents.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/QueueEvents.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/QueueEvents.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/QueueEvents.cpp Tue Mar 10 23:10:57 2009
@@ -20,12 +20,13 @@
*/
#include "QueueEvents.h"
#include "qpid/Exception.h"
+#include "qpid/log/Statement.h"
namespace qpid {
namespace broker {
QueueEvents::QueueEvents(const boost::shared_ptr<sys::Poller>& poller) :
- eventQueue(boost::bind(&QueueEvents::handle, this, _1), poller)
+ eventQueue(boost::bind(&QueueEvents::handle, this, _1), poller), enabled(true)
{
eventQueue.start();
}
@@ -37,12 +38,12 @@
void QueueEvents::enqueued(const QueuedMessage& m)
{
- eventQueue.push(Event(ENQUEUE, m));
+ if (enabled) eventQueue.push(Event(ENQUEUE, m));
}
void QueueEvents::dequeued(const QueuedMessage& m)
{
- eventQueue.push(Event(DEQUEUE, m));
+ if (enabled) eventQueue.push(Event(DEQUEUE, m));
}
void QueueEvents::registerListener(const std::string& id, const EventListener& listener)
@@ -81,6 +82,18 @@
if (!eventQueue.empty() && !listeners.empty()) eventQueue.shutdown();
}
+void QueueEvents::enable()
+{
+ enabled = true;
+ QPID_LOG(debug, "Queue events enabled");
+}
+
+void QueueEvents::disable()
+{
+ enabled = false;
+ QPID_LOG(debug, "Queue events disabled");
+}
+
QueueEvents::Event::Event(EventType t, const QueuedMessage& m) : type(t), msg(m) {}
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/QueueEvents.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/QueueEvents.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/QueueEvents.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/QueueEvents.h Tue Mar 10 23:10:57 2009
@@ -61,6 +61,8 @@
QPID_BROKER_EXTERN void registerListener(const std::string& id,
const EventListener&);
QPID_BROKER_EXTERN void unregisterListener(const std::string& id);
+ void enable();
+ void disable();
//process all outstanding events
QPID_BROKER_EXTERN void shutdown();
private:
@@ -69,6 +71,7 @@
EventQueue eventQueue;
Listeners listeners;
+ volatile bool enabled;
qpid::sys::Mutex lock;//protect listeners from concurrent access
void handle(EventQueue::Queue& e);
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/QueuePolicy.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/QueuePolicy.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/QueuePolicy.cpp Tue Mar 10 23:10:57 2009
@@ -126,7 +126,7 @@
FieldTable::ValuePtr v = settings.get(typeKey);
if (v && v->convertsTo<std::string>()) {
std::string t = v->get<std::string>();
- transform(t.begin(), t.end(), t.begin(), tolower);
+ std::transform(t.begin(), t.end(), t.begin(), tolower);
if (t == REJECT || t == FLOW_TO_DISK || t == RING || t == RING_STRICT) return t;
}
return FLOW_TO_DISK;
@@ -197,11 +197,12 @@
void RingQueuePolicy::dequeued(const QueuedMessage& m)
{
qpid::sys::Mutex::ScopedLock l(lock);
- QueuePolicy::dequeued(m);
//find and remove m from queue
- for (Messages::iterator i = queue.begin(); i != queue.end() && m.position <= i->position; i++) {
- if (i->position == m.position) {
+ for (Messages::iterator i = queue.begin(); i != queue.end(); i++) {
+ if (i->payload == m.payload) {
queue.erase(i);
+ //now update count and size
+ QueuePolicy::dequeued(m);
break;
}
}
@@ -210,9 +211,11 @@
bool RingQueuePolicy::isEnqueued(const QueuedMessage& m)
{
qpid::sys::Mutex::ScopedLock l(lock);
- //for non-strict ring policy, a message can be dequeued before acked; need to detect this
- for (Messages::iterator i = queue.begin(); i != queue.end() && m.position <= i->position; i++) {
- if (i->position == m.position) {
+ //for non-strict ring policy, a message can be replaced (and
+ //therefore dequeued) before it is accepted or released by
+ //subscriber; need to detect this
+ for (Messages::const_iterator i = queue.begin(); i != queue.end(); i++) {
+ if (i->payload == m.payload) {
return true;
}
}
@@ -236,13 +239,10 @@
oldest = queue.front();
}
if (oldest.queue->acquire(oldest) || !strict) {
- qpid::sys::Mutex::ScopedLock l(lock);
- if (oldest.position == queue.front().position) {
- queue.pop_front();
- QPID_LOG(debug, "Ring policy triggered in queue "
- << (m.queue ? m.queue->getName() : std::string("unknown queue"))
- << ": removed message " << oldest.position << " to make way for " << m.position);
- }
+ oldest.queue->dequeue(0, oldest);
+ QPID_LOG(debug, "Ring policy triggered in queue "
+ << (m.queue ? m.queue->getName() : std::string("unknown queue"))
+ << ": removed message " << oldest.position << " to make way for " << m.position);
return true;
} else {
QPID_LOG(debug, "Ring policy could not be triggered in queue "
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/QueueRegistry.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/QueueRegistry.cpp Tue Mar 10 23:10:57 2009
@@ -19,6 +19,7 @@
*
*/
#include "QueueRegistry.h"
+#include "QueueEvents.h"
#include "qpid/log/Statement.h"
#include <sstream>
#include <assert.h>
@@ -27,7 +28,7 @@
using namespace qpid::sys;
QueueRegistry::QueueRegistry() :
- counter(1), store(0), parent(0), lastNode(false) {}
+ counter(1), store(0), events(0), parent(0), lastNode(false) {}
QueueRegistry::~QueueRegistry(){}
@@ -43,7 +44,8 @@
if (i == queues.end()) {
Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner, parent));
queues[name] = queue;
- if (lastNode) queue->setLastNodeFailure();
+ if (lastNode) queue->setLastNodeFailure();
+ if (events) queue->setQueueEventManager(*events);
return std::pair<Queue::shared_ptr, bool>(queue, true);
} else {
@@ -105,3 +107,7 @@
lastNode = _lastNode;
}
+void QueueRegistry::setQueueEvents(QueueEvents* e)
+{
+ events = e;
+}
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/QueueRegistry.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/QueueRegistry.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/QueueRegistry.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/QueueRegistry.h Tue Mar 10 23:10:57 2009
@@ -32,6 +32,8 @@
namespace qpid {
namespace broker {
+class QueueEvents;
+
/**
* A registry of queues indexed by queue name.
*
@@ -90,6 +92,8 @@
*/
string generateName();
+ void setQueueEvents(QueueEvents*);
+
/**
* Set the store to use. May only be called once.
*/
@@ -124,6 +128,7 @@
mutable qpid::sys::RWlock lock;
int counter;
MessageStore* store;
+ QueueEvents* events;
management::Manageable* parent;
bool lastNode; //used to set mode on queue declare
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp Tue Mar 10 23:10:57 2009
@@ -149,7 +149,8 @@
void RecoveryManagerImpl::recoveryComplete()
{
- //TODO (finalise binding setup etc)
+ //notify all queues
+ queues.eachQueue(boost::bind(&Queue::recoveryComplete, _1));
}
bool RecoverableMessageImpl::loadContent(uint64_t available)
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp Tue Mar 10 23:10:57 2009
@@ -141,21 +141,31 @@
void NullAuthenticator::getMechanisms(Array& mechanisms)
{
mechanisms.add(boost::shared_ptr<FieldValue>(new Str16Value("ANONYMOUS")));
+ mechanisms.add(boost::shared_ptr<FieldValue>(new Str16Value("PLAIN")));//useful for testing
}
void NullAuthenticator::start(const string& mechanism, const string& response)
{
if (mechanism == "PLAIN") { // Old behavior
- if (response.size() > 0 && response[0] == (char) 0) {
- string temp = response.substr(1);
- string::size_type i = temp.find((char)0);
- string uid = temp.substr(0, i);
- string pwd = temp.substr(i + 1);
- i = uid.find_last_of(realm);
- if (i == string::npos || i != (uid.size() - 1)) {
- uid = str(format("%1%@%2%") % uid % realm);
+ if (response.size() > 0) {
+ string uid;
+ string::size_type i = response.find((char)0);
+ if (i == 0 && response.size() > 1) {
+ //no authorization id; use authentication id
+ i = response.find((char)0, 1);
+ if (i != string::npos) uid = response.substr(1, i-1);
+ } else if (i != string::npos) {
+ //authorization id is first null delimited field
+ uid = response.substr(0, i);
+ }//else not a valid SASL PLAIN response, throw error?
+ if (!uid.empty()) {
+ //append realm if it has not already been added
+ i = uid.find(realm);
+ if (i == string::npos || realm.size() + i < uid.size()) {
+ uid = str(format("%1%@%2%") % uid % realm);
+ }
+ connection.setUserId(uid);
}
- connection.setUserId(uid);
}
} else {
connection.setUserId("anonymous");
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SemanticState.cpp Tue Mar 10 23:10:57 2009
@@ -355,20 +355,12 @@
}
void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
+ msg->setTimestamp(getSession().getBroker().getExpiryPolicy());
+
std::string exchangeName = msg->getExchangeName();
- //TODO: the following should be hidden behind message (using MessageAdapter or similar)
-
- if (msg->isA<MessageTransferBody>()) {
- // Do not replace the delivery-properties.exchange if it is is already set.
- // This is used internally (by the cluster) to force the exchange name on a message.
- // The client library ensures this is always empty for messages from normal clients.
- if (!msg->hasProperties<DeliveryProperties>() || msg->getProperties<DeliveryProperties>()->getExchange().empty())
- msg->getProperties<DeliveryProperties>()->setExchange(exchangeName);
- msg->setTimestamp(getSession().getBroker().getExpiryPolicy());
- }
- if (!cacheExchange || cacheExchange->getName() != exchangeName){
+ if (!cacheExchange || cacheExchange->getName() != exchangeName)
cacheExchange = session.getBroker().getExchanges().get(exchangeName);
- }
+ cacheExchange->setProperties(msg);
/* verify the userid if specified: */
std::string id =
@@ -516,14 +508,16 @@
void SemanticState::ConsumerImpl::addByteCredit(uint32_t value)
{
if (byteCredit != 0xFFFFFFFF) {
- byteCredit += value;
+ if (value == 0xFFFFFFFF) byteCredit = value;
+ else byteCredit += value;
}
}
void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value)
{
if (msgCredit != 0xFFFFFFFF) {
- msgCredit += value;
+ if (value == 0xFFFFFFFF) msgCredit = value;
+ else msgCredit += value;
}
}
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Tue Mar 10 23:10:57 2009
@@ -362,10 +362,6 @@
getBroker().getExchanges().getDefault()->bind(queue, name, 0);
queue->bound(getBroker().getExchanges().getDefault()->getName(), name, arguments);
- //if event generation is turned on, pass in a pointer to
- //the QueueEvents instance to use
- if (queue->getEventMode()) queue->setQueueEventManager(getBroker().getQueueEvents());
-
//handle automatic cleanup:
if (exclusive) {
exclusiveQueues.push_back(queue);
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SessionHandler.cpp Tue Mar 10 23:10:57 2009
@@ -34,7 +34,8 @@
SessionHandler::SessionHandler(Connection& c, ChannelId ch)
: amqp_0_10::SessionHandler(&c.getOutput(), ch),
connection(c),
- proxy(out)
+ proxy(out),
+ clusterOrderProxy(c.getClusterOrderOutput() ? new SetChannelProxy(ch, c.getClusterOrderOutput()) : 0)
{}
SessionHandler::~SessionHandler() {}
@@ -84,11 +85,23 @@
if (session.get()) session->readyToSend();
}
-// TODO aconway 2008-05-12: hacky - handle attached for bridge clients.
-// We need to integrate the client code so we can run a real client
-// in the bridge.
-//
-void SessionHandler::attached(const std::string& name) {
+/**
+ * Used by inter-broker bridges to set up session id and attach
+ */
+void SessionHandler::attachAs(const std::string& name)
+{
+ SessionId id(connection.getUserId(), name);
+ SessionState::Configuration config = connection.broker.getSessionManager().getSessionConfig();
+ session.reset(new SessionState(connection.getBroker(), *this, id, config));
+ sendAttach(false);
+}
+
+/**
+ * TODO: this is a little ugly, fix it; it's currently still relied on
+ * for 'push' bridges
+ */
+void SessionHandler::attached(const std::string& name)
+{
if (session.get()) {
amqp_0_10::SessionHandler::attached(name);
} else {
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SessionHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SessionHandler.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SessionHandler.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SessionHandler.h Tue Mar 10 23:10:57 2009
@@ -54,10 +54,20 @@
framing::AMQP_ClientProxy& getProxy() { return proxy; }
const framing::AMQP_ClientProxy& getProxy() const { return proxy; }
+ /**
+ * If commands are sent based on the local time (e.g. in timers), they don't have
+ * a well-defined ordering across cluster nodes.
+ * This proxy is for sending such commands. In a clustered broker it will take steps
+ * to synchronize command order across the cluster. In a stand-alone broker
+ * it is just a synonym for getProxy()
+ */
+ framing::AMQP_ClientProxy& getClusterOrderProxy() {
+ return clusterOrderProxy.get() ? *clusterOrderProxy : proxy;
+ }
+
virtual void handleDetach();
-
- // Overrides
- void attached(const std::string& name);
+ void attached(const std::string& name);//used by 'pushing' inter-broker bridges
+ void attachAs(const std::string& name);//used by 'pulling' inter-broker bridges
protected:
virtual void setState(const std::string& sessionName, bool force);
@@ -69,9 +79,16 @@
virtual void readyToSend();
private:
+ struct SetChannelProxy : public framing::AMQP_ClientProxy { // Proxy that sets the channel.
+ framing::ChannelHandler setChannel;
+ SetChannelProxy(uint16_t ch, framing::FrameHandler* out)
+ : framing::AMQP_ClientProxy(setChannel), setChannel(ch, out) {}
+ };
+
Connection& connection;
framing::AMQP_ClientProxy proxy;
std::auto_ptr<SessionState> session;
+ std::auto_ptr<SetChannelProxy> clusterOrderProxy;
};
}} // namespace qpid::broker
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SessionState.cpp Tue Mar 10 23:10:57 2009
@@ -66,7 +66,7 @@
uint32_t maxRate = broker.getOptions().maxSessionRate;
if (maxRate) {
if (handler->getConnection().getClientThrottling()) {
- rateFlowcontrol = new RateFlowcontrol(maxRate);
+ rateFlowcontrol.reset(new RateFlowcontrol(maxRate));
} else {
QPID_LOG(warning, getId() << ": Unable to flow control client - client doesn't support");
}
@@ -210,14 +210,17 @@
{}
void fire() {
- QPID_LOG(critical, "ScheduledCreditTask fired"); // FIXME aconway 2009-02-23: REMOVE
// This is the best we can currently do to avoid a destruction/fire race
if (!isCancelled()) {
- if ( !sessionState.processSendCredit(0) ) {
- QPID_LOG(warning, sessionState.getId() << ": Reschedule sending credit");
- reset();
- timer.add(this);
- }
+ sessionState.getConnection().requestIOProcessing(boost::bind(&ScheduledCreditTask::sendCredit, this));
+ }
+ }
+
+ void sendCredit() {
+ if ( !sessionState.processSendCredit(0) ) {
+ QPID_LOG(warning, sessionState.getId() << ": Reschedule sending credit");
+ reset();
+ timer.add(this);
}
}
};
@@ -275,7 +278,8 @@
if ( msgs > 0 && rateFlowcontrol->flowStopped() ) {
QPID_LOG(warning, getId() << ": producer throttling violation");
// TODO: Probably do message.stop("") first time then disconnect
- getProxy().getMessage().stop("");
+ // See comment on getClusterOrderProxy() in .h file
+ getClusterOrderProxy().getMessage().stop("");
return true;
}
AbsTime now = AbsTime::now();
@@ -283,7 +287,7 @@
if (mgmtObject) mgmtObject->dec_clientCredit(msgs);
if ( sendCredit>0 ) {
QPID_LOG(debug, getId() << ": send producer credit " << sendCredit);
- getProxy().getMessage().flow("", 0, sendCredit);
+ getClusterOrderProxy().getMessage().flow("", 0, sendCredit);
rateFlowcontrol->sentCredit(now, sendCredit);
if (mgmtObject) mgmtObject->inc_clientCredit(sendCredit);
return true;
@@ -364,8 +368,9 @@
// Issue initial credit - use a heuristic here issue min of 300 messages or 1 secs worth
uint32_t credit = std::min(rateFlowcontrol->getRate(), 300U);
QPID_LOG(debug, getId() << ": Issuing producer message credit " << credit);
- getProxy().getMessage().setFlowMode("", 0);
- getProxy().getMessage().flow("", 0, credit);
+ // See comment on getClusterOrderProxy() in .h file
+ getClusterOrderProxy().getMessage().setFlowMode("", 0);
+ getClusterOrderProxy().getMessage().flow("", 0, credit);
rateFlowcontrol->sentCredit(AbsTime::now(), credit);
if (mgmtObject) mgmtObject->inc_clientCredit(credit);
}
@@ -373,4 +378,8 @@
Broker& SessionState::getBroker() { return broker; }
+framing::AMQP_ClientProxy& SessionState::getClusterOrderProxy() {
+ return handler->getClusterOrderProxy();
+}
+
}} // namespace qpid::broker
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SessionState.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SessionState.h Tue Mar 10 23:10:57 2009
@@ -125,6 +125,15 @@
void sendAcceptAndCompletion();
+ /**
+ * If commands are sent based on the local time (e.g. in timers), they don't have
+ * a well-defined ordering across cluster nodes.
+ * This proxy is for sending such commands. In a clustered broker it will take steps
+ * to synchronize command order across the cluster. In a stand-alone broker
+ * it is just a synonym for getProxy()
+ */
+ framing::AMQP_ClientProxy& getClusterOrderProxy();
+
Broker& broker;
SessionHandler* handler;
sys::AbsTime expiry; // Used by SessionManager.
@@ -138,7 +147,7 @@
// State used for producer flow control (rate limited)
qpid::sys::Mutex rateLock;
- RateFlowcontrol* rateFlowcontrol;
+ boost::scoped_ptr<RateFlowcontrol> rateFlowcontrol;
boost::intrusive_ptr<TimerTask> flowControlTimer;
friend class SessionManager;
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/TxAccept.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/TxAccept.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/TxAccept.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/TxAccept.cpp Tue Mar 10 23:10:57 2009
@@ -50,12 +50,12 @@
void TxAccept::RangeOps::prepare(TransactionContext* ctxt)
{
- for_each(ranges.begin(), ranges.end(), bind(&RangeOp::prepare, _1, ctxt));
+ std::for_each(ranges.begin(), ranges.end(), bind(&RangeOp::prepare, _1, ctxt));
}
void TxAccept::RangeOps::commit()
{
- for_each(ranges.begin(), ranges.end(), bind(&RangeOp::commit, _1));
+ std::for_each(ranges.begin(), ranges.end(), bind(&RangeOp::commit, _1));
//now remove if isRedundant():
if (!ranges.empty()) {
ack_iterator i = ranges.front().range.start;
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Tue Mar 10 23:10:57 2009
@@ -180,6 +180,9 @@
template <class F> void ConnectionImpl::closeInternal(const F& f) {
+ if (heartbeatTask) {
+ heartbeatTask->cancel();
+ }
{
Mutex::ScopedUnlock u(lock);
connector->close();
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/Connector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/Connector.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/Connector.cpp Tue Mar 10 23:10:57 2009
@@ -92,8 +92,6 @@
framing::ProtocolVersion version;
bool initiated;
-
- sys::Mutex closedLock;
bool closed;
bool joined;
@@ -185,7 +183,7 @@
}
void TCPConnector::connect(const std::string& host, int port){
- Mutex::ScopedLock l(closedLock);
+ Mutex::ScopedLock l(lock);
assert(closed);
try {
socket.connect(host, port);
@@ -207,7 +205,7 @@
}
void TCPConnector::init(){
- Mutex::ScopedLock l(closedLock);
+ Mutex::ScopedLock l(lock);
assert(joined);
ProtocolInitiation init(version);
writeDataBlock(init);
@@ -216,17 +214,21 @@
}
bool TCPConnector::closeInternal() {
- Mutex::ScopedLock l(closedLock);
- bool ret = !closed;
+ bool ret;
+ {
+ Mutex::ScopedLock l(lock);
+ ret = !closed;
if (!closed) {
closed = true;
+ aio->queueForDeletion();
poller->shutdown();
}
- if (!joined && receiver.id() != Thread::current().id()) {
- joined = true;
- Mutex::ScopedUnlock u(closedLock);
- receiver.join();
+ if (joined || receiver.id() == Thread::current().id()) {
+ return ret;
}
+ joined = true;
+ }
+ receiver.join();
return ret;
}
@@ -259,21 +261,19 @@
}
void TCPConnector::send(AMQFrame& frame) {
+ Mutex::ScopedLock l(lock);
+ frames.push_back(frame);
+ //only ask to write if this is the end of a frameset or if we
+ //already have a buffer's worth of data
+ currentSize += frame.encodedSize();
bool notifyWrite = false;
- {
- Mutex::ScopedLock l(lock);
- frames.push_back(frame);
- //only ask to write if this is the end of a frameset or if we
- //already have a buffers worth of data
- currentSize += frame.encodedSize();
- if (frame.getEof()) {
- lastEof = frames.size();
- notifyWrite = true;
- } else {
- notifyWrite = (currentSize >= maxFrameSize);
- }
+ if (frame.getEof()) {
+ lastEof = frames.size();
+ notifyWrite = true;
+ } else {
+ notifyWrite = (currentSize >= maxFrameSize);
}
- if (notifyWrite) aio->notifyPendingWrite();
+ if (notifyWrite && !closed) aio->notifyPendingWrite();
}
void TCPConnector::handleClosed() {
@@ -384,14 +384,13 @@
assert(protect);
try {
Dispatcher d(poller);
-
+
for (int i = 0; i < 32; i++) {
aio->queueReadBuffer(new Buff(maxFrameSize));
}
-
+
aio->start(poller);
d.run();
- aio->queueForDeletion();
socket.close();
} catch (const std::exception& e) {
QPID_LOG(error, QPID_MSG("FAIL " << identifier << ": " << e.what()));
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/Dispatcher.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/Dispatcher.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/Dispatcher.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/Dispatcher.cpp Tue Mar 10 23:10:57 2009
@@ -136,8 +136,7 @@
void Dispatcher::cancel(const std::string& destination) {
ScopedLock<Mutex> l(lock);
- listeners.erase(destination);
- if (autoStop && listeners.empty())
+ if (listeners.erase(destination) && running && autoStop && listeners.empty())
queue->close();
}
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/FailoverManager.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/FailoverManager.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/FailoverManager.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/FailoverManager.cpp Tue Mar 10 23:10:57 2009
@@ -21,12 +21,15 @@
#include "FailoverManager.h"
#include "qpid/Exception.h"
#include "qpid/log/Statement.h"
+#include "qpid/sys/Time.h"
namespace qpid {
namespace client {
using qpid::sys::Monitor;
+using qpid::sys::AbsTime;
+using qpid::sys::Duration;
FailoverManager::FailoverManager(const ConnectionSettings& s,
ReconnectionStrategy* rs) : settings(s), strategy(rs), state(IDLE) {}
@@ -35,15 +38,21 @@
{
bool retry = false;
bool completed = false;
+ AbsTime failed;
while (!completed) {
try {
AsyncSession session = connect().newSession();
+ if (retry) {
+ Duration failoverTime(failed, AbsTime::now());
+ QPID_LOG(info, "Failed over for " << &c << " in " << (failoverTime/qpid::sys::TIME_MSEC) << " milliseconds");
+ }
c.execute(session, retry);
session.sync();//TODO: shouldn't be required
session.close();
completed = true;
} catch(const TransportFailure&) {
- retry = true;
+ retry = true;
+ failed = AbsTime::now();
}
}
}
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/SessionImpl.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/SessionImpl.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/SessionImpl.cpp Tue Mar 10 23:10:57 2009
@@ -512,6 +512,7 @@
if (id.getName() != _name) throw InternalErrorException("Incorrect session name");
setState(DETACHED);
QPID_LOG(info, "Session detached by peer: " << id);
+ proxy.detached(_name, DETACH_CODE_NORMAL);
}
void SessionImpl::detached(const std::string& _name, uint8_t _code) {
@@ -744,7 +745,8 @@
void SessionImpl::handleClosed()
{
- demux.close(exceptionHolder.empty() ? new ClosedException() : exceptionHolder);
+ demux.close(exceptionHolder.empty() ?
+ sys::ExceptionHolder(new ClosedException()) : exceptionHolder);
results.close();
}
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/SslConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/SslConnector.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/SslConnector.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/SslConnector.cpp Tue Mar 10 23:10:57 2009
@@ -221,6 +221,7 @@
bool ret = !closed;
if (!closed) {
closed = true;
+ aio->queueForDeletion();
poller->shutdown();
}
if (!joined && receiver.id() != Thread::current().id()) {
@@ -386,7 +387,6 @@
aio->start(poller);
d.run();
- aio->queueForDeletion();
socket.close();
} catch (const std::exception& e) {
QPID_LOG(error, e.what());
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Mar 10 23:10:57 2009
@@ -21,7 +21,9 @@
#include "Connection.h"
#include "UpdateClient.h"
#include "FailoverExchange.h"
+#include "UpdateExchange.h"
+#include "qpid/assert.h"
#include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h"
#include "qmf/org/apache/qpid/cluster/Package.h"
#include "qpid/broker/Broker.h"
@@ -91,9 +93,10 @@
cpg(*this),
name(settings.name),
myUrl(settings.url.empty() ? Url() : Url(settings.url)),
- myId(cpg.self()),
+ self(cpg.self()),
readMax(settings.readMax),
writeEstimate(settings.writeEstimate),
+ expiryPolicy(new ExpiryPolicy(mcast, self, broker.getTimer())),
mcast(cpg, poller, boost::bind(&Cluster::leave, this)),
dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)),
deliverEventQueue(boost::bind(&Cluster::deliveredEvent, this, _1),
@@ -104,15 +107,12 @@
boost::bind(&Cluster::leave, this),
"Error delivering frames",
poller),
- connections(*this),
- decoder(boost::bind(&PollableFrameQueue::push, &deliverFrameQueue, _1), connections),
- expiryPolicy(new ExpiryPolicy(boost::bind(&Cluster::isLeader, this), mcast, myId, broker.getTimer())),
- frameId(0),
initialized(false),
+ decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
+ discarding(true),
state(INIT),
lastSize(0),
- lastBroker(false),
- sequence(0)
+ lastBroker(false)
{
mAgent = ManagementAgent::Singleton::getInstance();
if (mAgent != 0){
@@ -122,7 +122,13 @@
mgmtObject->set_status("JOINING");
}
+ // Failover exchange provides membership updates to clients.
failoverExchange.reset(new FailoverExchange(this));
+ broker.getExchanges().registerExchange(failoverExchange);
+
+ // Update exchange is used during updates to replicate messages without modifying delivery-properties.exchange.
+ broker.getExchanges().registerExchange(boost::shared_ptr<broker::Exchange>(new UpdateExchange(this)));
+
if (settings.quorum) quorum.init();
cpg.join(name);
// pump the CPG dispatch manually till we get initialized.
@@ -149,21 +155,21 @@
// Called in connection thread to insert a client connection.
void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) {
- Lock l(lock);
- connections.insert(c);
+ localConnections.insert(c);
}
// Called in connection thread to insert an updated shadow connection.
void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) {
- Lock l(lock);
- assert(state <= UPDATEE); // Only during update.
- connections.insert(c);
+ // Safe to use connections here because we're pre-catchup, either
+ // discarding or stalled, so deliveredFrame is not processing any
+ // connection events.
+ assert(discarding);
+ connections.insert(ConnectionMap::value_type(c->getId(), c));
}
+// Called by Connection::deliverClose() in deliverFrameQueue thread.
void Cluster::erase(const ConnectionId& id) {
- // Called only by Connection::deliverClose in deliver thread, no need to lock.
connections.erase(id);
- decoder.erase(id);
}
std::vector<string> Cluster::getIds() const {
@@ -193,7 +199,6 @@
if (state != LEFT) {
state = LEFT;
QPID_LOG(notice, *this << " leaving cluster " << name);
- connections.clear();
try { broker.shutdown(); }
catch (const std::exception& e) {
QPID_LOG(critical, *this << " error during broker shutdown: " << e.what());
@@ -213,52 +218,88 @@
MemberId from(nodeid, pid);
framing::Buffer buf(static_cast<char*>(msg), msg_len);
Event e(Event::decodeCopy(from, buf));
- e.setSequence(sequence++);
- if (from == myId) // Record self-deliveries for flow control.
+ if (from == self) // Record self-deliveries for flow control.
mcast.selfDeliver(e);
- deliver(e);
+ deliverEvent(e);
}
-void Cluster::deliver(const Event& e) {
- if (state == LEFT) return;
- QPID_LATENCY_INIT(e);
+void Cluster::deliverEvent(const Event& e) {
deliverEventQueue.push(e);
}
-// Handler for deliverEventQueue
+void Cluster::deliverFrame(const EventFrame& e) {
+ deliverFrameQueue.push(e);
+}
+
+// Handler for deliverEventQueue.
+// This thread decodes frames from events.
void Cluster::deliveredEvent(const Event& e) {
- QPID_LATENCY_RECORD("delivered event queue", e);
- Buffer buf(const_cast<char*>(e.getData()), e.getSize());
- if (e.getType() == CONTROL) {
- AMQFrame frame;
- while (frame.decode(buf))
- deliverFrameQueue.push(EventFrame(e, frame));
- }
- else if (e.getType() == DATA)
- decoder.decode(e, e.getData());
+ QPID_LOG(trace, *this << " DLVR: " << e);
+ if (e.isCluster()) {
+ EventFrame ef(e, e.getFrame());
+ // Stop the deliverEventQueue on update offers.
+ // This preserves the connection decoder fragments for an update.
+ ClusterUpdateOfferBody* offer = dynamic_cast<ClusterUpdateOfferBody*>(ef.frame.getBody());
+ if (offer)
+ deliverEventQueue.stop();
+ deliverFrame(ef);
+ }
+ else if(!discarding) {
+ if (e.isControl())
+ deliverFrame(EventFrame(e, e.getFrame()));
+ else
+ decoder.decode(e, e.getData());
+}
+ else // Discard connection events if discarding is set.
+ QPID_LOG(trace, *this << " DROP: " << e);
}
-// Handler for deliverFrameQueue
+// Handler for deliverFrameQueue.
+// This thread executes the main logic.
void Cluster::deliveredFrame(const EventFrame& e) {
Mutex::ScopedLock l(lock);
- const_cast<AMQFrame&>(e.frame).setClusterId(frameId++);
- QPID_LOG(trace, *this << " DLVR: " << e);
- QPID_LATENCY_RECORD("delivered frame queue", e.frame);
- if (e.isCluster()) { // Cluster control frame
+ if (e.isCluster()) {
+ QPID_LOG(trace, *this << " DLVR: " << e);
ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l);
if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled())
throw Exception(QPID_MSG("Invalid cluster control"));
}
- else { // Connection frame.
- if (state <= UPDATEE) {
- QPID_LOG(trace, *this << " DROP: " << e);
- return;
- }
- boost::intrusive_ptr<Connection> connection = connections.get(e.connectionId);
- if (connection) // Ignore frames to closed local connections.
+ else if (state >= CATCHUP) {
+ QPID_LOG(trace, *this << " DLVR: " << e);
+ ConnectionPtr connection = getConnection(e.connectionId, l);
+ if (connection)
connection->deliveredFrame(e);
}
- QPID_LATENCY_RECORD("processed", e.frame);
+ else // Drop connection frames while state < CATCHUP
+ QPID_LOG(trace, *this << " DROP: " << e);
+}
+
+// Called in deliverFrameQueue thread
+ConnectionPtr Cluster::getConnection(const ConnectionId& id, Lock&) {
+ ConnectionPtr cp;
+ ConnectionMap::iterator i = connections.find(id);
+ if (i != connections.end())
+ cp = i->second;
+ else {
+ if(id.getMember() == self)
+ cp = localConnections.getErase(id);
+ else {
+ // New remote connection, create a shadow.
+ std::ostringstream mgmtId;
+ mgmtId << id;
+ cp = new Connection(*this, shadowOut, mgmtId.str(), id);
+ }
+ if (cp)
+ connections.insert(ConnectionMap::value_type(id, cp));
+ }
+ return cp;
+}
+
+Cluster::ConnectionVector Cluster::getConnections(Lock&) {
+ ConnectionVector result(connections.size());
+ std::transform(connections.begin(), connections.end(), result.begin(),
+ boost::bind(&ConnectionMap::value_type::second, _1));
+ return result;
}
struct AddrList {
@@ -306,42 +347,45 @@
std::string addresses;
for (cpg_address* p = current; p < current+nCurrent; ++p)
addresses.append(MemberId(*p).str());
- deliver(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), myId));
+ deliverEvent(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), self));
}
void Cluster::setReady(Lock&) {
state = READY;
if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
mcast.release();
+ broker.getQueueEvents().enable();
}
void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& l) {
bool memberChange = map.configChange(addresses);
if (state == LEFT) return;
- if (!map.isAlive(myId)) { // Final config change.
+ if (!map.isAlive(self)) { // Final config change.
leave(l);
return;
}
if (state == INIT) { // First configChange
if (map.aliveCount() == 1) {
- setClusterId(true);
+ setClusterId(true, l);
+ discarding = false;
setReady(l);
- map = ClusterMap(myId, myUrl, true);
+ map = ClusterMap(self, myUrl, true);
memberUpdate(l);
QPID_LOG(notice, *this << " first in cluster");
}
else { // Joining established group.
state = JOINER;
QPID_LOG(info, *this << " joining cluster: " << map);
- mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), myId);
+ mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
elders = map.getAlive();
- elders.erase(myId);
+ elders.erase(self);
broker.getLinks().setPassive(true);
+ broker.getQueueEvents().disable();
}
- }
- else if (state >= READY && memberChange) {
+ }
+ else if (state >= CATCHUP && memberChange) {
memberUpdate(l);
elders = ClusterMap::intersection(elders, map.getAlive());
if (elders.empty()) {
@@ -351,13 +395,11 @@
}
}
-bool Cluster::isLeader() const { return elders.empty(); }
-
-void Cluster::tryMakeOffer(const MemberId& id, Lock& ) {
+void Cluster::makeOffer(const MemberId& id, Lock& ) {
if (state == READY && map.isJoiner(id)) {
state = OFFER;
QPID_LOG(info, *this << " send update-offer to " << id);
- mcast.mcastControl(ClusterUpdateOfferBody(ProtocolVersion(), id, clusterId), myId);
+ mcast.mcastControl(ClusterUpdateOfferBody(ProtocolVersion(), id, clusterId), self);
}
}
@@ -367,88 +409,89 @@
// callbacks will be invoked.
//
void Cluster::brokerShutdown() {
- if (state != LEFT) {
- try { cpg.shutdown(); }
- catch (const std::exception& e) {
- QPID_LOG(error, *this << " shutting down CPG: " << e.what());
- }
+ try { cpg.shutdown(); }
+ catch (const std::exception& e) {
+ QPID_LOG(error, *this << " shutting down CPG: " << e.what());
}
delete this;
}
void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l) {
map.updateRequest(id, url);
- tryMakeOffer(id, l);
+ makeOffer(id, l);
}
void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
if (map.ready(id, Url(url)))
memberUpdate(l);
- if (state == CATCHUP && id == myId) {
+ if (state == CATCHUP && id == self) {
setReady(l);
QPID_LOG(notice, *this << " caught up, active cluster member");
}
}
void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uuid& uuid, Lock& l) {
+ // NOTE: deliverEventQueue has been stopped at the update offer by
+ // deliveredEvent in case an update is required.
if (state == LEFT) return;
MemberId updatee(updateeInt);
boost::optional<Url> url = map.updateOffer(updater, updatee);
- if (updater == myId) {
+ if (updater == self) {
assert(state == OFFER);
- if (url) { // My offer was first.
+ if (url) // My offer was first.
updateStart(updatee, *url, l);
- }
else { // Another offer was first.
+ deliverEventQueue.start(); // Don't need to update
setReady(l);
QPID_LOG(info, *this << " cancelled update offer to " << updatee);
- tryMakeOffer(map.firstJoiner(), l); // Maybe make another offer.
+ makeOffer(map.firstJoiner(), l); // Maybe make another offer.
}
}
- else if (updatee == myId && url) {
+ else if (updatee == self && url) {
assert(state == JOINER);
- setClusterId(uuid);
+ setClusterId(uuid, l);
state = UPDATEE;
QPID_LOG(info, *this << " receiving update from " << updater);
- deliverFrameQueue.stop();
checkUpdateIn(l);
}
+ else
+ deliverEventQueue.start(); // Don't need to update
}
-void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock&) {
+void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) {
+ // NOTE: deliverEventQueue is already stopped at the stall point by deliveredEvent.
if (state == LEFT) return;
assert(state == OFFER);
state = UPDATER;
- QPID_LOG(info, *this << " stall for update to " << updatee << " at " << url);
- deliverFrameQueue.stop();
- if (updateThread.id()) updateThread.join(); // Join the previous updatethread.
+ QPID_LOG(info, *this << " sending update to " << updatee << " at " << url);
+ if (updateThread.id())
+ updateThread.join(); // Join the previous updateThread to avoid leaks.
client::ConnectionSettings cs;
cs.username = settings.username;
cs.password = settings.password;
cs.mechanism = settings.mechanism;
updateThread = Thread(
- new UpdateClient(myId, updatee, url, broker, map, frameId, connections.values(),
+ new UpdateClient(self, updatee, url, broker, map, *expiryPolicy, getConnections(l), decoder,
boost::bind(&Cluster::updateOutDone, this),
boost::bind(&Cluster::updateOutError, this, _1),
cs));
}
// Called in update thread.
-void Cluster::updateInDone(const ClusterMap& m, uint64_t fid) {
+void Cluster::updateInDone(const ClusterMap& m) {
Lock l(lock);
updatedMap = m;
- frameId = fid;
checkUpdateIn(l);
}
-void Cluster::checkUpdateIn(Lock& ) {
- if (state == LEFT) return;
+void Cluster::checkUpdateIn(Lock&) {
if (state == UPDATEE && updatedMap) {
map = *updatedMap;
- mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), myId);
+ mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
state = CATCHUP;
+ discarding = false; // ok to set, we're stalled for update.
QPID_LOG(info, *this << " received update, starting catch-up");
- deliverFrameQueue.start();
+ deliverEventQueue.start();
}
}
@@ -462,8 +505,8 @@
assert(state == UPDATER);
state = READY;
mcast.release();
- deliverFrameQueue.start();
- tryMakeOffer(map.firstJoiner(), l); // Try another offer
+ deliverEventQueue.start(); // Start processing events again.
+ makeOffer(map.firstJoiner(), l); // Try another offer
}
void Cluster::updateOutError(const std::exception& e) {
@@ -487,7 +530,7 @@
{
_qmf::ArgsClusterStopClusterNode& iargs = (_qmf::ArgsClusterStopClusterNode&) args;
stringstream stream;
- stream << myId;
+ stream << self;
if (iargs.i_brokerId == stream.str())
stopClusterNode(l);
}
@@ -508,7 +551,7 @@
void Cluster::stopFullCluster(Lock& ) {
QPID_LOG(notice, *this << " shutting down cluster " << name);
- mcast.mcastControl(ClusterShutdownBody(), myId);
+ mcast.mcastControl(ClusterShutdownBody(), self);
}
void Cluster::memberUpdate(Lock& l) {
@@ -518,13 +561,13 @@
size_t size = urls.size();
failoverExchange->setUrls(urls);
- if (size == 1 && lastSize > 1 && state >= READY) {
- QPID_LOG(info, *this << " last broker standing, update queue policies");
+ if (size == 1 && lastSize > 1 && state >= CATCHUP) {
+ QPID_LOG(notice, *this << " last broker standing, update queue policies");
lastBroker = true;
broker.getQueues().updateQueueClusterState(true);
}
else if (size > 1 && lastBroker) {
- QPID_LOG(info, *this << " last broker standing joined by " << size-1 << " replicas, updating queue policies" << size);
+ QPID_LOG(notice, *this << " last broker standing joined by " << size-1 << " replicas, updating queue policies" << size);
lastBroker = false;
broker.getQueues().updateQueueClusterState(false);
}
@@ -546,17 +589,23 @@
mgmtObject->set_memberIDs(idstr);
}
- // Close connections belonging to members that have now been excluded
- connections.update(myId, map);
+ // Erase connections belonging to members that have left the cluster.
+ ConnectionMap::iterator i = connections.begin();
+ while (i != connections.end()) {
+ ConnectionMap::iterator j = i++;
+ MemberId m = j->second->getId().getMember();
+ if (m != self && !map.isMember(m))
+ connections.erase(j);
+ }
}
std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {
static const char* STATE[] = { "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT" };
- return o << cluster.myId << "(" << STATE[cluster.state] << ")";
+ return o << cluster.self << "(" << STATE[cluster.state] << ")";
}
MemberId Cluster::getId() const {
- return myId; // Immutable, no need to lock.
+ return self; // Immutable, no need to lock.
}
broker::Broker& Cluster::getBroker() const {
@@ -571,11 +620,11 @@
}
}
-void Cluster::setClusterId(const Uuid& uuid) {
+void Cluster::setClusterId(const Uuid& uuid, Lock&) {
clusterId = uuid;
if (mgmtObject) {
stringstream stream;
- stream << myId;
+ stream << self;
mgmtObject->set_clusterID(clusterId.str());
mgmtObject->set_memberID(stream.str());
}
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Cluster.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Cluster.h Tue Mar 10 23:10:57 2009
@@ -19,34 +19,34 @@
*
*/
-#include "ClusterSettings.h"
#include "ClusterMap.h"
-#include "ConnectionMap.h"
+#include "ClusterSettings.h"
#include "Cpg.h"
+#include "Decoder.h"
#include "Event.h"
+#include "EventFrame.h"
+#include "ExpiryPolicy.h"
#include "FailoverExchange.h"
+#include "LockedConnectionMap.h"
#include "Multicaster.h"
-#include "EventFrame.h"
#include "NoOpConnectionOutputHandler.h"
+#include "PollableQueue.h"
#include "PollerDispatch.h"
#include "Quorum.h"
-#include "Decoder.h"
-#include "PollableQueue.h"
-#include "ExpiryPolicy.h"
+#include "qmf/org/apache/qpid/cluster/Cluster.h"
+#include "qpid/Url.h"
#include "qpid/broker/Broker.h"
-#include "qpid/sys/Monitor.h"
#include "qpid/management/Manageable.h"
-#include "qpid/Url.h"
-#include "qmf/org/apache/qpid/cluster/Cluster.h"
+#include "qpid/sys/Monitor.h"
-#include <boost/intrusive_ptr.hpp>
#include <boost/bind.hpp>
+#include <boost/intrusive_ptr.hpp>
#include <boost/optional.hpp>
#include <algorithm>
-#include <vector>
#include <map>
+#include <vector>
namespace qpid {
@@ -58,6 +58,7 @@
namespace cluster {
class Connection;
+class EventFrame;
/**
* Connection to the cluster
@@ -65,82 +66,91 @@
class Cluster : private Cpg::Handler, public management::Manageable {
public:
typedef boost::intrusive_ptr<Connection> ConnectionPtr;
- typedef std::vector<ConnectionPtr> Connections;
+ typedef std::vector<ConnectionPtr> ConnectionVector;
- /** Construct the cluster in plugin earlyInitialize */
+ // Public functions are thread safe unless otherwise mentioned in a comment.
+
+ // Construct the cluster in plugin earlyInitialize.
Cluster(const ClusterSettings&, broker::Broker&);
virtual ~Cluster();
- /** Join the cluster in plugin initialize. Requires transport
- * plugins to be available.. */
+ // Called by plugin initialize: cluster start-up requires transport plugins.
+ // Thread safety: only called by plugin initialize.
void initialize();
- // Connection map - called in connection threads.
+ // Connection map.
void addLocalConnection(const ConnectionPtr&);
void addShadowConnection(const ConnectionPtr&);
void erase(const ConnectionId&);
- // URLs of current cluster members - called in connection threads.
+ // URLs of current cluster members.
std::vector<std::string> getIds() const;
std::vector<Url> getUrls() const;
boost::shared_ptr<FailoverExchange> getFailoverExchange() const { return failoverExchange; }
- // Leave the cluster - called in any thread.
+ // Leave the cluster - called when fatal errors occur.
void leave();
// Update completed - called in update thread
- void updateInDone(const ClusterMap&, uint64_t frameId);
+ void updateInDone(const ClusterMap&);
MemberId getId() const;
broker::Broker& getBroker() const;
Multicaster& getMulticast() { return mcast; }
- boost::function<bool ()> isQuorate;
- void checkQuorum(); // called in connection threads.
+ void checkQuorum();
size_t getReadMax() { return readMax; }
size_t getWriteEstimate() { return writeEstimate; }
- bool isLeader() const; // Called in deliver thread.
+ void deliverFrame(const EventFrame&);
+
+ // Called only during update by Connection::shadowReady
+ Decoder& getDecoder() { return decoder; }
+
+ ExpiryPolicy& getExpiryPolicy() { return *expiryPolicy; }
private:
typedef sys::Monitor::ScopedLock Lock;
typedef PollableQueue<Event> PollableEventQueue;
typedef PollableQueue<EventFrame> PollableFrameQueue;
+ typedef std::map<ConnectionId, ConnectionPtr> ConnectionMap;
- // NB: The final Lock& parameter on functions below is used to mark functions
- // that should only be called by a function that already holds the lock.
- // The parameter makes it hard to forget since you have to have an instance of
- // a Lock to call the unlocked functions.
-
+ // NB: A dummy Lock& parameter marks functions that must only be
+ // called with Cluster::lock locked.
+
void leave(Lock&);
std::vector<std::string> getIds(Lock&) const;
std::vector<Url> getUrls(Lock&) const;
- // Make an offer if we can - called in deliver thread.
- void tryMakeOffer(const MemberId&, Lock&);
-
- // Called in main thread in ~Broker.
+ // == Called in main thread from Broker destructor.
void brokerShutdown();
+ // == Called in deliverEventQueue thread
+ void deliveredEvent(const Event&);
+
+ // == Called in deliverFrameQueue thread
+ void deliveredFrame(const EventFrame&);
+
// Cluster controls implement XML methods from cluster.xml.
- // Called in deliver thread.
- //
void updateRequest(const MemberId&, const std::string&, Lock&);
void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, Lock&);
void ready(const MemberId&, const std::string&, Lock&);
void configChange(const MemberId&, const std::string& addresses, Lock& l);
void messageExpired(const MemberId&, uint64_t, Lock& l);
void shutdown(const MemberId&, Lock&);
- void deliveredEvent(const Event&);
- void deliveredFrame(const EventFrame&);
- // Helper, called in deliver thread.
+ // Helper functions
+ ConnectionPtr getConnection(const ConnectionId&, Lock&);
+ ConnectionVector getConnections(Lock&);
void updateStart(const MemberId& updatee, const Url& url, Lock&);
-
+ void makeOffer(const MemberId&, Lock&);
void setReady(Lock&);
+ void memberUpdate(Lock&);
+ void setClusterId(const framing::Uuid&, Lock&);
+ // == Called in CPG dispatch thread
void deliver( // CPG deliver callback.
cpg_handle_t /*handle*/,
struct cpg_name *group,
@@ -149,7 +159,7 @@
void* /*msg*/,
int /*msg_len*/);
- void deliver(const Event&);
+ void deliverEvent(const Event&);
void configChange( // CPG config change callback.
cpg_handle_t /*handle*/,
@@ -159,23 +169,21 @@
struct cpg_address */*joined*/, int /*nJoined*/
);
+ // == Called in management threads.
virtual qpid::management::ManagementObject* GetManagementObject() const;
virtual management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
void stopClusterNode(Lock&);
void stopFullCluster(Lock&);
- void memberUpdate(Lock&);
- // Called in connection IO threads .
+ // == Called in connection IO threads.
void checkUpdateIn(Lock&);
- // Called in UpdateClient thread.
+ // == Called in UpdateClient thread.
void updateOutDone();
void updateOutError(const std::exception&);
void updateOutDone(Lock&);
- void setClusterId(const framing::Uuid&);
-
// Immutable members set on construction, never changed.
ClusterSettings settings;
broker::Broker& broker;
@@ -184,34 +192,38 @@
Cpg cpg;
const std::string name;
Url myUrl;
- const MemberId myId;
+ const MemberId self;
const size_t readMax;
const size_t writeEstimate;
framing::Uuid clusterId;
NoOpConnectionOutputHandler shadowOut;
qpid::management::ManagementAgent* mAgent;
+ boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
// Thread safe members
Multicaster mcast;
PollerDispatch dispatcher;
PollableEventQueue deliverEventQueue;
PollableFrameQueue deliverFrameQueue;
- ConnectionMap connections;
boost::shared_ptr<FailoverExchange> failoverExchange;
Quorum quorum;
-
- // Used only in delivery thread
- Decoder decoder;
- ClusterMap::Set elders;
- boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
- uint64_t frameId;
+ LockedConnectionMap localConnections;
// Used only during initialization
bool initialized;
- // Remaining members are protected by lock
+ // Used only in deliverEventQueue thread or when stalled for update.
+ Decoder decoder;
+ bool discarding;
+
+ // Remaining members are protected by lock.
+ // FIXME aconway 2009-03-06: Most of these members are also only used in
+ // deliverFrameQueue thread or during stall. Review and separate members
+ // that require a lock, drop lock when not needed.
+ //
mutable sys::Monitor lock;
+
// Local cluster state, cluster map
enum {
INIT, ///< Initial state, no CPG messages received.
@@ -223,15 +235,16 @@
UPDATER, ///< Offer accepted, sending a state update.
LEFT ///< Final state, left the cluster.
} state;
+
+ ConnectionMap connections;
ClusterMap map;
+ ClusterMap::Set elders;
size_t lastSize;
bool lastBroker;
- uint64_t sequence;
-
- // Update related
sys::Thread updateThread;
boost::optional<ClusterMap> updatedMap;
+
friend std::ostream& operator<<(std::ostream&, const Cluster&);
friend class ClusterDispatcher;
};
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Tue Mar 10 23:10:57 2009
@@ -138,7 +138,6 @@
broker->setConnectionFactory(
boost::shared_ptr<sys::ConnectionCodec::Factory>(
new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster)));
- broker->getExchanges().registerExchange(cluster->getFailoverExchange());
ManagementBroker* mgmt = dynamic_cast<ManagementBroker*>(ManagementAgent::Singleton::getInstance());
if (mgmt) {
std::auto_ptr<IdAllocator> allocator(new UpdateClientIdAllocator());
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ClusterSettings.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ClusterSettings.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ClusterSettings.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ClusterSettings.h Tue Mar 10 23:10:57 2009
@@ -35,7 +35,7 @@
size_t readMax, writeEstimate;
std::string username, password, mechanism;
- ClusterSettings() : quorum(false), readMax(10), writeEstimate(64), username("guest"), password("guest") {}
+ ClusterSettings() : quorum(false), readMax(10), writeEstimate(64) {}
Url getUrl(uint16_t port) const {
if (url.empty()) return Url::getIpAddressesUrl(port);
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Connection.cpp Tue Mar 10 23:10:57 2009
@@ -40,6 +40,7 @@
#include "qpid/framing/ConnectionCloseOkBody.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/LatencyMetric.h"
+#include "qpid/sys/AtomicValue.h"
#include <boost/current_function.hpp>
@@ -58,27 +59,36 @@
NoOpConnectionOutputHandler Connection::discardHandler;
-// Shadow connections
-Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
- const std::string& wrappedId, ConnectionId myId)
- : cluster(c), self(myId), catchUp(false), output(*this, out),
- connection(&output, cluster.getBroker(), wrappedId), expectProtocolHeader(false)
+namespace {
+sys::AtomicValue<uint64_t> idCounter;
+}
+
+// Shadow connection
+Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& logId, const ConnectionId& id)
+ : cluster(c), self(id), catchUp(false), output(*this, out),
+ connection(&output, cluster.getBroker(), logId), expectProtocolHeader(false),
+ mcastFrameHandler(cluster.getMulticast(), self)
{ init(); }
-// Local connections
+// Local connection
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
- const std::string& wrappedId, MemberId myId, bool isCatchUp, bool isLink)
- : cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out),
- connection(&output, cluster.getBroker(), wrappedId, isLink, catchUp ? ++catchUpId : 0),
- expectProtocolHeader(isLink)
+ const std::string& logId, MemberId member, bool isCatchUp, bool isLink)
+ : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), output(*this, out),
+ connection(&output, cluster.getBroker(), logId, isLink, catchUp ? ++catchUpId : 0),
+ expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self)
{ init(); }
void Connection::init() {
QPID_LOG(debug, cluster << " new connection: " << *this);
- if (isLocalClient()) {
+ if (isLocalClient()) {
+ connection.setClusterOrderOutput(mcastFrameHandler); // Actively send cluster-order frames from local node
cluster.addLocalConnection(this);
giveReadCredit(cluster.getReadMax());
}
+ else { // Shadow or catch-up connection
+ connection.setClusterOrderOutput(nullFrameHandler); // Passive, discard cluster-order frames
+ connection.setClientThrottling(false); // Disable client throttling, done by active node.
+ }
}
void Connection::giveReadCredit(int credit) {
@@ -140,10 +150,16 @@
void Connection::deliveredFrame(const EventFrame& f) {
assert(!catchUp);
currentChannel = f.frame.getChannel();
- if (!framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol.
+ if (f.frame.getBody() // frame can be empty with just readCredit
+ && !framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection control.
&& !checkUnsupported(*f.frame.getBody())) // Unsupported operation.
{
- connection.received(const_cast<AMQFrame&>(f.frame)); // Pass to broker connection.
+ if (f.type == DATA) // incoming data frames to broker::Connection
+ connection.received(const_cast<AMQFrame&>(f.frame));
+ else { // frame control, send frame via SessionState
+ broker::SessionState* ss = connection.getChannel(f.frame.getChannel()).getSession();
+ if (ss) ss->out(const_cast<AMQFrame&>(f.frame));
+ }
}
giveReadCredit(f.readCredit);
}
@@ -186,12 +202,12 @@
connection.closed();
}
-// Decode data from local clients.
+// ConnectionCodec::decode receives read buffers from directly-connected clients.
size_t Connection::decode(const char* buffer, size_t size) {
if (catchUp) { // Handle catch-up locally.
Buffer buf(const_cast<char*>(buffer), size);
while (localDecoder.decode(buf))
- received(localDecoder.frame);
+ received(localDecoder.getFrame());
}
else { // Multicast local connections.
assert(isLocal());
@@ -242,6 +258,7 @@
const SequenceSet& unknownCompleted,
const SequenceSet& receivedIncomplete)
{
+
sessionState().setState(
replayStart,
sendCommandPoint,
@@ -253,21 +270,23 @@
QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId());
}
-void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username) {
- ConnectionId shadow = ConnectionId(memberId, connectionId);
- QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadow);
- self = shadow;
+void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username, const string& fragment) {
+ ConnectionId shadowId = ConnectionId(memberId, connectionId);
+ QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadowId);
+ self = shadowId;
connection.setUserId(username);
+ // OK to use decoder here because we are stalled for update.
+ cluster.getDecoder().get(self).setFragment(fragment.data(), fragment.size());
}
-void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameId) {
+void Connection::membership(const FieldTable& joiners, const FieldTable& members) {
QPID_LOG(debug, cluster << " incoming update complete on connection " << *this);
- cluster.updateInDone(ClusterMap(joiners, members), frameId);
+ cluster.updateInDone(ClusterMap(joiners, members));
self.second = 0; // Mark this as completed update connection.
}
bool Connection::isLocal() const {
- return self.first == cluster.getId() && self.second == this;
+ return self.first == cluster.getId() && self.second;
}
bool Connection::isShadow() const {
@@ -333,6 +352,10 @@
q->setPosition(position);
}
+void Connection::expiryId(uint64_t id) {
+ cluster.getExpiryPolicy().setId(id);
+}
+
std::ostream& operator<<(std::ostream& o, const Connection& c) {
const char* type="unknown";
if (c.isLocal()) type = "local";
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org