You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2009/10/12 01:22:20 UTC
svn commit: r824198 [3/9] - in /qpid/branches/java-network-refactor: ./
qpid/cpp/ qpid/cpp/bindings/qmf/ qpid/cpp/bindings/qmf/python/
qpid/cpp/bindings/qmf/ruby/ qpid/cpp/bindings/qmf/tests/
qpid/cpp/boost-1.32-support/ qpid/cpp/etc/ qpid/cpp/examples...
Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/PersistableMessage.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/PersistableMessage.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/PersistableMessage.cpp Sun Oct 11 23:22:08 2009
@@ -36,7 +36,6 @@
PersistableMessage::PersistableMessage() :
asyncEnqueueCounter(0),
asyncDequeueCounter(0),
- contentReleased(false),
store(0)
{}
@@ -59,9 +58,15 @@
}
}
-void PersistableMessage::setContentReleased() {contentReleased = true; }
+void PersistableMessage::setContentReleased()
+{
+ contentReleaseState.released = true;
+}
-bool PersistableMessage::isContentReleased()const { return contentReleased; }
+bool PersistableMessage::isContentReleased() const
+{
+ return contentReleaseState.released;
+}
bool PersistableMessage::isEnqueueComplete() {
sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock);
@@ -153,6 +158,26 @@
asyncDequeueCounter++;
}
+PersistableMessage::ContentReleaseState::ContentReleaseState() : blocked(false), requested(false), released(false) {}
+
+void PersistableMessage::setStore(MessageStore* s)
+{
+ store = s;
+}
+
+void PersistableMessage::requestContentRelease()
+{
+ contentReleaseState.requested = true;
+}
+void PersistableMessage::blockContentRelease()
+{
+ contentReleaseState.blocked = true;
+}
+bool PersistableMessage::checkContentReleasable()
+{
+ return contentReleaseState.requested && !contentReleaseState.blocked;
+}
+
}}
Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/PersistableMessage.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/PersistableMessage.h?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/PersistableMessage.h (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/PersistableMessage.h Sun Oct 11 23:22:08 2009
@@ -68,8 +68,16 @@
void enqueueAsync();
void dequeueAsync();
- bool contentReleased;
syncList synclist;
+ struct ContentReleaseState
+ {
+ bool blocked;
+ bool requested;
+ bool released;
+
+ ContentReleaseState();
+ };
+ ContentReleaseState contentReleaseState;
protected:
/** Called when all enqueues are complete for this message. */
@@ -96,8 +104,15 @@
void flush();
- bool isContentReleased() const;
-
+ bool QPID_BROKER_EXTERN isContentReleased() const;
+
+ void QPID_BROKER_EXTERN setStore(MessageStore*);
+ void requestContentRelease();
+ void blockContentRelease();
+ bool checkContentReleasable();
+
+ virtual QPID_BROKER_EXTERN bool isPersistent() const = 0;
+
QPID_BROKER_EXTERN bool isEnqueueComplete();
QPID_BROKER_EXTERN void enqueueComplete();
Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/Queue.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/Queue.cpp Sun Oct 11 23:22:08 2009
@@ -181,6 +181,8 @@
void Queue::recover(boost::intrusive_ptr<Message>& msg){
+ if (policy.get()) policy->recoverEnqueued(msg);
+
push(msg, true);
if (store){
// setup synclist for recovered messages, so they don't get re-stored on lastNodeFailure
@@ -206,11 +208,10 @@
}
void Queue::requeue(const QueuedMessage& msg){
- if (!isEnqueued(msg)) return;
-
QueueListeners::NotificationSet copy;
{
Mutex::ScopedLock locker(messageLock);
+ if (!isEnqueued(msg)) return;
msg.payload->enqueueComplete(); // mark the message as enqueued
messages.push_front(msg);
listeners.populate(copy);
@@ -563,16 +564,10 @@
}
void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
- Messages dequeues;
QueueListeners::NotificationSet copy;
{
Mutex::ScopedLock locker(messageLock);
QueuedMessage qm(this, msg, ++sequence);
- if (policy.get()) {
- policy->tryEnqueue(qm);
- //depending on policy, may have some dequeues
- if (!isRecovery) pendingDequeues.swap(dequeues);
- }
if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence);
LVQ::iterator i;
@@ -606,12 +601,11 @@
if (eventMgr) eventMgr->enqueued(qm);
else QPID_LOG(warning, "Enqueue manager not set, events not generated for " << getName());
}
+ if (policy.get()) {
+ policy->enqueued(qm);
+ }
}
copy.notify();
- if (!dequeues.empty()) {
- //depending on policy, may have some dequeues
- for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
- }
}
QueuedMessage Queue::getFront()
@@ -697,8 +691,19 @@
}
// return true if store exists,
-bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
+bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg, bool suppressPolicyCheck)
{
+ if (policy.get() && !suppressPolicyCheck) {
+ Messages dequeues;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ policy->tryEnqueue(msg);
+ policy->getPendingDequeues(dequeues);
+ }
+ //depending on policy, may have some dequeues that need to performed without holding the lock
+ for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
+ }
+
if (inLastNodeFailure && persistLastNode){
msg->forcePersistent();
}
@@ -707,15 +712,27 @@
msg->addTraceId(traceId);
}
- if (msg->isPersistent() && store) {
+ if ((msg->isPersistent() || msg->checkContentReleasable()) && store) {
msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg);
store->enqueue(ctxt, pmsg, *this);
return true;
}
+ if (!store) {
+ //Messages enqueued on a transient queue should be prevented
+ //from having their content released as it may not be
+ //recoverable by these queue for delivery
+ msg->blockContentRelease();
+ }
return false;
}
+void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg)
+{
+ Mutex::ScopedLock locker(messageLock);
+ if (policy.get()) policy->enqueueAborted(msg);
+}
+
// return true if store exists,
bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
{
@@ -726,7 +743,7 @@
dequeued(msg);
}
}
- if (msg.payload->isPersistent() && store) {
+ if ((msg.payload->isPersistent() || msg.payload->checkContentReleasable()) && store) {
msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload);
store->dequeue(ctxt, pmsg, *this);
@@ -781,22 +798,37 @@
void Queue::configure(const FieldTable& _settings, bool recovering)
{
- setPolicy(QueuePolicy::createQueuePolicy(_settings));
+
+ eventMode = _settings.getAsInt(qpidQueueEventGeneration);
+
+ if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK &&
+ (!store || NullMessageStore::isNullStore(store) || (eventMode && eventMgr && !eventMgr->isSync()) )) {
+ if ( NullMessageStore::isNullStore(store)) {
+ QPID_LOG(warning, "Flow to disk not valid for non-persisted queue:" << getName());
+ } else if (eventMgr && !eventMgr->isSync() ) {
+ QPID_LOG(warning, "Flow to disk not valid with async Queue Events:" << getName());
+ }
+ FieldTable copy(_settings);
+ copy.erase(QueuePolicy::typeKey);
+ setPolicy(QueuePolicy::createQueuePolicy(getName(), copy));
+ } else {
+ setPolicy(QueuePolicy::createQueuePolicy(getName(), _settings));
+ }
//set this regardless of owner to allow use of no-local with exclusive consumers also
noLocal = _settings.get(qpidNoLocal);
- QPID_LOG(debug, "Configured queue with no-local=" << noLocal);
+ QPID_LOG(debug, "Configured queue " << getName() << " with no-local=" << noLocal);
lastValueQueue= _settings.get(qpidLastValueQueue);
- if (lastValueQueue) QPID_LOG(debug, "Configured queue as Last Value Queue");
+ if (lastValueQueue) QPID_LOG(debug, "Configured queue as Last Value Queue for: " << getName());
lastValueQueueNoBrowse = _settings.get(qpidLastValueQueueNoBrowse);
if (lastValueQueueNoBrowse){
- QPID_LOG(debug, "Configured queue as Last Value Queue No Browse");
+ QPID_LOG(debug, "Configured queue as Last Value Queue No Browse for: " << getName());
lastValueQueue = lastValueQueueNoBrowse;
}
persistLastNode= _settings.get(qpidPersistLastNode);
- if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node");
+ if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node for: " << getName());
traceId = _settings.getAsString(qpidTraceIdentity);
std::string excludeList = _settings.getAsString(qpidTraceExclude);
@@ -806,8 +838,6 @@
QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId
<< "' and qpid.trace.exclude='"<< excludeList << "' i.e. " << traceExclude.size() << " elements");
- eventMode = _settings.getAsInt(qpidQueueEventGeneration);
-
FieldTable::ValuePtr p =_settings.get(qpidInsertSequenceNumbers);
if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>());
@@ -975,19 +1005,6 @@
}
}
-bool Queue::releaseMessageContent(const QueuedMessage& m)
-{
- if (store && !NullMessageStore::isNullStore(store)) {
- QPID_LOG(debug, "Message " << m.position << " on " << name << " released from memory");
- m.payload->releaseContent(store);
- return true;
- } else {
- QPID_LOG(warning, "Message " << m.position << " on " << name
- << " cannot be released from memory as the queue is not durable");
- return false;
- }
-}
-
ManagementObject* Queue::GetManagementObject (void) const
{
return (ManagementObject*) mgmtObject;
@@ -1044,11 +1061,12 @@
void Queue::enqueued(const QueuedMessage& m)
{
if (m.payload) {
- if (policy.get()) policy->tryEnqueue(m);
- mgntEnqStats(m.payload);
- if (m.payload->isPersistent()) {
- enqueue ( 0, m.payload );
+ if (policy.get()) {
+ policy->recoverEnqueued(m.payload);
+ policy->enqueued(m);
}
+ mgntEnqStats(m.payload);
+ enqueue ( 0, m.payload, true );
} else {
QPID_LOG(warning, "Queue informed of enqueued message that has no payload");
}
@@ -1059,10 +1077,4 @@
return !policy.get() || policy->isEnqueued(msg);
}
-void Queue::addPendingDequeue(const QueuedMessage& msg)
-{
- //assumes lock is held - true at present but rather nasty as this is a public method
- pendingDequeues.push_back(msg);
-}
-
QueueListeners& Queue::getListeners() { return listeners; }
Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/Queue.h?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/Queue.h Sun Oct 11 23:22:08 2009
@@ -239,7 +239,8 @@
QPID_BROKER_EXTERN void setLastNodeFailure();
QPID_BROKER_EXTERN void clearLastNodeFailure();
- bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg);
+ bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg, bool suppressPolicyCheck = false);
+ void enqueueAborted(boost::intrusive_ptr<Message> msg);
/**
* dequeue from store (only done once messages is acknowledged)
*/
@@ -315,8 +316,6 @@
bindings.eachBinding(f);
}
- bool releaseMessageContent(const QueuedMessage&);
-
void popMsg(QueuedMessage& qmsg);
/** Set the position sequence number for the next message on the queue.
@@ -335,18 +334,6 @@
*/
void recoveryComplete();
- /**
- * This is a hack to avoid deadlocks in durable ring
- * queues. It is used for dequeueing messages in response
- * to an enqueue while avoid holding lock over call to
- * store.
- *
- * Assumes messageLock is held - true for curent use case
- * (QueuePolicy::tryEnqueue()) but rather nasty as this is a public
- * method
- **/
- void addPendingDequeue(const QueuedMessage &msg);
-
// For cluster update
QueueListeners& getListeners();
};
Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/QueueEvents.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/QueueEvents.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/QueueEvents.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/QueueEvents.cpp Sun Oct 11 23:22:08 2009
@@ -25,25 +25,41 @@
namespace qpid {
namespace broker {
-QueueEvents::QueueEvents(const boost::shared_ptr<sys::Poller>& poller) :
- eventQueue(boost::bind(&QueueEvents::handle, this, _1), poller), enabled(true)
+QueueEvents::QueueEvents(const boost::shared_ptr<sys::Poller>& poller, bool isSync) :
+ eventQueue(boost::bind(&QueueEvents::handle, this, _1), poller), enabled(true), sync(isSync)
{
- eventQueue.start();
+ if (!sync) eventQueue.start();
}
QueueEvents::~QueueEvents()
{
- eventQueue.stop();
+ if (!sync) eventQueue.stop();
}
void QueueEvents::enqueued(const QueuedMessage& m)
{
- if (enabled) eventQueue.push(Event(ENQUEUE, m));
+ if (enabled) {
+ Event enq(ENQUEUE, m);
+ if (sync) {
+ for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++)
+ j->second(enq);
+ } else {
+ eventQueue.push(enq);
+ }
+ }
}
void QueueEvents::dequeued(const QueuedMessage& m)
{
- if (enabled) eventQueue.push(Event(DEQUEUE, m));
+ if (enabled) {
+ Event deq(DEQUEUE, m);
+ if (sync) {
+ for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++)
+ j->second(deq);
+ } else {
+ eventQueue.push(Event(DEQUEUE, m));
+ }
+ }
}
void QueueEvents::registerListener(const std::string& id, const EventListener& listener)
@@ -70,15 +86,16 @@
QueueEvents::handle(const EventQueue::Batch& events) {
qpid::sys::Mutex::ScopedLock l(lock);
for (EventQueue::Batch::const_iterator i = events.begin(); i != events.end(); ++i) {
- for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++)
- j->second(*i);
+ for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++) {
+ j->second(*i);
+ }
}
return events.end();
}
void QueueEvents::shutdown()
{
- if (!eventQueue.empty() && !listeners.empty()) eventQueue.shutdown();
+ if (!sync && !eventQueue.empty() && !listeners.empty()) eventQueue.shutdown();
}
void QueueEvents::enable()
@@ -93,6 +110,12 @@
QPID_LOG(debug, "Queue events disabled");
}
+bool QueueEvents::isSync()
+{
+ return sync;
+}
+
+
QueueEvents::Event::Event(EventType t, const QueuedMessage& m) : type(t), msg(m) {}
Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/QueueEvents.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/QueueEvents.h?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/QueueEvents.h (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/QueueEvents.h Sun Oct 11 23:22:08 2009
@@ -54,7 +54,7 @@
typedef boost::function<void (Event)> EventListener;
- QPID_BROKER_EXTERN QueueEvents(const boost::shared_ptr<sys::Poller>& poller);
+ QPID_BROKER_EXTERN QueueEvents(const boost::shared_ptr<sys::Poller>& poller, bool isSync = false);
QPID_BROKER_EXTERN ~QueueEvents();
QPID_BROKER_EXTERN void enqueued(const QueuedMessage&);
QPID_BROKER_EXTERN void dequeued(const QueuedMessage&);
@@ -65,6 +65,7 @@
void disable();
//process all outstanding events
QPID_BROKER_EXTERN void shutdown();
+ QPID_BROKER_EXTERN bool isSync();
private:
typedef qpid::sys::PollableQueue<Event> EventQueue;
typedef std::map<std::string, EventListener> Listeners;
@@ -73,6 +74,7 @@
Listeners listeners;
volatile bool enabled;
qpid::sys::Mutex lock;//protect listeners from concurrent access
+ bool sync;
EventQueue::Batch::const_iterator handle(const EventQueue::Batch& e);
Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/QueuePolicy.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/QueuePolicy.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/QueuePolicy.cpp Sun Oct 11 23:22:08 2009
@@ -28,8 +28,8 @@
using namespace qpid::broker;
using namespace qpid::framing;
-QueuePolicy::QueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) :
- maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false) {}
+QueuePolicy::QueuePolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) :
+ maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false), name(_name) {}
void QueuePolicy::enqueued(uint64_t _size)
{
@@ -39,18 +39,15 @@
void QueuePolicy::dequeued(uint64_t _size)
{
- //Note: underflow detection is not reliable in the face of
- //concurrent updates (at present locking in Queue.cpp prevents
- //these anyway); updates are atomic and are safe regardless.
if (maxCount) {
- if (count.get() > 0) {
+ if (count > 0) {
--count;
} else {
throw Exception(QPID_MSG("Attempted count underflow on dequeue(" << _size << "): " << *this));
}
}
if (maxSize) {
- if (_size > size.get()) {
+ if (_size > size) {
throw Exception(QPID_MSG("Attempted size underflow on dequeue(" << _size << "): " << *this));
} else {
size -= _size;
@@ -58,47 +55,47 @@
}
}
-bool QueuePolicy::checkLimit(const QueuedMessage& m)
+bool QueuePolicy::checkLimit(boost::intrusive_ptr<Message> m)
{
- bool sizeExceeded = maxSize && (size.get() + m.payload->contentSize()) > maxSize;
- bool countExceeded = maxCount && (count.get() + 1) > maxCount;
+ bool sizeExceeded = maxSize && (size + m->contentSize()) > maxSize;
+ bool countExceeded = maxCount && (count + 1) > maxCount;
bool exceeded = sizeExceeded || countExceeded;
if (exceeded) {
if (!policyExceeded) {
- policyExceeded = true;
- if (m.queue) {
- if (sizeExceeded) QPID_LOG(info, "Queue cumulative message size exceeded policy for " << m.queue->getName());
- if (countExceeded) QPID_LOG(info, "Queue message count exceeded policy for " << m.queue->getName());
- }
+ policyExceeded = true;
+ if (sizeExceeded) QPID_LOG(info, "Queue cumulative message size exceeded policy for " << name);
+ if (countExceeded) QPID_LOG(info, "Queue message count exceeded policy for " << name);
}
} else {
if (policyExceeded) {
policyExceeded = false;
- if (m.queue) {
- QPID_LOG(info, "Queue cumulative message size and message count within policy for " << m.queue->getName());
- }
+ QPID_LOG(info, "Queue cumulative message size and message count within policy for " << name);
}
}
return !exceeded;
}
-void QueuePolicy::tryEnqueue(const QueuedMessage& m)
+void QueuePolicy::tryEnqueue(boost::intrusive_ptr<Message> m)
{
if (checkLimit(m)) {
- enqueued(m);
+ enqueued(m->contentSize());
} else {
- std::string queue = m.queue ? m.queue->getName() : std::string("unknown queue");
- throw ResourceLimitExceededException(
- QPID_MSG("Policy exceeded on " << queue << " by message " << m.position
- << " of size " << m.payload->contentSize() << " , policy: " << *this));
+ throw ResourceLimitExceededException(QPID_MSG("Policy exceeded on " << name << ", policy: " << *this));
}
}
-void QueuePolicy::enqueued(const QueuedMessage& m)
+void QueuePolicy::recoverEnqueued(boost::intrusive_ptr<Message> m)
{
- enqueued(m.payload->contentSize());
+ enqueued(m->contentSize());
}
+void QueuePolicy::enqueueAborted(boost::intrusive_ptr<Message> m)
+{
+ dequeued(m->contentSize());
+}
+
+void QueuePolicy::enqueued(const QueuedMessage&) {}
+
void QueuePolicy::dequeued(const QueuedMessage& m)
{
dequeued(m.payload->contentSize());
@@ -132,7 +129,7 @@
std::transform(t.begin(), t.end(), t.begin(), tolower);
if (t == REJECT || t == FLOW_TO_DISK || t == RING || t == RING_STRICT) return t;
}
- return FLOW_TO_DISK;
+ return REJECT;
}
void QueuePolicy::setDefaultMaxSize(uint64_t s)
@@ -140,6 +137,7 @@
defaultMaxSize = s;
}
+void QueuePolicy::getPendingDequeues(Messages&) {}
@@ -148,8 +146,8 @@
{
buffer.putLong(maxCount);
buffer.putLongLong(maxSize);
- buffer.putLong(count.get());
- buffer.putLongLong(size.get());
+ buffer.putLong(count);
+ buffer.putLongLong(size);
}
void QueuePolicy::decode ( Buffer& buffer )
@@ -179,16 +177,18 @@
const std::string QueuePolicy::RING_STRICT("ring_strict");
uint64_t QueuePolicy::defaultMaxSize(0);
-FlowToDiskPolicy::FlowToDiskPolicy(uint32_t _maxCount, uint64_t _maxSize) :
- QueuePolicy(_maxCount, _maxSize, FLOW_TO_DISK) {}
+FlowToDiskPolicy::FlowToDiskPolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize) :
+ QueuePolicy(_name, _maxCount, _maxSize, FLOW_TO_DISK) {}
-bool FlowToDiskPolicy::checkLimit(const QueuedMessage& m)
+bool FlowToDiskPolicy::checkLimit(boost::intrusive_ptr<Message> m)
{
- return QueuePolicy::checkLimit(m) || m.queue->releaseMessageContent(m);
+ if (!QueuePolicy::checkLimit(m)) m->requestContentRelease();
+ return true;
}
-RingQueuePolicy::RingQueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) :
- QueuePolicy(_maxCount, _maxSize, _type), strict(_type == RING_STRICT) {}
+RingQueuePolicy::RingQueuePolicy(const std::string& _name,
+ uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) :
+ QueuePolicy(_name, _maxCount, _maxSize, _type), strict(_type == RING_STRICT) {}
bool before(const QueuedMessage& a, const QueuedMessage& b)
{
@@ -197,15 +197,12 @@
void RingQueuePolicy::enqueued(const QueuedMessage& m)
{
- QueuePolicy::enqueued(m);
- qpid::sys::Mutex::ScopedLock l(lock);
//need to insert in correct location based on position
queue.insert(lower_bound(queue.begin(), queue.end(), m, before), m);
}
void RingQueuePolicy::dequeued(const QueuedMessage& m)
{
- qpid::sys::Mutex::ScopedLock l(lock);
//find and remove m from queue
if (find(m, pendingDequeues, true) || find(m, queue, true)) {
//now update count and size
@@ -215,49 +212,32 @@
bool RingQueuePolicy::isEnqueued(const QueuedMessage& m)
{
- qpid::sys::Mutex::ScopedLock l(lock);
//for non-strict ring policy, a message can be replaced (and
//therefore dequeued) before it is accepted or released by
//subscriber; need to detect this
return find(m, pendingDequeues, false) || find(m, queue, false);
}
-bool RingQueuePolicy::checkLimit(const QueuedMessage& m)
+bool RingQueuePolicy::checkLimit(boost::intrusive_ptr<Message> m)
{
if (QueuePolicy::checkLimit(m)) return true;//if haven't hit limit, ok to accept
QueuedMessage oldest;
- {
- qpid::sys::Mutex::ScopedLock l(lock);
- if (queue.empty()) {
- QPID_LOG(debug, "Message too large for ring queue "
- << (m.queue ? m.queue->getName() : std::string("unknown queue"))
- << " [" << *this << "] "
- << ": message size = " << m.payload->contentSize() << " bytes");
- return false;
- }
- oldest = queue.front();
+ if (queue.empty()) {
+ QPID_LOG(debug, "Message too large for ring queue " << name
+ << " [" << *this << "] "
+ << ": message size = " << m->contentSize() << " bytes");
+ return false;
}
+ oldest = queue.front();
if (oldest.queue->acquire(oldest) || !strict) {
- {
- //TODO: fix this! In the current code, this method is
- //only ever called with the Queue lock already taken. This
- //should not be relied upon going forward however and
- //clearly the locking in this class is insufficient as
- //there is no guarantee that the message previously atthe
- //front is still there.
- qpid::sys::Mutex::ScopedLock l(lock);
- queue.pop_front();
- pendingDequeues.push_back(oldest);
- }
- oldest.queue->addPendingDequeue(oldest);
- QPID_LOG(debug, "Ring policy triggered in queue "
- << (m.queue ? m.queue->getName() : std::string("unknown queue"))
- << ": removed message " << oldest.position << " to make way for " << m.position);
+ queue.pop_front();
+ pendingDequeues.push_back(oldest);
+ QPID_LOG(debug, "Ring policy triggered in " << name
+ << ": removed message " << oldest.position << " to make way for new message");
return true;
} else {
- QPID_LOG(debug, "Ring policy could not be triggered in queue "
- << (m.queue ? m.queue->getName() : std::string("unknown queue"))
+ QPID_LOG(debug, "Ring policy could not be triggered in " << name
<< ": oldest message (seq-no=" << oldest.position << ") has been delivered but not yet acknowledged or requeued");
//in strict mode, if oldest message has been delivered (hence
//cannot be acquired) but not yet acked, it should not be
@@ -266,6 +246,11 @@
}
}
+void RingQueuePolicy::getPendingDequeues(Messages& result)
+{
+ result = pendingDequeues;
+}
+
bool RingQueuePolicy::find(const QueuedMessage& m, Messages& q, bool remove)
{
for (Messages::iterator i = q.begin(); i != q.end(); i++) {
@@ -277,25 +262,36 @@
return false;
}
+std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type)
+{
+ return createQueuePolicy("<unspecified>", maxCount, maxSize, type);
+}
+
std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const qpid::framing::FieldTable& settings)
{
+ return createQueuePolicy("<unspecified>", settings);
+}
+
+std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings)
+{
uint32_t maxCount = getInt(settings, maxCountKey, 0);
uint32_t maxSize = getInt(settings, maxSizeKey, defaultMaxSize);
if (maxCount || maxSize) {
- return createQueuePolicy(maxCount, maxSize, getType(settings));
+ return createQueuePolicy(name, maxCount, maxSize, getType(settings));
} else {
return std::auto_ptr<QueuePolicy>();
}
}
-std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type)
+std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const std::string& name,
+ uint32_t maxCount, uint64_t maxSize, const std::string& type)
{
if (type == RING || type == RING_STRICT) {
- return std::auto_ptr<QueuePolicy>(new RingQueuePolicy(maxCount, maxSize, type));
+ return std::auto_ptr<QueuePolicy>(new RingQueuePolicy(name, maxCount, maxSize, type));
} else if (type == FLOW_TO_DISK) {
- return std::auto_ptr<QueuePolicy>(new FlowToDiskPolicy(maxCount, maxSize));
+ return std::auto_ptr<QueuePolicy>(new FlowToDiskPolicy(name, maxCount, maxSize));
} else {
- return std::auto_ptr<QueuePolicy>(new QueuePolicy(maxCount, maxSize, type));
+ return std::auto_ptr<QueuePolicy>(new QueuePolicy(name, maxCount, maxSize, type));
}
}
@@ -305,10 +301,10 @@
std::ostream& operator<<(std::ostream& out, const QueuePolicy& p)
{
- if (p.maxSize) out << "size: max=" << p.maxSize << ", current=" << p.size.get();
+ if (p.maxSize) out << "size: max=" << p.maxSize << ", current=" << p.size;
else out << "size: unlimited";
out << "; ";
- if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count.get();
+ if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count;
else out << "count: unlimited";
out << "; type=" << p.type;
return out;
Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/QueuePolicy.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/QueuePolicy.h?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/QueuePolicy.h (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/QueuePolicy.h Sun Oct 11 23:22:08 2009
@@ -40,14 +40,14 @@
uint32_t maxCount;
uint64_t maxSize;
const std::string type;
- qpid::sys::AtomicValue<uint32_t> count;
- qpid::sys::AtomicValue<uint64_t> size;
+ uint32_t count;
+ uint64_t size;
bool policyExceeded;
static int getInt(const qpid::framing::FieldTable& settings, const std::string& key, int defaultValue);
- static std::string getType(const qpid::framing::FieldTable& settings);
public:
+ typedef std::deque<QueuedMessage> Messages;
static QPID_BROKER_EXTERN const std::string maxCountKey;
static QPID_BROKER_EXTERN const std::string maxSizeKey;
static QPID_BROKER_EXTERN const std::string typeKey;
@@ -57,27 +57,34 @@
static QPID_BROKER_EXTERN const std::string RING_STRICT;
virtual ~QueuePolicy() {}
- QPID_BROKER_EXTERN void tryEnqueue(const QueuedMessage&);
+ QPID_BROKER_EXTERN void tryEnqueue(boost::intrusive_ptr<Message> msg);
+ QPID_BROKER_EXTERN void recoverEnqueued(boost::intrusive_ptr<Message> msg);
+ QPID_BROKER_EXTERN void enqueueAborted(boost::intrusive_ptr<Message> msg);
+ virtual void enqueued(const QueuedMessage&);
virtual void dequeued(const QueuedMessage&);
virtual bool isEnqueued(const QueuedMessage&);
- virtual bool checkLimit(const QueuedMessage&);
QPID_BROKER_EXTERN void update(qpid::framing::FieldTable& settings);
uint32_t getMaxCount() const { return maxCount; }
uint64_t getMaxSize() const { return maxSize; }
void encode(framing::Buffer& buffer) const;
void decode ( framing::Buffer& buffer );
uint32_t encodedSize() const;
+ virtual void getPendingDequeues(Messages& result);
-
+ static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings);
+ static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const qpid::framing::FieldTable& settings);
static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
+ static std::string getType(const qpid::framing::FieldTable& settings);
static void setDefaultMaxSize(uint64_t);
friend QPID_BROKER_EXTERN std::ostream& operator<<(std::ostream&,
const QueuePolicy&);
protected:
- QueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
+ const std::string name;
- virtual void enqueued(const QueuedMessage&);
+ QueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
+
+ virtual bool checkLimit(boost::intrusive_ptr<Message> msg);
void enqueued(uint64_t size);
void dequeued(uint64_t size);
};
@@ -86,21 +93,20 @@
class FlowToDiskPolicy : public QueuePolicy
{
public:
- FlowToDiskPolicy(uint32_t maxCount, uint64_t maxSize);
- bool checkLimit(const QueuedMessage&);
+ FlowToDiskPolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize);
+ bool checkLimit(boost::intrusive_ptr<Message> msg);
};
class RingQueuePolicy : public QueuePolicy
{
public:
- RingQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = RING);
+ RingQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = RING);
void enqueued(const QueuedMessage&);
void dequeued(const QueuedMessage&);
bool isEnqueued(const QueuedMessage&);
- bool checkLimit(const QueuedMessage&);
+ bool checkLimit(boost::intrusive_ptr<Message> msg);
+ void getPendingDequeues(Messages& result);
private:
- typedef std::deque<QueuedMessage> Messages;
- qpid::sys::Mutex lock;
Messages pendingDequeues;
Messages queue;
const bool strict;
Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SemanticState.cpp Sun Oct 11 23:22:08 2009
@@ -65,7 +65,7 @@
tagGenerator("sgen"),
dtxSelected(false),
authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isFederationLink()),
- userID(getSession().getConnection().getUserId().substr(0,getSession().getConnection().getUserId().find('@')))
+ userID(getSession().getConnection().getUserId())
{
acl = getSession().getBroker().getAcl();
}
@@ -302,6 +302,18 @@
return !blocked;
}
+namespace {
+struct ConsumerName {
+ const SemanticState::ConsumerImpl& consumer;
+ ConsumerName(const SemanticState::ConsumerImpl& ci) : consumer(ci) {}
+};
+
+ostream& operator<<(ostream& o, const ConsumerName& pc) {
+ return o << pc.consumer.getName() << " on "
+ << pc.consumer.getParent().getSession().getSessionId();
+}
+}
+
void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg)
{
uint32_t originalMsgCredit = msgCredit;
@@ -312,7 +324,7 @@
if (byteCredit != 0xFFFFFFFF) {
byteCredit -= msg->getRequiredCredit();
}
- QPID_LOG(debug, "Credit allocated for '" << name << "' on " << parent
+ QPID_LOG(debug, "Credit allocated for " << ConsumerName(*this)
<< ", was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit
<< " now bytes: " << byteCredit << " msgs: " << msgCredit);
@@ -320,15 +332,13 @@
bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg)
{
- if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < msg->getRequiredCredit())) {
- QPID_LOG(debug, "Not enough credit for '" << name << "' on " << parent
- << ", bytes: " << byteCredit << " msgs: " << msgCredit);
- return false;
- } else {
- QPID_LOG(debug, "Credit available for '" << name << "' on " << parent
- << " bytes: " << byteCredit << " msgs: " << msgCredit);
- return true;
- }
+ bool enoughCredit = msgCredit > 0 &&
+ (byteCredit == 0xFFFFFFFF || byteCredit >= msg->getRequiredCredit());
+ QPID_LOG(debug, (enoughCredit ? "Sufficient credit for " : "Insufficient credit for ")
+ << ConsumerName(*this)
+ << ", have bytes: " << byteCredit << " msgs: " << msgCredit
+ << ", need " << msg->getRequiredCredit() << " bytes");
+ return enoughCredit;
}
SemanticState::ConsumerImpl::~ConsumerImpl() {}
@@ -356,6 +366,9 @@
} else {
DeliverableMessage deliverable(msg);
route(msg, deliverable);
+ if (msg->checkContentReleasable()) {
+ msg->releaseContent();
+ }
}
}
Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SemanticState.h?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SemanticState.h Sun Oct 11 23:22:08 2009
@@ -129,6 +129,7 @@
const framing::FieldTable& getArguments() const { return arguments; }
SemanticState& getParent() { return *parent; }
+ const SemanticState& getParent() const { return *parent; }
};
private:
@@ -163,6 +164,7 @@
~SemanticState();
SessionContext& getSession() { return session; }
+ const SessionContext& getSession() const { return session; }
ConsumerImpl& find(const std::string& destination);
Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Sun Oct 11 23:22:08 2009
@@ -337,6 +337,10 @@
params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? _TRUE : _FALSE)));
params.insert(make_pair(acl::PROP_EXCLUSIVE, std::string(exclusive ? _TRUE : _FALSE)));
params.insert(make_pair(acl::PROP_AUTODELETE, std::string(autoDelete ? _TRUE : _FALSE)));
+ params.insert(make_pair(acl::PROP_POLICYTYPE, arguments.getAsString("qpid.policy_type")));
+ params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(arguments.getAsInt("qpid.max_count"))));
+ params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(arguments.getAsInt64("qpid.max_size"))));
+
if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_QUEUE,name,¶ms) )
throw NotAllowedException(QPID_MSG("ACL denied queue create request from " << getConnection().getUserId()));
}
@@ -472,8 +476,7 @@
AclModule* acl = getBroker().getAcl();
if (acl)
- {
- // add flags as needed
+ {
if (!acl->authorise(getConnection().getUserId(),acl::ACT_CONSUME,acl::OBJ_QUEUE,queueName,NULL) )
throw NotAllowedException(QPID_MSG("ACL denied Queue subscribe request from " << getConnection().getUserId()));
}
Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SessionContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SessionContext.h?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SessionContext.h (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SessionContext.h Sun Oct 11 23:22:08 2009
@@ -28,7 +28,7 @@
#include "qpid/sys/OutputControl.h"
#include "qpid/broker/ConnectionState.h"
#include "qpid/broker/OwnershipToken.h"
-
+#include "qpid/SessionId.h"
#include <boost/noncopyable.hpp>
@@ -45,6 +45,7 @@
virtual framing::AMQP_ClientProxy& getProxy() = 0;
virtual Broker& getBroker() = 0;
virtual uint16_t getChannel() const = 0;
+ virtual const SessionId& getSessionId() const = 0;
};
}} // namespace qpid::broker
Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SessionState.h?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SessionState.h Sun Oct 11 23:22:08 2009
@@ -118,6 +118,8 @@
bool processSendCredit(uint32_t msgs);
+ const SessionId& getSessionId() const { return getId(); }
+
private:
void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id);
Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SignalHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SignalHandler.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SignalHandler.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SignalHandler.cpp Sun Oct 11 23:22:08 2009
@@ -38,6 +38,8 @@
signal(SIGCHLD,SIG_IGN);
}
+void SignalHandler::shutdown() { shutdownHandler(0); }
+
void SignalHandler::shutdownHandler(int) {
if (broker.get()) {
broker->shutdown();
Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SignalHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SignalHandler.h?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SignalHandler.h (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SignalHandler.h Sun Oct 11 23:22:08 2009
@@ -38,6 +38,9 @@
/** Set the broker to be shutdown on signals */
static void setBroker(const boost::intrusive_ptr<Broker>& broker);
+ /** Initiate shut-down of broker */
+ static void shutdown();
+
private:
static void shutdownHandler(int);
static boost::intrusive_ptr<Broker> broker;
Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/TopicExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/TopicExchange.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/TopicExchange.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/TopicExchange.cpp Sun Oct 11 23:22:08 2009
@@ -293,44 +293,23 @@
return q != qv.end();
}
-void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
+void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/)
+{
Binding::vector mb;
+ BindingList b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >);
PreRoute pr(msg, this);
- uint32_t count(0);
-
{
- RWlock::ScopedRlock l(lock);
- for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
- if (match(i->first, routingKey)) {
- Binding::vector& qv(i->second.bindingVector);
- for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++, count++){
- mb.push_back(*j);
+ RWlock::ScopedRlock l(lock);
+ for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
+ if (match(i->first, routingKey)) {
+ Binding::vector& qv(i->second.bindingVector);
+ for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++){
+ b->push_back(*j);
+ }
}
}
}
- }
-
- for (Binding::vector::iterator j = mb.begin(); j != mb.end(); ++j) {
- msg.deliverTo((*j)->queue);
- if ((*j)->mgmtBinding != 0)
- (*j)->mgmtBinding->inc_msgMatched ();
- }
-
- if (mgmtExchange != 0)
- {
- mgmtExchange->inc_msgReceives ();
- mgmtExchange->inc_byteReceives (msg.contentSize ());
- if (count == 0)
- {
- mgmtExchange->inc_msgDrops ();
- mgmtExchange->inc_byteDrops (msg.contentSize ());
- }
- else
- {
- mgmtExchange->inc_msgRoutes (count);
- mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
- }
- }
+ doRoute(msg, b);
}
bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const)
Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/TxAccept.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/TxAccept.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/TxAccept.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/TxAccept.cpp Sun Oct 11 23:22:08 2009
@@ -88,7 +88,13 @@
void TxAccept::commit() throw()
{
- ops.commit();
+ try {
+ ops.commit();
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Failed to commit: " << e.what());
+ } catch(...) {
+ QPID_LOG(error, "Failed to commit (unknown error)");
+ }
}
void TxAccept::rollback() throw() {}
Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/TxPublish.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/TxPublish.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/TxPublish.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/TxPublish.cpp Sun Oct 11 23:22:08 2009
@@ -26,9 +26,14 @@
TxPublish::TxPublish(intrusive_ptr<Message> _msg) : msg(_msg) {}
-bool TxPublish::prepare(TransactionContext* ctxt) throw(){
+bool TxPublish::prepare(TransactionContext* ctxt) throw()
+{
try{
- for_each(queues.begin(), queues.end(), Prepare(ctxt, msg));
+ while (!queues.empty()) {
+ prepare(ctxt, queues.front());
+ prepared.push_back(queues.front());
+ queues.pop_front();
+ }
return true;
}catch(const std::exception& e){
QPID_LOG(error, "Failed to prepare: " << e.what());
@@ -38,11 +43,30 @@
return false;
}
-void TxPublish::commit() throw(){
- for_each(queues.begin(), queues.end(), Commit(msg));
+void TxPublish::commit() throw()
+{
+ try {
+ for_each(prepared.begin(), prepared.end(), Commit(msg));
+ if (msg->checkContentReleasable()) {
+ msg->releaseContent();
+ }
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Failed to commit: " << e.what());
+ } catch(...) {
+ QPID_LOG(error, "Failed to commit (unknown error)");
+ }
}
-void TxPublish::rollback() throw(){
+void TxPublish::rollback() throw()
+{
+ try {
+ for_each(prepared.begin(), prepared.end(), Rollback(msg));
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Failed to complete rollback: " << e.what());
+ } catch(...) {
+ QPID_LOG(error, "Failed to complete rollback (unknown error)");
+ }
+
}
void TxPublish::deliverTo(const boost::shared_ptr<Queue>& queue){
@@ -54,16 +78,14 @@
}
}
-TxPublish::Prepare::Prepare(TransactionContext* _ctxt, intrusive_ptr<Message>& _msg)
- : ctxt(_ctxt), msg(_msg){}
-
-void TxPublish::Prepare::operator()(const boost::shared_ptr<Queue>& queue){
+void TxPublish::prepare(TransactionContext* ctxt, const boost::shared_ptr<Queue> queue)
+{
if (!queue->enqueue(ctxt, msg)){
/**
- * if not store then mark message for ack and deleivery once
- * commit happens, as async IO will never set it when no store
- * exists
- */
+ * if not store then mark message for ack and deleivery once
+ * commit happens, as async IO will never set it when no store
+ * exists
+ */
msg->enqueueComplete();
}
}
@@ -74,6 +96,12 @@
queue->process(msg);
}
+TxPublish::Rollback::Rollback(intrusive_ptr<Message>& _msg) : msg(_msg){}
+
+void TxPublish::Rollback::operator()(const boost::shared_ptr<Queue>& queue){
+ queue->enqueueAborted(msg);
+}
+
uint64_t TxPublish::contentSize ()
{
return msg->contentSize ();
Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/TxPublish.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/TxPublish.h?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/TxPublish.h (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/TxPublish.h Sun Oct 11 23:22:08 2009
@@ -47,23 +47,25 @@
* dispatch or to be added to the in-memory queue.
*/
class TxPublish : public TxOp, public Deliverable{
- class Prepare{
- TransactionContext* ctxt;
+
+ class Commit{
boost::intrusive_ptr<Message>& msg;
public:
- Prepare(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg);
+ Commit(boost::intrusive_ptr<Message>& msg);
void operator()(const boost::shared_ptr<Queue>& queue);
};
-
- class Commit{
+ class Rollback{
boost::intrusive_ptr<Message>& msg;
public:
- Commit(boost::intrusive_ptr<Message>& msg);
+ Rollback(boost::intrusive_ptr<Message>& msg);
void operator()(const boost::shared_ptr<Queue>& queue);
};
boost::intrusive_ptr<Message> msg;
std::list<Queue::shared_ptr> queues;
+ std::list<Queue::shared_ptr> prepared;
+
+ void prepare(TransactionContext* ctxt, boost::shared_ptr<Queue>);
public:
QPID_BROKER_EXTERN TxPublish(boost::intrusive_ptr<Message> msg);
Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/ConnectionHandler.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/ConnectionHandler.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/ConnectionHandler.cpp Sun Oct 11 23:22:08 2009
@@ -257,6 +257,7 @@
knownBrokersUrls.push_back(Url((*i)->get<std::string>()));
if (sasl.get()) {
securityLayer = sasl->getSecurityLayer(maxFrameSize);
+ operUserId = sasl->getUserId();
}
setState(OPEN);
QPID_LOG(debug, "Known-brokers for connection: " << log::formatList(knownBrokersUrls));
Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/ConnectionHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/ConnectionHandler.h?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/ConnectionHandler.h (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/ConnectionHandler.h Sun Oct 11 23:22:08 2009
@@ -71,6 +71,7 @@
std::auto_ptr<Sasl> sasl;
std::auto_ptr<qpid::sys::SecurityLayer> securityLayer;
boost::intrusive_ptr<qpid::sys::TimerTask> rcvTimeoutTask;
+ std::string operUserId;
void checkState(STATES s, const std::string& msg);
@@ -120,6 +121,7 @@
std::vector<Url> knownBrokersUrls;
static framing::connection::CloseCode convert(uint16_t replyCode);
+ const std::string& getUserId() const { return operUserId; }
};
}}
Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Sun Oct 11 23:22:08 2009
@@ -151,6 +151,12 @@
handler.waitForOpen();
+ // If the SASL layer has provided an "operational" userId for the connection,
+ // put it in the negotiated settings.
+ const std::string& userId(handler.getUserId());
+ if (!userId.empty())
+ handler.username = userId;
+
//enable security layer if one has been negotiated:
std::auto_ptr<SecurityLayer> securityLayer = handler.getSecurityLayer();
if (securityLayer.get()) {
Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/Connector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/Connector.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/Connector.cpp Sun Oct 11 23:22:08 2009
@@ -51,10 +51,10 @@
// Stuff for the registry of protocol connectors (maybe should be moved to its own file)
namespace {
typedef std::map<std::string, Connector::Factory*> ProtocolRegistry;
-
+
ProtocolRegistry& theProtocolRegistry() {
static ProtocolRegistry protocolRegistry;
-
+
return protocolRegistry;
}
}
@@ -93,7 +93,7 @@
size_t lastEof; // Position after last EOF in frames
uint64_t currentSize;
Bounds* bounds;
-
+
framing::ProtocolVersion version;
bool initiated;
bool closed;
@@ -118,16 +118,17 @@
void run();
void handleClosed();
bool closeInternal();
-
+
+ void connected(const Socket&);
+ void connectFailed(const std::string& msg);
bool readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*);
void writebuff(qpid::sys::AsynchIO&);
void writeDataBlock(const framing::AMQDataBlock& data);
void eof(qpid::sys::AsynchIO&);
boost::weak_ptr<ConnectionImpl> impl;
-
+
void connect(const std::string& host, int port);
- void init();
void close();
void send(framing::AMQFrame& frame);
void abort();
@@ -142,7 +143,6 @@
size_t decode(const char* buffer, size_t size);
size_t encode(const char* buffer, size_t size);
bool canEncode();
-
public:
TCPConnector(framing::ProtocolVersion pVersion,
@@ -163,6 +163,11 @@
} init;
}
+struct TCPConnector::Buff : public AsynchIO::BufferBase {
+ Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {}
+ ~Buff() { delete [] bytes;}
+};
+
TCPConnector::TCPConnector(ProtocolVersion ver,
const ConnectionSettings& settings,
ConnectionImpl* cimpl)
@@ -189,15 +194,19 @@
void TCPConnector::connect(const std::string& host, int port){
Mutex::ScopedLock l(lock);
assert(closed);
- try {
- socket.connect(host, port);
- } catch (const std::exception& /*e*/) {
- socket.close();
- throw;
- }
-
- identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
+ assert(joined);
poller = Poller::shared_ptr(new Poller);
+ AsynchConnector::create(socket,
+ poller,
+ host, port,
+ boost::bind(&TCPConnector::connected, this, _1),
+ boost::bind(&TCPConnector::connectFailed, this, _3));
+ closed = false;
+ joined = false;
+ receiver = Thread(this);
+}
+
+void TCPConnector::connected(const Socket&) {
aio = AsynchIO::create(socket,
boost::bind(&TCPConnector::readbuff, this, _1, _2),
boost::bind(&TCPConnector::eof, this, _1),
@@ -205,16 +214,23 @@
0, // closed
0, // nobuffs
boost::bind(&TCPConnector::writebuff, this, _1));
- closed = false;
-}
+ for (int i = 0; i < 32; i++) {
+ aio->queueReadBuffer(new Buff(maxFrameSize));
+ }
+ aio->start(poller);
-void TCPConnector::init(){
- Mutex::ScopedLock l(lock);
- assert(joined);
+ identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
ProtocolInitiation init(version);
writeDataBlock(init);
- joined = false;
- receiver = Thread(this);
+}
+
+void TCPConnector::connectFailed(const std::string& msg) {
+ QPID_LOG(warning, "Connecting failed: " << msg);
+ closed = true;
+ poller->shutdown();
+ closeInternal();
+ if (shutdownHandler)
+ shutdownHandler->shutdown();
}
bool TCPConnector::closeInternal() {
@@ -235,7 +251,7 @@
receiver.join();
return ret;
}
-
+
void TCPConnector::close() {
closeInternal();
}
@@ -243,7 +259,13 @@
void TCPConnector::abort() {
// Can't abort a closed connection
if (!closed) {
- aio->requestCallback(boost::bind(&TCPConnector::eof, this, _1));
+ if (aio) {
+ // Established connection
+ aio->requestCallback(boost::bind(&TCPConnector::eof, this, _1));
+ } else {
+ // We're still connecting
+ connectFailed("Connection timedout");
+ }
}
}
@@ -288,18 +310,13 @@
shutdownHandler->shutdown();
}
-struct TCPConnector::Buff : public AsynchIO::BufferBase {
- Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {}
- ~Buff() { delete [] bytes;}
-};
-
void TCPConnector::writebuff(AsynchIO& /*aio*/)
{
Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this;
if (codec->canEncode()) {
std::auto_ptr<AsynchIO::BufferBase> buffer = std::auto_ptr<AsynchIO::BufferBase>(aio->getQueuedBuffer());
if (!buffer.get()) buffer = std::auto_ptr<AsynchIO::BufferBase>(new Buff(maxFrameSize));
-
+
size_t encoded = codec->encode(buffer->bytes, buffer->byteCount);
buffer->dataStart = 0;
@@ -395,11 +412,6 @@
try {
Dispatcher d(poller);
- for (int i = 0; i < 32; i++) {
- aio->queueReadBuffer(new Buff(maxFrameSize));
- }
-
- aio->start(poller);
d.run();
} catch (const std::exception& e) {
QPID_LOG(error, QPID_MSG("FAIL " << identifier << ": " << e.what()));
Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/RdmaConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/RdmaConnector.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/RdmaConnector.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/RdmaConnector.cpp Sun Oct 11 23:22:08 2009
@@ -167,20 +167,9 @@
assert(joined);
poller = Poller::shared_ptr(new Poller);
- // This stuff needs to abstracted out of here to a platform specific file
- ::addrinfo *res;
- ::addrinfo hints;
- hints.ai_flags = 0;
- hints.ai_family = AF_INET;
- hints.ai_socktype = SOCK_STREAM;
- hints.ai_protocol = 0;
- int n = ::getaddrinfo(host.c_str(), boost::lexical_cast<std::string>(port).c_str(), &hints, &res);
- if (n<0) {
- throw Exception(QPID_MSG("Cannot resolve " << host << ": " << ::gai_strerror(n)));
- }
-
+ SocketAddress sa(host, boost::lexical_cast<std::string>(port));
Rdma::Connector* c = new Rdma::Connector(
- *res->ai_addr,
+ sa,
Rdma::ConnectionParams(maxFrameSize, Rdma::DEFAULT_WR_ENTRIES),
boost::bind(&RdmaConnector::connected, this, poller, _1, _2),
boost::bind(&RdmaConnector::connectionError, this, poller, _1, _2),
Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/Sasl.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/Sasl.h?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/Sasl.h (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/Sasl.h Sun Oct 11 23:22:08 2009
@@ -45,6 +45,7 @@
virtual std::string start(const std::string& mechanisms) = 0;
virtual std::string step(const std::string& challenge) = 0;
virtual std::string getMechanism() = 0;
+ virtual std::string getUserId() = 0;
virtual std::auto_ptr<qpid::sys::SecurityLayer> getSecurityLayer(uint16_t maxFrameSize) = 0;
virtual ~Sasl() {}
};
Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/SaslFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/SaslFactory.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/SaslFactory.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/SaslFactory.cpp Sun Oct 11 23:22:08 2009
@@ -82,6 +82,7 @@
std::string start(const std::string& mechanisms);
std::string step(const std::string& challenge);
std::string getMechanism();
+ std::string getUserId();
std::auto_ptr<SecurityLayer> getSecurityLayer(uint16_t maxFrameSize);
private:
sasl_conn_t* conn;
@@ -266,6 +267,18 @@
return mechanism;
}
+std::string CyrusSasl::getUserId()
+{
+ int propResult;
+ const void* operName;
+
+ propResult = sasl_getprop(conn, SASL_USERNAME, &operName);
+ if (propResult == SASL_OK)
+ return std::string((const char*) operName);
+
+ return std::string();
+}
+
void CyrusSasl::interact(sasl_interact_t* client_interact)
{
Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/SessionImpl.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/SessionImpl.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/SessionImpl.cpp Sun Oct 11 23:22:08 2009
@@ -64,7 +64,8 @@
proxy(ioHandler),
nextIn(0),
nextOut(0),
- sendMsgCredit(0)
+ sendMsgCredit(0),
+ doClearDeliveryPropertiesExchange(true)
{
channel.next = connectionShared.get();
}
@@ -396,11 +397,16 @@
{
AMQFrame header(content.getHeader());
- // Client is not allowed to set the delivery-properties.exchange.
- AMQHeaderBody* headerp = static_cast<AMQHeaderBody*>(header.getBody());
- if (headerp && headerp->get<DeliveryProperties>())
- headerp->get<DeliveryProperties>(true)->clearExchangeFlag();
-
+ // doClearDeliveryPropertiesExchange is set by cluster update client so
+ // it can send messages with delivery-properties.exchange set.
+ //
+ if (doClearDeliveryPropertiesExchange) {
+ // Normal client is not allowed to set the delivery-properties.exchange
+ // so clear it here.
+ AMQHeaderBody* headerp = static_cast<AMQHeaderBody*>(header.getBody());
+ if (headerp && headerp->get<DeliveryProperties>())
+ headerp->get<DeliveryProperties>(true)->clearExchangeFlag();
+ }
header.setFirstSegment(false);
uint64_t data_length = content.getData().length();
if(data_length > 0){
Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/SessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/SessionImpl.h?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/SessionImpl.h (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/SessionImpl.h Sun Oct 11 23:22:08 2009
@@ -130,6 +130,8 @@
*/
boost::shared_ptr<ConnectionImpl> getConnection();
+ void setDoClearDeliveryPropertiesExchange(bool b=true) { doClearDeliveryPropertiesExchange = b; }
+
private:
enum State {
INACTIVE,
@@ -243,6 +245,8 @@
// Only keep track of message credit
sys::Semaphore* sendMsgCredit;
+ bool doClearDeliveryPropertiesExchange;
+
friend class client::SessionHandler;
};
Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp Sun Oct 11 23:22:08 2009
@@ -362,21 +362,12 @@
void QueueSink::cancel(qpid::client::AsyncSession&, const std::string&) {}
-template <class T> void encode(qpid::messaging::Message& from)
-{
- T codec;
- from.encode(codec);
- from.setContentType(T::contentType);
-}
-
void translate(const Variant::Map& from, FieldTable& to);//implementation in Codecs.cpp
void convert(qpid::messaging::Message& from, qpid::client::Message& to)
{
//TODO: need to avoid copying as much as possible
- if (from.getContent().isList()) encode<ListCodec>(from);
- if (from.getContent().isMap()) encode<MapCodec>(from);
- to.setData(from.getBytes());
+ to.setData(from.getContent());
to.getDeliveryProperties().setRoutingKey(from.getSubject());
//TODO: set other delivery properties
to.getMessageProperties().setContentType(from.getContentType());
Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp Sun Oct 11 23:22:08 2009
@@ -269,18 +269,9 @@
//e.g. for rejecting.
MessageImplAccess::get(message).setInternalId(command.getId());
- command.getContent(message.getBytes());
+ command.getContent(message.getContent());
populateHeaders(message, command.getHeaders());
-
- //decode content if necessary
- if (message.getContentType() == ListCodec::contentType) {
- ListCodec codec;
- message.decode(codec);
- } else if (message.getContentType() == MapCodec::contentType) {
- MapCodec codec;
- message.decode(codec);
- }
}
Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp Sun Oct 11 23:22:08 2009
@@ -33,24 +33,11 @@
using qpid::messaging::Address;
using qpid::messaging::MessageImplAccess;
-template <class T> void encode(const qpid::messaging::Message& from, qpid::client::Message& to)
-{
- T codec;
- MessageImplAccess::get(from).getEncodedContent(codec, to.getData());
- to.getMessageProperties().setContentType(T::contentType);
-}
-
void OutgoingMessage::convert(const qpid::messaging::Message& from)
{
//TODO: need to avoid copying as much as possible
- if (from.getContent().isList()) {
- encode<ListCodec>(from, message);
- } else if (from.getContent().isMap()) {
- encode<MapCodec>(from, message);
- } else {
- message.setData(from.getBytes());
- message.getMessageProperties().setContentType(from.getContentType());
- }
+ message.setData(from.getContent());
+ message.getMessageProperties().setContentType(from.getContentType());
const Address& address = from.getReplyTo();
if (!address.value.empty()) {
message.getMessageProperties().setReplyTo(AddressResolution::convert(address));
Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/windows/SaslFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/windows/SaslFactory.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/windows/SaslFactory.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/windows/SaslFactory.cpp Sun Oct 11 23:22:08 2009
@@ -43,6 +43,7 @@
std::string start(const std::string& mechanisms);
std::string step(const std::string& challenge);
std::string getMechanism();
+ std::string getUserId();
std::auto_ptr<SecurityLayer> getSecurityLayer(uint16_t maxFrameSize);
private:
ConnectionSettings settings;
@@ -131,6 +132,11 @@
return mechanism;
}
+std::string WindowsSasl::getUserId()
+{
+ return std::string(); // TODO - when GSSAPI is supported, return userId for connection.
+}
+
std::auto_ptr<SecurityLayer> WindowsSasl::getSecurityLayer(uint16_t maxFrameSize)
{
return std::auto_ptr<SecurityLayer>(0);
Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/Cluster.cpp Sun Oct 11 23:22:08 2009
@@ -99,6 +99,7 @@
#include "qpid/broker/Connection.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/SessionState.h"
+#include "qpid/broker/SignalHandler.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/AMQP_AllOperations.h"
#include "qpid/framing/AllInvoker.h"
@@ -120,7 +121,6 @@
#include "qpid/management/ManagementAgent.h"
#include "qpid/memory.h"
#include "qpid/sys/Thread.h"
-#include "qpid/sys/LatencyTracker.h"
#include <boost/shared_ptr.hpp>
#include <boost/bind.hpp>
@@ -144,12 +144,16 @@
using qpid::management::Args;
namespace _qmf = ::qmf::org::apache::qpid::cluster;
-/** NOTE: increment this number whenever any incompatible changes in
+/**
+ * NOTE: must increment this number whenever any incompatible changes in
* cluster protocol/behavior are made. It allows early detection and
* sensible reporting of an attempt to mix different versions in a
* cluster.
+ *
+ * Currently use SVN revision to avoid clashes with versions from
+ * different branches.
*/
-const uint32_t Cluster::CLUSTER_VERSION = 2;
+const uint32_t Cluster::CLUSTER_VERSION = 820783;
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
@@ -308,7 +312,7 @@
// Finalize connections now now to avoid problems later in destructor.
LEAVE_TRY(localConnections.clear());
LEAVE_TRY(connections.clear());
- LEAVE_TRY(broker.shutdown());
+ LEAVE_TRY(broker::SignalHandler::shutdown());
}
}
@@ -324,20 +328,14 @@
MemberId from(nodeid, pid);
framing::Buffer buf(static_cast<char*>(msg), msg_len);
Event e(Event::decodeCopy(from, buf));
- LATENCY_TRACK(if (e.getConnectionId().getMember() == self) mcast.cpgLatency.finish());
deliverEvent(e);
}
-LATENCY_TRACK(sys::LatencyTracker<const char*> eventQueueLatencyTracker("EventQueue");)
- LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> frameQueueLatencyTracker("FrameQueue");)
-
- void Cluster::deliverEvent(const Event& e) {
- LATENCY_TRACK(eventQueueLatencyTracker.start(e.getData());)
- deliverEventQueue.push(e);
+void Cluster::deliverEvent(const Event& e) {
+ deliverEventQueue.push(e);
}
void Cluster::deliverFrame(const EventFrame& e) {
- LATENCY_TRACK(frameQueueLatencyTracker.start(e.frame.getBody()));
deliverFrameQueue.push(e);
}
@@ -350,7 +348,6 @@
// Handler for deliverEventQueue.
// This thread decodes frames from events.
void Cluster::deliveredEvent(const Event& e) {
- LATENCY_TRACK(eventQueueLatencyTracker.finish(e.getData()));
if (e.isCluster()) {
QPID_LOG(trace, *this << " DLVR: " << e);
EventFrame ef(e, e.getFrame());
@@ -396,13 +393,9 @@
error.error(connection, type, map.getFrameSeq(), map.getMembers(), msg);
}
-LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> doOutputTracker("DoOutput");)
-
// Handler for deliverFrameQueue.
// This thread executes the main logic.
- void Cluster::deliveredFrame(const EventFrame& efConst) {
- LATENCY_TRACK(frameQueueLatencyTracker.finish(e.frame.getBody()));
- LATENCY_TRACK(if (e.frame.getBody()->type() == CONTENT_BODY) doOutputTracker.start(e.frame.getBody()));
+void Cluster::deliveredFrame(const EventFrame& efConst) {
Mutex::ScopedLock l(lock);
if (state == LEFT) return;
EventFrame e(efConst);
@@ -434,7 +427,6 @@
throw Exception(QPID_MSG("Invalid cluster control"));
}
else if (state >= CATCHUP) {
- LATENCY_TRACK(LatencyScope ls(processLatency));
map.incrementFrameSeq();
ConnectionPtr connection = getConnection(e, l);
if (connection) {
Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp Sun Oct 11 23:22:08 2009
@@ -45,7 +45,8 @@
}
void ErrorCheck::error(
- Connection& c, ErrorType t, framing::SequenceNumber seq, const MemberSet& ms, const std::string& msg)
+ Connection& c, ErrorType t, framing::SequenceNumber seq, const MemberSet& ms,
+ const std::string& msg)
{
// Detected a local error, inform cluster and set error state.
assert(t != ERROR_TYPE_NONE); // Must be an error.
@@ -54,10 +55,11 @@
unresolved = ms;
frameSeq = seq;
connection = &c;
- QPID_LOG(error, cluster
- << (type == ERROR_TYPE_SESSION ? " channel" : " connection")
- << " error " << frameSeq << " on " << c << ": " << msg
- << " must be resolved with: " << unresolved);
+ message = msg;
+ QPID_LOG(debug, cluster<< (type == ERROR_TYPE_SESSION ? " channel" : " connection")
+ << " error " << frameSeq << " on " << c
+ << " must be resolved with: " << unresolved
+ << ": " << message);
mcast.mcastControl(
ClusterErrorCheckBody(ProtocolVersion(), type, frameSeq), cluster.getId());
// If there are already frames queued up by a previous error, review
@@ -84,13 +86,15 @@
if (errorCheck->getFrameSeq() == frameSeq) { // Addresses current error
next = frames.erase(i); // Drop matching error check controls
if (errorCheck->getType() < type) { // my error is worse than his
- QPID_LOG(critical, cluster << " error " << frameSeq
- << " did not occur on " << i->getMemberId());
- throw Exception(QPID_MSG("Error " << frameSeq
- << " did not occur on all members"));
+ QPID_LOG(critical, cluster
+ << " local error " << frameSeq << " did not occur on member "
+ << i->getMemberId()
+ << ": " << message);
+ throw Exception(
+ QPID_MSG("local error did not occur on all cluster members " << ": " << message));
}
else { // his error is worse/same as mine.
- QPID_LOG(info, cluster << " error " << frameSeq
+ QPID_LOG(debug, cluster << " error " << frameSeq
<< " resolved with " << i->getMemberId());
unresolved.erase(i->getMemberId());
checkResolved();
@@ -128,10 +132,10 @@
void ErrorCheck::checkResolved() {
if (unresolved.empty()) { // No more potentially conflicted members, we're clear.
type = ERROR_TYPE_NONE;
- QPID_LOG(info, cluster << " error " << frameSeq << " resolved.");
+ QPID_LOG(debug, cluster << " error " << frameSeq << " resolved.");
}
else
- QPID_LOG(info, cluster << " error " << frameSeq
+ QPID_LOG(debug, cluster << " error " << frameSeq
<< " must be resolved with " << unresolved);
}
@@ -146,7 +150,7 @@
// Don't respond to non-errors or to my own errors.
if (type == ERROR_TYPE_NONE || from == cluster.getId())
return;
- QPID_LOG(info, cluster << " error " << frameSeq << " did not occur locally.");
+ QPID_LOG(debug, cluster << " error " << frameSeq << " did not occur locally.");
mcast.mcastControl(
ClusterErrorCheckBody(ProtocolVersion(), ERROR_TYPE_NONE, frameSeq),
cluster.getId()
Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/ErrorCheck.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/ErrorCheck.h?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/ErrorCheck.h (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/ErrorCheck.h Sun Oct 11 23:22:08 2009
@@ -84,6 +84,7 @@
SequenceNumber frameSeq;
ErrorType type;
Connection* connection;
+ std::string message;
};
}} // namespace qpid::cluster
Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/Event.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/Event.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/Event.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/Event.cpp Sun Oct 11 23:22:08 2009
@@ -38,9 +38,6 @@
sizeof(uint8_t) + // type
sizeof(uint64_t) + // connection pointer only, CPG provides member ID.
sizeof(uint32_t) // payload size
-#ifdef QPID_LATENCY_METRIC
- + sizeof(int64_t) // timestamp
-#endif
;
EventHeader::EventHeader(EventType t, const ConnectionId& c, size_t s)
@@ -61,9 +58,6 @@
throw Exception("Invalid multicast event type");
connectionId = ConnectionId(m, buf.getLongLong());
size = buf.getLong();
-#ifdef QPID_LATENCY_METRIC
- latency_metric_timestamp = buf.getLongLong();
-#endif
}
Event Event::decodeCopy(const MemberId& m, framing::Buffer& buf) {
@@ -97,9 +91,6 @@
b.putOctet(type);
b.putLongLong(connectionId.getNumber());
b.putLong(size);
-#ifdef QPID_LATENCY_METRIC
- b.putLongLong(latency_metric_timestamp);
-#endif
}
// Encode my header in my buffer.
Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/Multicaster.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/Multicaster.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/Multicaster.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/Multicaster.cpp Sun Oct 11 23:22:08 2009
@@ -31,9 +31,6 @@
Multicaster::Multicaster(Cpg& cpg_,
const boost::shared_ptr<sys::Poller>& poller,
boost::function<void()> onError_) :
-#if defined (QPID_LATENCY_TRACKER)
- cpgLatency("CPG"),
-#endif
onError(onError_), cpg(cpg_),
queue(boost::bind(&Multicaster::sendMcast, this, _1), poller),
holding(true)
@@ -61,7 +58,6 @@
void Multicaster::mcast(const Event& e) {
{
sys::Mutex::ScopedLock l(lock);
- LATENCY_TRACK(cpgLatency.start());
if (e.isConnection() && holding) {
holdingQueue.push_back(e);
return;
Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/Multicaster.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/Multicaster.h?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/Multicaster.h (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/Multicaster.h Sun Oct 11 23:22:08 2009
@@ -26,7 +26,6 @@
#include "qpid/cluster/Event.h"
#include "qpid/sys/PollableQueue.h"
#include "qpid/sys/Mutex.h"
-#include "qpid/sys/LatencyTracker.h"
#include <boost/shared_ptr.hpp>
#include <deque>
@@ -58,8 +57,6 @@
/** End holding mode, held events are mcast */
void release();
- LATENCY_TRACK(sys::LatencyCounter cpgLatency;)
-
private:
typedef sys::PollableQueue<Event> PollableEventQueue;
typedef std::deque<Event> PlainEventQueue;
Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Sun Oct 11 23:22:08 2009
@@ -24,7 +24,6 @@
#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/log/Statement.h"
-#include "qpid/sys/LatencyTracker.h"
#include <boost/current_function.hpp>
@@ -40,16 +39,9 @@
: parent(p), closing(false), next(&h), sendMax(1), sent(0), sentDoOutput(false)
{}
-#if defined QPID_LATENCY_TRACKER
-extern sys::LatencyTracker<const AMQBody*> doOutputTracker;
-#endif
-
void OutputInterceptor::send(framing::AMQFrame& f) {
- LATENCY_TRACK(doOutputTracker.finish(f.getBody()));
- {
- sys::Mutex::ScopedLock l(lock);
- next->send(f);
- }
+ sys::Mutex::ScopedLock l(lock);
+ next->send(f);
}
void OutputInterceptor::activateOutput() {
Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Sun Oct 11 23:22:08 2009
@@ -209,9 +209,16 @@
ClusterConnectionProxy(session).expiryId(*expiryId);
}
+ // We can't send a broker::Message via the normal client API,
+ // and it would be expensive to copy it into a client::Message
+ // so we go a bit under the client API covers here.
+ //
SessionBase_0_10Access sb(session);
+ // Disable client code that clears the delivery-properties.exchange
+ sb.get()->setDoClearDeliveryPropertiesExchange(false);
framing::MessageTransferBody transfer(
- framing::ProtocolVersion(), UpdateClient::UPDATE, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED);
+ framing::ProtocolVersion(), UpdateClient::UPDATE, message::ACCEPT_MODE_NONE,
+ message::ACQUIRE_MODE_PRE_ACQUIRED);
sb.get()->send(transfer, message.payload->getFrames(), !message.payload->isContentReleased());
if (message.payload->isContentReleased()){
Propchange: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,2 +1,2 @@
/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp:443187-726139
-/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp:805429-816233
+/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp:805429-824132
Propchange: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/UpdateClient.h
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,2 +1,2 @@
/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h:443187-726139
-/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h:805429-816233
+/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h:805429-824132
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org