You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/09/09 19:15:17 UTC
svn commit: r693518 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/broker/
tests/
Author: gsim
Date: Tue Sep 9 10:15:17 2008
New Revision: 693518
URL: http://svn.apache.org/viewvc?rev=693518&view=rev
Log:
QPID-1261: initial fix (this degrades performance for shared queues with more than one consumer; I'll work on fixing that asap). This also moves the lock refered to in QQPID-1265 which I will update accordingly.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h?rev=693518&r1=693517&r2=693518&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h Tue Sep 9 10:15:17 2008
@@ -47,7 +47,7 @@
class Consumer {
const bool acquires;
public:
- typedef shared_ptr<Consumer> ptr;
+ typedef shared_ptr<Consumer> shared_ptr;
framing::SequenceNumber position;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=693518&r1=693517&r2=693518&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Sep 9 10:15:17 2008
@@ -90,8 +90,12 @@
void Queue::notifyDurableIOComplete()
{
- Mutex::ScopedLock locker(messageLock);
- notify();
+ Listeners copy;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ listeners.swap(copy);
+ }
+ for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify));
}
bool isLocalTo(const OwnershipToken* token, boost::intrusive_ptr<Message>& msg)
@@ -181,10 +185,14 @@
}
void Queue::requeue(const QueuedMessage& msg){
- Mutex::ScopedLock locker(messageLock);
- msg.payload->enqueueComplete(); // mark the message as enqueued
- messages.push_front(msg);
- notify();
+ Listeners copy;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ msg.payload->enqueueComplete(); // mark the message as enqueued
+ messages.push_front(msg);
+ listeners.swap(copy);
+ }
+ for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify));
}
bool Queue::acquire(const QueuedMessage& msg) {
@@ -203,16 +211,16 @@
return false;
}
-bool Queue::getNextMessage(QueuedMessage& m, Consumer& c)
+bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
{
- if (c.preAcquires()) {
+ if (c->preAcquires()) {
return consumeNextMessage(m, c);
} else {
return browseNextMessage(m, c);
}
}
-bool Queue::checkForMessages(Consumer& c)
+bool Queue::checkForMessages(Consumer::shared_ptr c)
{
Mutex::ScopedLock locker(messageLock);
if (messages.empty()) {
@@ -233,12 +241,12 @@
//message (if it does not, no need to register it for
//notification as the consumer itself will handle the
//credit allocation required to change this condition).
- return c.accept(msg.payload);
+ return c->accept(msg.payload);
}
}
}
-bool Queue::consumeNextMessage(QueuedMessage& m, Consumer& c)
+bool Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
{
while (true) {
Mutex::ScopedLock locker(messageLock);
@@ -254,8 +262,8 @@
return false;
}
- if (c.filter(msg.payload)) {
- if (c.accept(msg.payload)) {
+ if (c->filter(msg.payload)) {
+ if (c->accept(msg.payload)) {
m = msg;
messages.pop_front();
return true;
@@ -274,14 +282,14 @@
}
-bool Queue::browseNextMessage(QueuedMessage& m, Consumer& c)
+bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
{
QueuedMessage msg(this);
while (seek(msg, c)) {
- if (c.filter(msg.payload)) {
- if (c.accept(msg.payload)) {
+ if (c->filter(msg.payload)) {
+ if (c->accept(msg.payload)) {
//consumer wants the message
- c.position = msg.position;
+ c->position = msg.position;
m = msg;
return true;
} else {
@@ -291,59 +299,47 @@
}
} else {
//consumer will never want this message, continue seeking
- c.position = msg.position;
+ c->position = msg.position;
QPID_LOG(debug, "Browser skipping message from '" << name << "'");
}
}
return false;
}
-/**
- * notify listeners that there may be messages to process
- */
-void Queue::notify()
-{
- if (listeners.empty()) return;
-
- Listeners copy(listeners);
- listeners.clear();
- for_each(copy.begin(), copy.end(), mem_fun(&Consumer::notify));
-}
-
-void Queue::removeListener(Consumer& c)
+void Queue::removeListener(Consumer::shared_ptr c)
{
Mutex::ScopedLock locker(messageLock);
- Listeners::iterator i = std::find(listeners.begin(), listeners.end(), &c);
+ Listeners::iterator i = std::find(listeners.begin(), listeners.end(), c);
if (i != listeners.end()) listeners.erase(i);
}
-void Queue::addListener(Consumer& c)
+void Queue::addListener(Consumer::shared_ptr c)
{
- Listeners::iterator i = std::find(listeners.begin(), listeners.end(), &c);
- if (i == listeners.end()) listeners.push_back(&c);
+ Listeners::iterator i = std::find(listeners.begin(), listeners.end(), c);
+ if (i == listeners.end()) listeners.push_back(c);
}
-bool Queue::dispatch(Consumer& c)
+bool Queue::dispatch(Consumer::shared_ptr c)
{
QueuedMessage msg(this);
if (getNextMessage(msg, c)) {
- c.deliver(msg);
+ c->deliver(msg);
return true;
} else {
return false;
}
}
-bool Queue::seek(QueuedMessage& msg, Consumer& c) {
+bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) {
Mutex::ScopedLock locker(messageLock);
- if (!messages.empty() && messages.back().position > c.position) {
- if (c.position < messages.front().position) {
+ if (!messages.empty() && messages.back().position > c->position) {
+ if (c->position < messages.front().position) {
msg = messages.front();
return true;
} else {
//TODO: can improve performance of this search, for now just searching linearly from end
Messages::reverse_iterator pos;
- for (Messages::reverse_iterator i = messages.rbegin(); i != messages.rend() && i->position > c.position; i++) {
+ for (Messages::reverse_iterator i = messages.rbegin(); i != messages.rend() && i->position > c->position; i++) {
pos = i;
}
msg = *pos;
@@ -354,7 +350,7 @@
return false;
}
-void Queue::consume(Consumer& c, bool requestExclusive){
+void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
Mutex::ScopedLock locker(consumerLock);
if(exclusive) {
throw ResourceLockedException(
@@ -364,7 +360,7 @@
throw ResourceLockedException(
QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
} else {
- exclusive = c.getSession();
+ exclusive = c->getSession();
}
}
consumerCount++;
@@ -372,7 +368,7 @@
mgmtObject->inc_consumerCount ();
}
-void Queue::cancel(Consumer& c){
+void Queue::cancel(Consumer::shared_ptr c){
removeListener(c);
Mutex::ScopedLock locker(consumerLock);
consumerCount--;
@@ -415,35 +411,40 @@
}
void Queue::push(boost::intrusive_ptr<Message>& msg){
- Mutex::ScopedLock locker(messageLock);
- messages.push_back(QueuedMessage(this, msg, ++sequence));
- if (policy.get()) {
- policy->enqueued(msg->contentSize());
- if (policy->limitExceeded()) {
- if (!policyExceeded) {
- policyExceeded = true;
- QPID_LOG(info, "Queue size exceeded policy for " << name);
- }
- if (store) {
- QPID_LOG(debug, "Message " << msg << " on " << name << " released from memory");
- msg->releaseContent(store);
+ Listeners copy;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ messages.push_back(QueuedMessage(this, msg, ++sequence));
+ if (policy.get()) {
+ policy->enqueued(msg->contentSize());
+ if (policy->limitExceeded()) {
+ if (!policyExceeded) {
+ policyExceeded = true;
+ QPID_LOG(info, "Queue size exceeded policy for " << name);
+ }
+ if (store) {
+ QPID_LOG(debug, "Message " << msg << " on " << name << " released from memory");
+ msg->releaseContent(store);
+ } else {
+ QPID_LOG(error, "Message " << msg << " on " << name
+ << " exceeds the policy for the queue but can't be released from memory as the queue is not durable");
+ throw ResourceLimitExceededException(QPID_MSG("Policy exceeded for " << name << " " << *policy));
+ }
} else {
- QPID_LOG(error, "Message " << msg << " on " << name
- << " exceeds the policy for the queue but can't be released from memory as the queue is not durable");
- throw ResourceLimitExceededException(QPID_MSG("Policy exceeded for " << name << " " << *policy));
- }
- } else {
- if (policyExceeded) {
- policyExceeded = false;
- QPID_LOG(info, "Queue size within policy for " << name);
+ if (policyExceeded) {
+ policyExceeded = false;
+ QPID_LOG(info, "Queue size within policy for " << name);
+ }
}
}
+ listeners.swap(copy);
}
- notify();
+ for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify));
}
/** function only provided for unit tests, or code not in critical message path */
-uint32_t Queue::getMessageCount() const{
+uint32_t Queue::getMessageCount() const
+{
Mutex::ScopedLock locker(messageLock);
uint32_t count =0;
@@ -454,12 +455,14 @@
return count;
}
-uint32_t Queue::getConsumerCount() const{
+uint32_t Queue::getConsumerCount() const
+{
Mutex::ScopedLock locker(consumerLock);
return consumerCount;
}
-bool Queue::canAutoDelete() const{
+bool Queue::canAutoDelete() const
+{
Mutex::ScopedLock locker(consumerLock);
return autodelete && !consumerCount;
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=693518&r1=693517&r2=693518&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Tue Sep 9 10:15:17 2008
@@ -34,6 +34,7 @@
#include "qpid/management/Queue.h"
#include "qpid/framing/amqp_types.h"
+#include <list>
#include <vector>
#include <memory>
#include <deque>
@@ -60,7 +61,8 @@
*/
class Queue : public boost::enable_shared_from_this<Queue>,
public PersistableQueue, public management::Manageable {
- typedef qpid::InlineVector<Consumer*, 5> Listeners;
+
+ typedef std::list<Consumer::shared_ptr> Listeners;
typedef std::deque<QueuedMessage> Messages;
const string name;
@@ -88,14 +90,13 @@
void push(boost::intrusive_ptr<Message>& msg);
void setPolicy(std::auto_ptr<QueuePolicy> policy);
- bool seek(QueuedMessage& msg, Consumer& position);
- bool getNextMessage(QueuedMessage& msg, Consumer& c);
- bool consumeNextMessage(QueuedMessage& msg, Consumer& c);
- bool browseNextMessage(QueuedMessage& msg, Consumer& c);
-
- void notify();
- void removeListener(Consumer&);
- void addListener(Consumer&);
+ bool seek(QueuedMessage& msg, Consumer::shared_ptr position);
+ bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
+ bool consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
+ bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
+
+ void removeListener(Consumer::shared_ptr);
+ void addListener(Consumer::shared_ptr);
bool isExcluded(boost::intrusive_ptr<Message>& msg);
@@ -115,14 +116,14 @@
management::Manageable* parent = 0);
~Queue();
- bool dispatch(Consumer&);
+ bool dispatch(Consumer::shared_ptr);
/**
* Check whether there would be a message available for
* dispatch to this consumer. If not, the consumer will be
* notified of events that may have changed this
* situation.
*/
- bool checkForMessages(Consumer&);
+ bool checkForMessages(Consumer::shared_ptr);
void create(const qpid::framing::FieldTable& settings);
void configure(const qpid::framing::FieldTable& settings);
@@ -154,8 +155,8 @@
*/
void recover(boost::intrusive_ptr<Message>& msg);
- void consume(Consumer& c, bool exclusive = false);
- void cancel(Consumer& c);
+ void consume(Consumer::shared_ptr c, bool exclusive = false);
+ void cancel(Consumer::shared_ptr c);
uint32_t purge(const uint32_t purge_request = 0); //defaults to all messages
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=693518&r1=693517&r2=693518&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Tue Sep 9 10:15:17 2008
@@ -72,7 +72,7 @@
SemanticState::~SemanticState() {
//cancel all consumers
for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
- cancel(*ptr_map_ptr(i));
+ cancel(i->second);
}
if (dtxBuffer.get()) {
@@ -91,16 +91,16 @@
{
if(tagInOut.empty())
tagInOut = tagGenerator.generate();
- std::auto_ptr<ConsumerImpl> c(new ConsumerImpl(this, token, tagInOut, queue, ackRequired, nolocal, acquire));
- queue->consume(*c, exclusive);//may throw exception
+ ConsumerImpl::shared_ptr c(new ConsumerImpl(this, token, tagInOut, queue, ackRequired, nolocal, acquire));
+ queue->consume(c, exclusive);//may throw exception
outputTasks.addOutputTask(c.get());
- consumers.insert(tagInOut, c.release());
+ consumers[tagInOut] = c;
}
void SemanticState::cancel(const string& tag){
ConsumerImplMap::iterator i = consumers.find(tag);
if (i != consumers.end()) {
- cancel(*ptr_map_ptr(i));
+ cancel(i->second);
consumers.erase(i);
//should cancel all unacked messages for this consumer so that
//they are not redelivered on recovery
@@ -260,7 +260,8 @@
blocked(true),
windowing(true),
msgCredit(0),
- byteCredit(0){}
+ byteCredit(0),
+ notifyEnabled(true) {}
OwnershipToken* SemanticState::ConsumerImpl::getSession()
{
@@ -324,10 +325,11 @@
SemanticState::ConsumerImpl::~ConsumerImpl() {}
-void SemanticState::cancel(ConsumerImpl& c)
+void SemanticState::cancel(ConsumerImpl::shared_ptr c)
{
- outputTasks.removeOutputTask(&c);
- Queue::shared_ptr queue = c.getQueue();
+ c->disableNotify();
+ outputTasks.removeOutputTask(c.get());
+ Queue::shared_ptr queue = c->getQueue();
if(queue) {
queue->cancel(c);
if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) {
@@ -358,10 +360,10 @@
cacheExchange = session.getBroker().getExchanges().get(exchangeName);
}
- if (acl && acl->doTransferAcl())
- {
- if (!acl->authorise(getSession().getConnection().getUserId(),acl::PUBLISH,acl::EXCHANGE,exchangeName, msg->getRoutingKey() ))
- throw NotAllowedException("ACL denied exhange publish request");
+ if (acl && acl->doTransferAcl())
+ {
+ if (!acl->authorise(getSession().getConnection().getUserId(),acl::PUBLISH,acl::EXCHANGE,exchangeName, msg->getRoutingKey() ))
+ throw NotAllowedException("ACL denied exhange publish request");
}
cacheExchange->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());
@@ -382,7 +384,7 @@
void SemanticState::requestDispatch()
{
for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
- requestDispatch(*ptr_map_ptr(i));
+ requestDispatch(*(i->second));
}
}
@@ -402,7 +404,7 @@
delivery.subtractFrom(outstanding);
ConsumerImplMap::iterator i = consumers.find(delivery.getTag());
if (i != consumers.end()) {
- ptr_map_ptr(i)->complete(delivery);
+ i->second->complete(delivery);
}
}
@@ -460,7 +462,7 @@
if (i == consumers.end()) {
throw NotFoundException(QPID_MSG("Unknown destination " << destination));
} else {
- return *ptr_map_ptr(i);
+ return *(i->second);
}
}
@@ -526,7 +528,7 @@
void SemanticState::ConsumerImpl::flush()
{
- while(queue->dispatch(*this))
+ while(queue->dispatch(shared_from_this()))
;
stop();
}
@@ -591,19 +593,34 @@
}
bool SemanticState::ConsumerImpl::hasOutput() {
- return queue->checkForMessages(*this);
+ return queue->checkForMessages(shared_from_this());
}
bool SemanticState::ConsumerImpl::doOutput()
{
- //TODO: think through properly
- return queue->dispatch(*this);
+ return queue->dispatch(shared_from_this());
+}
+
+void SemanticState::ConsumerImpl::enableNotify()
+{
+ Mutex::ScopedLock l(lock);
+ notifyEnabled = true;
+}
+
+void SemanticState::ConsumerImpl::disableNotify()
+{
+ Mutex::ScopedLock l(lock);
+ notifyEnabled = true;
}
void SemanticState::ConsumerImpl::notify()
{
- //TODO: think through properly
- parent->outputTasks.activateOutput();
+ //TODO: alter this, don't want to hold locks across external
+ //calls; for now its is required to protect the notify() from
+ //having part of the object chain of the invocation being
+ //concurrently deleted
+ Mutex::ScopedLock l(lock);
+ if (notifyEnabled) parent->outputTasks.activateOutput();
}
@@ -644,4 +661,18 @@
requestDispatch();
}
+void SemanticState::attached()
+{
+ for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
+ i->second->enableNotify();
+ }
+}
+
+void SemanticState::detached()
+{
+ for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
+ i->second->disableNotify();
+ }
+}
+
}} // namespace qpid::broker
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=693518&r1=693517&r2=693518&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Tue Sep 9 10:15:17 2008
@@ -37,6 +37,7 @@
#include "qpid/framing/SequenceSet.h"
#include "qpid/framing/Uuid.h"
#include "qpid/sys/AggregateOutput.h"
+#include "qpid/sys/Mutex.h"
#include "qpid/shared_ptr.h"
#include "AclModule.h"
@@ -58,8 +59,10 @@
class SemanticState : public sys::OutputTask,
private boost::noncopyable
{
- class ConsumerImpl : public Consumer, public sys::OutputTask
+ class ConsumerImpl : public Consumer, public sys::OutputTask,
+ public boost::enable_shared_from_this<ConsumerImpl>
{
+ qpid::sys::Mutex lock;
SemanticState* const parent;
const DeliveryToken::shared_ptr token;
const string name;
@@ -71,11 +74,14 @@
bool windowing;
uint32_t msgCredit;
uint32_t byteCredit;
+ bool notifyEnabled;
bool checkCredit(boost::intrusive_ptr<Message>& msg);
void allocateCredit(boost::intrusive_ptr<Message>& msg);
public:
+ typedef boost::shared_ptr<ConsumerImpl> shared_ptr;
+
ConsumerImpl(SemanticState* parent, DeliveryToken::shared_ptr token,
const string& name, Queue::shared_ptr queue,
bool ack, bool nolocal, bool acquire);
@@ -84,6 +90,9 @@
bool deliver(QueuedMessage& msg);
bool filter(boost::intrusive_ptr<Message> msg);
bool accept(boost::intrusive_ptr<Message> msg);
+
+ void disableNotify();
+ void enableNotify();
void notify();
void setWindowMode();
@@ -100,7 +109,7 @@
bool doOutput();
};
- typedef boost::ptr_map<std::string,ConsumerImpl> ConsumerImplMap;
+ typedef std::map<std::string, ConsumerImpl::shared_ptr> ConsumerImplMap;
typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;
SessionContext& session;
@@ -130,7 +139,7 @@
AckRange findRange(DeliveryId first, DeliveryId last);
void requestDispatch();
void requestDispatch(ConsumerImpl&);
- void cancel(ConsumerImpl&);
+ void cancel(ConsumerImpl::shared_ptr);
public:
SemanticState(DeliveryAdapter&, SessionContext&);
@@ -187,6 +196,9 @@
//final 0-10 spec (completed and accepted are distinct):
void completed(DeliveryId deliveryTag, DeliveryId endTag);
void accepted(DeliveryId deliveryTag, DeliveryId endTag);
+
+ void attached();
+ void detached();
};
}} // namespace qpid::broker
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=693518&r1=693517&r2=693518&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Tue Sep 9 10:15:17 2008
@@ -92,9 +92,8 @@
}
void SessionState::detach() {
- // activateOutput can be called in a different thread, lock to protect attached status
- Mutex::ScopedLock l(lock);
QPID_LOG(debug, getId() << ": detached on broker.");
+ semanticState.detached();//prevents further activateOutput calls until reattached
getConnection().outputTasks.removeOutputTask(&semanticState);
handler = 0;
if (mgmtObject != 0)
@@ -102,8 +101,6 @@
}
void SessionState::attach(SessionHandler& h) {
- // activateOutput can be called in a different thread, lock to protect attached status
- Mutex::ScopedLock l(lock);
QPID_LOG(debug, getId() << ": attached on broker.");
handler = &h;
if (mgmtObject != 0)
@@ -115,8 +112,6 @@
}
void SessionState::activateOutput() {
- // activateOutput can be called in a different thread, lock to protect attached status
- Mutex::ScopedLock l(lock);
if (isAttached())
getConnection().outputTasks.activateOutput();
}
@@ -273,6 +268,7 @@
void SessionState::readyToSend() {
QPID_LOG(debug, getId() << ": ready to send, activating output.");
assert(handler);
+ semanticState.attached();
sys::AggregateOutput& tasks = handler->getConnection().outputTasks;
tasks.addOutputTask(&semanticState);
tasks.activateOutput();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=693518&r1=693517&r2=693518&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Tue Sep 9 10:15:17 2008
@@ -25,7 +25,6 @@
#include "qpid/SessionState.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/SequenceSet.h"
-#include "qpid/sys/Mutex.h"
#include "qpid/sys/Time.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/Session.h"
@@ -117,7 +116,6 @@
Broker& broker;
SessionHandler* handler;
sys::AbsTime expiry; // Used by SessionManager.
- sys::Mutex lock;
bool ignoring;
std::string name;
SemanticState semanticState;
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=693518&r1=693517&r2=693518&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Tue Sep 9 10:15:17 2008
@@ -78,7 +78,7 @@
Queue::shared_ptr queue(new Queue("my_test_queue", true));
intrusive_ptr<Message> received;
- TestConsumer c1;
+ TestConsumer::shared_ptr c1(new TestConsumer());
queue->consume(c1);
@@ -88,7 +88,7 @@
queue->process(msg1);
sleep(2);
- BOOST_CHECK(!c1.received);
+ BOOST_CHECK(!c1->received);
msg1->enqueueComplete();
received = queue->get().payload;
@@ -114,8 +114,8 @@
Queue::shared_ptr queue(new Queue("my_queue", true));
//Test adding consumers:
- TestConsumer c1;
- TestConsumer c2;
+ TestConsumer::shared_ptr c1(new TestConsumer());
+ TestConsumer::shared_ptr c2(new TestConsumer());
queue->consume(c1);
queue->consume(c2);
@@ -128,16 +128,16 @@
queue->deliver(msg1);
BOOST_CHECK(queue->dispatch(c1));
- BOOST_CHECK_EQUAL(msg1.get(), c1.last.get());
+ BOOST_CHECK_EQUAL(msg1.get(), c1->last.get());
queue->deliver(msg2);
BOOST_CHECK(queue->dispatch(c2));
- BOOST_CHECK_EQUAL(msg2.get(), c2.last.get());
+ BOOST_CHECK_EQUAL(msg2.get(), c2->last.get());
- c1.received = false;
+ c1->received = false;
queue->deliver(msg3);
BOOST_CHECK(queue->dispatch(c1));
- BOOST_CHECK_EQUAL(msg3.get(), c1.last.get());
+ BOOST_CHECK_EQUAL(msg3.get(), c1->last.get());
//Test cancellation:
queue->cancel(c1);
@@ -187,13 +187,13 @@
BOOST_CHECK_EQUAL(msg2.get(), received.get());
BOOST_CHECK_EQUAL(uint32_t(1), queue->getMessageCount());
- TestConsumer consumer;
+ TestConsumer::shared_ptr consumer(new TestConsumer());
queue->consume(consumer);
queue->dispatch(consumer);
- if (!consumer.received)
+ if (!consumer->received)
sleep(2);
- BOOST_CHECK_EQUAL(msg3.get(), consumer.last.get());
+ BOOST_CHECK_EQUAL(msg3.get(), consumer->last.get());
BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount());
received = queue->get().payload;