You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cc...@apache.org on 2007/07/18 20:03:55 UTC

svn commit: r557343 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/broker/ tests/

Author: cctrieloff
Date: Wed Jul 18 11:03:54 2007
New Revision: 557343

URL: http://svn.apache.org/viewvc?view=rev&rev=557343
Log:

Fixed MT safety issues pointed out by Gordon.


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp?view=diff&rev=557343&r1=557342&r2=557343
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp Wed Jul 18 11:03:54 2007
@@ -311,7 +311,7 @@
     if(!nowait) client.consumeOk(newTag, context.getRequestId());
 
     //allow messages to be dispatched if required as there is now a consumer:
-    queue->dispatch();
+    queue->requestDispatch();
 } 
         
 void BrokerAdapter::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp?view=diff&rev=557343&r1=557342&r2=557343
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp Wed Jul 18 11:03:54 2007
@@ -253,7 +253,7 @@
 
 void Channel::ConsumerImpl::requestDispatch(){
     if(blocked)
-        queue->dispatch();
+        queue->requestDispatch();
 }
 
 void Channel::handleInlineTransfer(Message::shared_ptr msg){

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp?view=diff&rev=557343&r1=557342&r2=557343
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp Wed Jul 18 11:03:54 2007
@@ -87,6 +87,11 @@
 }
 
 
+void Queue::requestDispatch(){
+    serializer.execute(boost::bind(&Queue::dispatch, this));
+}
+
+
 bool Queue::dispatch(Message::shared_ptr& msg){
 
  

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h?view=diff&rev=557343&r1=557342&r2=557343
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h Wed Jul 18 11:03:54 2007
@@ -81,7 +81,11 @@
             void push(Message::shared_ptr& msg);
             bool dispatch(Message::shared_ptr& msg);
             void setPolicy(std::auto_ptr<QueuePolicy> policy);
-
+            /**
+             * only called by serilizer
+	     */
+            void dispatch();
+ 
         public:
             
             typedef boost::shared_ptr<Queue> shared_ptr;
@@ -120,12 +124,12 @@
              */
             void recover(Message::shared_ptr& msg);
             /**
-             * Dispatch any queued messages providing there are
+             * Request dispatch any queued messages providing there are
              * consumers for them. Only one thread can be dispatching
-             * at any time, but this method (rather than the caller)
-             * is responsible for ensuring that.
+             * at any time, so this call schedules the despatch based on
+	     * the serilizer policy.
              */
-            void dispatch();
+            void requestDispatch();
             void consume(Consumer* c, bool exclusive = false);
             void cancel(Consumer* c);
             uint32_t purge();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp?view=diff&rev=557343&r1=557342&r2=557343
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp Wed Jul 18 11:03:54 2007
@@ -134,7 +134,7 @@
         noLocal ? &connection : 0, &filter);
     client.ok(context.getRequestId());
     // Dispatch messages as there is now a consumer.
-    queue->dispatch();
+    queue->requestDispatch();
 }
 
 void

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp?view=diff&rev=557343&r1=557342&r2=557343
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp Wed Jul 18 11:03:54 2007
@@ -212,6 +212,7 @@
         string tag("test");
         channel.consume(recorder.createAdapter(), tag, queue, false, false, 0);
         queue->deliver(msg);
+	sleep(2);
 
         CPPUNIT_ASSERT_EQUAL((size_t) 1, recorder.delivered.size());
         CPPUNIT_ASSERT_EQUAL(msg, recorder.delivered.front().first);
@@ -293,6 +294,7 @@
         queue->deliver(msg1);
         queue->deliver(msg2);
         queue->deliver(msg3);
+	sleep(2);
         
         Message::shared_ptr next = queue->dequeue();
         CPPUNIT_ASSERT_EQUAL(msg1, next);
@@ -336,6 +338,7 @@
         CPPUNIT_ASSERT_EQUAL((size_t) 0, recorder.delivered.size());
 
         channel.flow(true);
+	sleep(2);
         //ensure no messages have been delivered
         CPPUNIT_ASSERT_EQUAL((size_t) 1, recorder.delivered.size());
         CPPUNIT_ASSERT_EQUAL(msg, recorder.delivered.front().first);

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?view=diff&rev=557343&r1=557342&r2=557343
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Wed Jul 18 11:03:54 2007
@@ -37,8 +37,14 @@
 class TestConsumer : public virtual Consumer{
 public:
     Message::shared_ptr last;
+    bool received;
+    TestConsumer(): received(false) {};
 
-    virtual bool deliver(Message::shared_ptr& msg);
+    virtual bool deliver(Message::shared_ptr& msg){
+    last = msg;
+    received = true;
+    return true;
+    };
 };
 
 class FailOnDeliver : public Deliverable
@@ -84,16 +90,19 @@
         Message::shared_ptr msg3 = message("e", "C");
 
         queue->deliver(msg1);
-	/** if dispatched on diff thread, force dispatch so don't have to wait for thread. Only do in text */
-        queue->dispatch();  
-       CPPUNIT_ASSERT_EQUAL(msg1.get(), c1.last.get());
+	if (!c1.received)
+	    sleep(2);
+        CPPUNIT_ASSERT_EQUAL(msg1.get(), c1.last.get());
 
         queue->deliver(msg2);
-        queue->dispatch();
+	if (!c2.received)
+	    sleep(2);
         CPPUNIT_ASSERT_EQUAL(msg2.get(), c2.last.get());
         
+	c1.received = false;
         queue->deliver(msg3);
-        queue->dispatch();
+	if (!c1.received)
+	    sleep(2);
         CPPUNIT_ASSERT_EQUAL(msg3.get(), c1.last.get());        
     
         //Test cancellation:
@@ -146,7 +155,10 @@
 
         TestConsumer consumer; 
         queue->consume(&consumer);
-        queue->dispatch();
+        queue->requestDispatch();
+	if (!consumer.received)
+	    sleep(2);
+
         CPPUNIT_ASSERT_EQUAL(msg3.get(), consumer.last.get());
         CPPUNIT_ASSERT_EQUAL(uint32_t(0), queue->getMessageCount());
 
@@ -195,9 +207,4 @@
 CPPUNIT_PLUGIN_IMPLEMENT();
 CPPUNIT_TEST_SUITE_REGISTRATION(QueueTest);
 
-//TestConsumer
-bool TestConsumer::deliver(Message::shared_ptr& msg){
-    last = msg;
-    return true;
-}