You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cc...@apache.org on 2007/07/18 20:03:55 UTC
svn commit: r557343 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/broker/
tests/
Author: cctrieloff
Date: Wed Jul 18 11:03:54 2007
New Revision: 557343
URL: http://svn.apache.org/viewvc?view=rev&rev=557343
Log:
Fixed MT safety issues pointed out by Gordon.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp?view=diff&rev=557343&r1=557342&r2=557343
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp Wed Jul 18 11:03:54 2007
@@ -311,7 +311,7 @@
if(!nowait) client.consumeOk(newTag, context.getRequestId());
//allow messages to be dispatched if required as there is now a consumer:
- queue->dispatch();
+ queue->requestDispatch();
}
void BrokerAdapter::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp?view=diff&rev=557343&r1=557342&r2=557343
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp Wed Jul 18 11:03:54 2007
@@ -253,7 +253,7 @@
void Channel::ConsumerImpl::requestDispatch(){
if(blocked)
- queue->dispatch();
+ queue->requestDispatch();
}
void Channel::handleInlineTransfer(Message::shared_ptr msg){
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp?view=diff&rev=557343&r1=557342&r2=557343
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp Wed Jul 18 11:03:54 2007
@@ -87,6 +87,11 @@
}
+void Queue::requestDispatch(){
+ serializer.execute(boost::bind(&Queue::dispatch, this));
+}
+
+
bool Queue::dispatch(Message::shared_ptr& msg){
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h?view=diff&rev=557343&r1=557342&r2=557343
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h Wed Jul 18 11:03:54 2007
@@ -81,7 +81,11 @@
void push(Message::shared_ptr& msg);
bool dispatch(Message::shared_ptr& msg);
void setPolicy(std::auto_ptr<QueuePolicy> policy);
-
+ /**
+ * only called by serializer
+ */
+ void dispatch();
+
public:
typedef boost::shared_ptr<Queue> shared_ptr;
@@ -120,12 +124,12 @@
*/
void recover(Message::shared_ptr& msg);
/**
- * Dispatch any queued messages providing there are
+ * Request dispatch any queued messages providing there are
* consumers for them. Only one thread can be dispatching
- * at any time, but this method (rather than the caller)
- * is responsible for ensuring that.
+ * at any time, so this call schedules the dispatch based on
+ * the serializer policy.
*/
- void dispatch();
+ void requestDispatch();
void consume(Consumer* c, bool exclusive = false);
void cancel(Consumer* c);
uint32_t purge();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp?view=diff&rev=557343&r1=557342&r2=557343
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp Wed Jul 18 11:03:54 2007
@@ -134,7 +134,7 @@
noLocal ? &connection : 0, &filter);
client.ok(context.getRequestId());
// Dispatch messages as there is now a consumer.
- queue->dispatch();
+ queue->requestDispatch();
}
void
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp?view=diff&rev=557343&r1=557342&r2=557343
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp Wed Jul 18 11:03:54 2007
@@ -212,6 +212,7 @@
string tag("test");
channel.consume(recorder.createAdapter(), tag, queue, false, false, 0);
queue->deliver(msg);
+ sleep(2);
CPPUNIT_ASSERT_EQUAL((size_t) 1, recorder.delivered.size());
CPPUNIT_ASSERT_EQUAL(msg, recorder.delivered.front().first);
@@ -293,6 +294,7 @@
queue->deliver(msg1);
queue->deliver(msg2);
queue->deliver(msg3);
+ sleep(2);
Message::shared_ptr next = queue->dequeue();
CPPUNIT_ASSERT_EQUAL(msg1, next);
@@ -336,6 +338,7 @@
CPPUNIT_ASSERT_EQUAL((size_t) 0, recorder.delivered.size());
channel.flow(true);
+ sleep(2);
//ensure no messages have been delivered
CPPUNIT_ASSERT_EQUAL((size_t) 1, recorder.delivered.size());
CPPUNIT_ASSERT_EQUAL(msg, recorder.delivered.front().first);
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?view=diff&rev=557343&r1=557342&r2=557343
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Wed Jul 18 11:03:54 2007
@@ -37,8 +37,14 @@
class TestConsumer : public virtual Consumer{
public:
Message::shared_ptr last;
+ bool received;
+ TestConsumer(): received(false) {};
- virtual bool deliver(Message::shared_ptr& msg);
+ virtual bool deliver(Message::shared_ptr& msg){
+ last = msg;
+ received = true;
+ return true;
+ };
};
class FailOnDeliver : public Deliverable
@@ -84,16 +90,19 @@
Message::shared_ptr msg3 = message("e", "C");
queue->deliver(msg1);
- /** if dispatched on diff thread, force dispatch so don't have to wait for thread. Only do in text */
- queue->dispatch();
- CPPUNIT_ASSERT_EQUAL(msg1.get(), c1.last.get());
+ if (!c1.received)
+ sleep(2);
+ CPPUNIT_ASSERT_EQUAL(msg1.get(), c1.last.get());
queue->deliver(msg2);
- queue->dispatch();
+ if (!c2.received)
+ sleep(2);
CPPUNIT_ASSERT_EQUAL(msg2.get(), c2.last.get());
+ c1.received = false;
queue->deliver(msg3);
- queue->dispatch();
+ if (!c1.received)
+ sleep(2);
CPPUNIT_ASSERT_EQUAL(msg3.get(), c1.last.get());
//Test cancellation:
@@ -146,7 +155,10 @@
TestConsumer consumer;
queue->consume(&consumer);
- queue->dispatch();
+ queue->requestDispatch();
+ if (!consumer.received)
+ sleep(2);
+
CPPUNIT_ASSERT_EQUAL(msg3.get(), consumer.last.get());
CPPUNIT_ASSERT_EQUAL(uint32_t(0), queue->getMessageCount());
@@ -195,9 +207,4 @@
CPPUNIT_PLUGIN_IMPLEMENT();
CPPUNIT_TEST_SUITE_REGISTRATION(QueueTest);
-//TestConsumer
-bool TestConsumer::deliver(Message::shared_ptr& msg){
- last = msg;
- return true;
-}