You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cc...@apache.org on 2007/08/16 22:57:44 UTC

svn commit: r566846 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/broker/BrokerQueue.cpp qpid/broker/RecoveryManagerImpl.cpp tests/TxPublishTest.cpp

Author: cctrieloff
Date: Thu Aug 16 13:57:43 2007
New Revision: 566846

URL: http://svn.apache.org/viewvc?view=rev&rev=566846
Log:

- Fix for asyncIO for store
- Fix for dtx async IO recover
- Temp patch for Tx commit ( existing bug uncovered )
- All store tests should be working again

Known issues:
- If a msg is sent to more than one queue, then the
  io complete is signaled on the first record written, not
  the last
- Open issues for tx begin then commit with no prepare using
  durable msgs and async IO. Async complete bit not set on
  recovery. Will be fixed with next commit.


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/TxPublishTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp?view=diff&rev=566846&r1=566845&r2=566846
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp Thu Aug 16 13:57:43 2007
@@ -87,6 +87,7 @@
 
 void Queue::recover(Message::shared_ptr& msg){
     push(msg);
+    msg->enqueueComplete(); // mark the message as enqueued
     if (store && msg->expectedContentSize() != msg->encodedContentSize()) {
         //content has not been loaded, need to ensure that lazy loading mode is set:
         //TODO: find a nicer way to do this
@@ -189,10 +190,13 @@
     Message::shared_ptr msg;
     if(!messages.empty()){
         msg = messages.front();
-	if (msg->isEnqueueComplete())
+	if (msg->isEnqueueComplete()){
            pop();
+	   return msg;
+	}
     }
-    return msg;
+    Message::shared_ptr msg_empty;
+    return msg_empty;
 }
 
 uint32_t Queue::purge(){

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp?view=diff&rev=566846&r1=566845&r2=566846
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp Thu Aug 16 13:57:43 2007
@@ -194,6 +194,7 @@
 
 void RecoverableMessageImpl::enqueue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue)
 {
+    msg->enqueueComplete(); // recoved nmessage to enqueued in store already
     buffer->enlist(TxOp::shared_ptr(new RecoveredEnqueue(queue, msg)));
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/TxPublishTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/TxPublishTest.cpp?view=diff&rev=566846&r1=566845&r2=566846
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/TxPublishTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/TxPublishTest.cpp Thu Aug 16 13:57:43 2007
@@ -44,7 +44,8 @@
         
         void enqueue(TransactionContext*, PersistableMessage& msg, const PersistableQueue& queue)
         {
-            enqueued.push_back(msg_queue_pair(queue.getName(), &msg));
+            msg.enqueueComplete(); 
+ 	    enqueued.push_back(msg_queue_pair(queue.getName(), &msg));
         }
         
         //dont care about any of the other methods:
@@ -61,7 +62,7 @@
     TestMessageStore store;
     Queue::shared_ptr queue1;
     Queue::shared_ptr queue2;
-    Message::shared_ptr const msg;
+    Message::shared_ptr msg;
     TxPublish op;
     
 public:
@@ -88,14 +89,21 @@
         CPPUNIT_ASSERT_EQUAL((PersistableMessage*) msg.get(), store.enqueued[0].second);
         CPPUNIT_ASSERT_EQUAL(string("queue2"), store.enqueued[1].first);
         CPPUNIT_ASSERT_EQUAL((PersistableMessage*) msg.get(), store.enqueued[1].second);
+	CPPUNIT_ASSERT_EQUAL( true, ((PersistableMessage*) msg.get())->isEnqueueComplete());
+	
+
     }
 
     void testCommit()
     {
         //ensure messages are delivered to queue
+        op.prepare(0);
         op.commit();
         CPPUNIT_ASSERT_EQUAL((uint32_t) 1, queue1->getMessageCount());
-        CPPUNIT_ASSERT_EQUAL(msg, queue1->dequeue());
+	Message::shared_ptr msg_dequeue = queue1->dequeue();
+
+ 	CPPUNIT_ASSERT_EQUAL( true, ((PersistableMessage*) msg_dequeue.get())->isEnqueueComplete());
+        CPPUNIT_ASSERT_EQUAL(msg, msg_dequeue);
 
         CPPUNIT_ASSERT_EQUAL((uint32_t) 1, queue2->getMessageCount());
         CPPUNIT_ASSERT_EQUAL(msg, queue2->dequeue());