You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2012/07/25 14:03:35 UTC

svn commit: r1365545 - in /qpid/branches/asyncstore/cpp/src/tests: ./ storePerftools/asyncPerf/

Author: kpvdr
Date: Wed Jul 25 12:03:34 2012
New Revision: 1365545

URL: http://svn.apache.org/viewvc?rev=1365545&view=rev
Log:
QPID-3858: WIP: Removed PersistableQueuedMessage again. The non-durable transactional enqueues are broken.

Removed:
    qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.cpp
    qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.h
Modified:
    qpid/branches/asyncstore/cpp/src/tests/asyncstore.cmake
    qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp
    qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h
    qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
    qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h
    qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp

Modified: qpid/branches/asyncstore/cpp/src/tests/asyncstore.cmake
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/asyncstore.cmake?rev=1365545&r1=1365544&r2=1365545&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/asyncstore.cmake (original)
+++ qpid/branches/asyncstore/cpp/src/tests/asyncstore.cmake Wed Jul 25 12:03:34 2012
@@ -58,7 +58,6 @@ set (asyncStorePerf_SOURCES
     storePerftools/asyncPerf/MessageDeque.cpp
     storePerftools/asyncPerf/MessageProducer.cpp
 	storePerftools/asyncPerf/PerfTest.cpp
-	storePerftools/asyncPerf/PersistableQueuedMessage.cpp
 	storePerftools/asyncPerf/QueuedMessage.cpp
 	storePerftools/asyncPerf/SimpleMessage.cpp
 	storePerftools/asyncPerf/SimpleQueue.cpp

Modified: qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp?rev=1365545&r1=1365544&r2=1365545&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp Wed Jul 25 12:03:34 2012
@@ -41,18 +41,24 @@ QueuedMessage::QueuedMessage(SimpleQueue
         boost::enable_shared_from_this<QueuedMessage>(),
         m_queue(q),
         m_msg(msg)
-{}
+{
+    if (m_queue->getStore()) {
+        m_enqHandle = q->getStore()->createEnqueueHandle(msg->getHandle(), q->getHandle());
+    }
+}
 
 QueuedMessage::QueuedMessage(const QueuedMessage& qm) :
         boost::enable_shared_from_this<QueuedMessage>(),
         m_queue(qm.m_queue),
-        m_msg(qm.m_msg)
+        m_msg(qm.m_msg),
+        m_enqHandle(qm.m_enqHandle)
 {}
 
 QueuedMessage::QueuedMessage(QueuedMessage* const qm) :
         boost::enable_shared_from_this<QueuedMessage>(),
         m_queue(qm->m_queue),
-        m_msg(qm->m_msg)
+        m_msg(qm->m_msg),
+        m_enqHandle(qm->m_enqHandle)
 {}
 
 QueuedMessage::~QueuedMessage()
@@ -70,6 +76,18 @@ QueuedMessage::payload() const
     return m_msg;
 }
 
+const qpid::broker::EnqueueHandle&
+QueuedMessage::enqHandle() const
+{
+    return m_enqHandle;
+}
+
+qpid::broker::EnqueueHandle&
+QueuedMessage::enqHandle()
+{
+    return m_enqHandle;
+}
+
 void
 QueuedMessage::prepareEnqueue(qpid::broker::TxnHandle& th)
 {

Modified: qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h?rev=1365545&r1=1365544&r2=1365545&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h (original)
+++ qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h Wed Jul 25 12:03:34 2012
@@ -25,6 +25,7 @@
 #define tests_storePerftools_asyncPerf_QueuedMessage_h_
 
 #include "qpid/broker/AsyncStore.h"
+#include "qpid/broker/EnqueueHandle.h"
 
 #include <boost/enable_shared_from_this.hpp>
 #include <boost/intrusive_ptr.hpp>
@@ -54,6 +55,8 @@ public:
     virtual ~QueuedMessage();
     SimpleQueue* getQueue() const;
     boost::intrusive_ptr<SimpleMessage> payload() const;
+    const qpid::broker::EnqueueHandle& enqHandle() const;
+    qpid::broker::EnqueueHandle& enqHandle();
 
     // --- Transaction handling ---
     void prepareEnqueue(qpid::broker::TxnHandle& th);
@@ -63,6 +66,7 @@ public:
 private:
     SimpleQueue* m_queue;
     boost::intrusive_ptr<SimpleMessage> m_msg;
+    qpid::broker::EnqueueHandle m_enqHandle;
 };
 
 }}} // namespace tests::storePerfTools

Modified: qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp?rev=1365545&r1=1365544&r2=1365545&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp Wed Jul 25 12:03:34 2012
@@ -26,7 +26,7 @@
 #include "DeliveryRecord.h"
 #include "MessageConsumer.h"
 #include "MessageDeque.h"
-#include "PersistableQueuedMessage.h"
+#include "QueuedMessage.h"
 #include "SimpleMessage.h"
 
 #include "qpid/broker/AsyncResultHandle.h"
@@ -153,12 +153,7 @@ SimpleQueue::handleAsyncDestroyResult(co
 void
 SimpleQueue::deliver(boost::intrusive_ptr<SimpleMessage> msg)
 {
-    boost::shared_ptr<QueuedMessage> qm;
-    if (msg->isPersistent() && m_store) {
-        qm = boost::shared_ptr<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg));
-    } else {
-        qm = boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg));
-    }
+    boost::shared_ptr<QueuedMessage> qm(boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg)));
     enqueue(s_nullTxnHandle, qm);
     push(qm);
 }
@@ -191,7 +186,7 @@ SimpleQueue::enqueue(qpid::broker::TxnHa
     }
     if (qm->payload()->isPersistent() && m_store) {
         qm->payload()->enqueueAsync(shared_from_this(), m_store);
-        return asyncEnqueue(th, boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm));
+        return asyncEnqueue(th, qm);
     }
     return false;
 }
@@ -212,7 +207,7 @@ SimpleQueue::dequeue(qpid::broker::TxnHa
     }
     if (qm->payload()->isPersistent() && m_store) {
         qm->payload()->dequeueAsync(shared_from_this(), m_store);
-        return asyncDequeue(th, boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm));
+        return asyncDequeue(th, qm);
     }
     return true;
 }
@@ -220,13 +215,7 @@ SimpleQueue::dequeue(qpid::broker::TxnHa
 void
 SimpleQueue::process(boost::intrusive_ptr<SimpleMessage> msg)
 {
-    boost::shared_ptr<QueuedMessage> qm;
-    if (msg->isPersistent() && m_store) {
-        qm = boost::shared_ptr<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg));
-    } else {
-        qm = boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg));
-    }
-    push(qm);
+    push(boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg)));
 }
 
 void
@@ -356,12 +345,12 @@ SimpleQueue::push(boost::shared_ptr<Queu
 // private
 bool
 SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th,
-                          boost::shared_ptr<PersistableQueuedMessage> pqm)
+                          boost::shared_ptr<QueuedMessage> qm)
 {
-    assert(pqm.get());
+    assert(qm.get());
 //    qm.payload()->setPersistenceId(m_store->getNextRid()); // TODO: rid is set by store itself - find way to do this
     boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(),
-                                                                                               pqm->payload(),
+                                                                                               qm->payload(),
                                                                                                th,
                                                                                                &handleAsyncEnqueueResult,
                                                                                                &m_resultQueue));
@@ -369,7 +358,7 @@ SimpleQueue::asyncEnqueue(qpid::broker::
     if (th.isValid()) {
         th.incrOpCnt();
     }
-    m_store->submitEnqueue(pqm->enqHandle(), th, qac);
+    m_store->submitEnqueue(qm->enqHandle(), th, qac);
     ++m_asyncOpCounter;
     return true;
 }
@@ -394,11 +383,11 @@ SimpleQueue::handleAsyncEnqueueResult(co
 // private
 bool
 SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th,
-                          boost::shared_ptr<PersistableQueuedMessage> pqm)
+                          boost::shared_ptr<QueuedMessage> qm)
 {
-    assert(pqm.get());
+    assert(qm.get());
     boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(),
-                                                                                               pqm->payload(),
+                                                                                               qm->payload(),
                                                                                                th,
                                                                                                &handleAsyncDequeueResult,
                                                                                                &m_resultQueue));
@@ -406,7 +395,7 @@ SimpleQueue::asyncDequeue(qpid::broker::
     if (th.isValid()) {
         th.incrOpCnt();
     }
-    m_store->submitDequeue(pqm->enqHandle(),
+    m_store->submitDequeue(qm->enqHandle(),
                            th,
                            qac);
     ++m_asyncOpCounter;

Modified: qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h?rev=1365545&r1=1365544&r2=1365545&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h (original)
+++ qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h Wed Jul 25 12:03:34 2012
@@ -49,7 +49,6 @@ namespace asyncPerf {
 
 class MessageConsumer;
 class Messages;
-class PersistableQueuedMessage;
 class QueuedMessage;
 class SimpleMessage;
 
@@ -136,10 +135,10 @@ private:
 
     // -- Async ops ---
     bool asyncEnqueue(qpid::broker::TxnHandle& th,
-                      boost::shared_ptr<PersistableQueuedMessage> pqm);
+                      boost::shared_ptr<QueuedMessage> qm);
     static void handleAsyncEnqueueResult(const qpid::broker::AsyncResultHandle* const arh);
     bool asyncDequeue(qpid::broker::TxnHandle& th,
-                      boost::shared_ptr<PersistableQueuedMessage> pqm);
+                      boost::shared_ptr<QueuedMessage> qm);
     static void handleAsyncDequeueResult(const qpid::broker::AsyncResultHandle* const arh);
 
     // --- Async op counter ---

Modified: qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp?rev=1365545&r1=1365544&r2=1365545&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp Wed Jul 25 12:03:34 2012
@@ -23,7 +23,6 @@
 
 #include "TxnPublish.h"
 
-#include "PersistableQueuedMessage.h"
 #include "QueuedMessage.h"
 #include "SimpleMessage.h"
 #include "SimpleQueue.h"
@@ -97,13 +96,7 @@ TxnPublish::contentSize()
 void
 TxnPublish::deliverTo(const boost::shared_ptr<SimpleQueue>& queue)
 {
-    boost::shared_ptr<QueuedMessage> qm;
-    if (m_msg->isPersistent() && queue->getStore()) {
-        qm = boost::shared_ptr<PersistableQueuedMessage>(new PersistableQueuedMessage(queue.get(), m_msg));
-    } else {
-        qm = boost::shared_ptr<QueuedMessage>(new QueuedMessage(queue.get(), m_msg));
-    }
-    m_queues.push_back(qm);
+    m_queues.push_back(boost::shared_ptr<QueuedMessage>(new QueuedMessage(queue.get(), m_msg)));
     m_delivered = true;
 }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org