You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2012/07/25 14:03:35 UTC
svn commit: r1365545 - in /qpid/branches/asyncstore/cpp/src/tests: ./
storePerftools/asyncPerf/
Author: kpvdr
Date: Wed Jul 25 12:03:34 2012
New Revision: 1365545
URL: http://svn.apache.org/viewvc?rev=1365545&view=rev
Log:
QPID-3858: WIP: Removed PersistableQueuedMessage again. The non-durable transactional enqueues are broken.
Removed:
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.cpp
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.h
Modified:
qpid/branches/asyncstore/cpp/src/tests/asyncstore.cmake
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp
Modified: qpid/branches/asyncstore/cpp/src/tests/asyncstore.cmake
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/asyncstore.cmake?rev=1365545&r1=1365544&r2=1365545&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/asyncstore.cmake (original)
+++ qpid/branches/asyncstore/cpp/src/tests/asyncstore.cmake Wed Jul 25 12:03:34 2012
@@ -58,7 +58,6 @@ set (asyncStorePerf_SOURCES
storePerftools/asyncPerf/MessageDeque.cpp
storePerftools/asyncPerf/MessageProducer.cpp
storePerftools/asyncPerf/PerfTest.cpp
- storePerftools/asyncPerf/PersistableQueuedMessage.cpp
storePerftools/asyncPerf/QueuedMessage.cpp
storePerftools/asyncPerf/SimpleMessage.cpp
storePerftools/asyncPerf/SimpleQueue.cpp
Modified: qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp?rev=1365545&r1=1365544&r2=1365545&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp Wed Jul 25 12:03:34 2012
@@ -41,18 +41,24 @@ QueuedMessage::QueuedMessage(SimpleQueue
boost::enable_shared_from_this<QueuedMessage>(),
m_queue(q),
m_msg(msg)
-{}
+{
+ if (m_queue->getStore()) {
+ m_enqHandle = q->getStore()->createEnqueueHandle(msg->getHandle(), q->getHandle());
+ }
+}
QueuedMessage::QueuedMessage(const QueuedMessage& qm) :
boost::enable_shared_from_this<QueuedMessage>(),
m_queue(qm.m_queue),
- m_msg(qm.m_msg)
+ m_msg(qm.m_msg),
+ m_enqHandle(qm.m_enqHandle)
{}
QueuedMessage::QueuedMessage(QueuedMessage* const qm) :
boost::enable_shared_from_this<QueuedMessage>(),
m_queue(qm->m_queue),
- m_msg(qm->m_msg)
+ m_msg(qm->m_msg),
+ m_enqHandle(qm->m_enqHandle)
{}
QueuedMessage::~QueuedMessage()
@@ -70,6 +76,18 @@ QueuedMessage::payload() const
return m_msg;
}
+const qpid::broker::EnqueueHandle&
+QueuedMessage::enqHandle() const
+{
+ return m_enqHandle;
+}
+
+qpid::broker::EnqueueHandle&
+QueuedMessage::enqHandle()
+{
+ return m_enqHandle;
+}
+
void
QueuedMessage::prepareEnqueue(qpid::broker::TxnHandle& th)
{
Modified: qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h?rev=1365545&r1=1365544&r2=1365545&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h (original)
+++ qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h Wed Jul 25 12:03:34 2012
@@ -25,6 +25,7 @@
#define tests_storePerftools_asyncPerf_QueuedMessage_h_
#include "qpid/broker/AsyncStore.h"
+#include "qpid/broker/EnqueueHandle.h"
#include <boost/enable_shared_from_this.hpp>
#include <boost/intrusive_ptr.hpp>
@@ -54,6 +55,8 @@ public:
virtual ~QueuedMessage();
SimpleQueue* getQueue() const;
boost::intrusive_ptr<SimpleMessage> payload() const;
+ const qpid::broker::EnqueueHandle& enqHandle() const;
+ qpid::broker::EnqueueHandle& enqHandle();
// --- Transaction handling ---
void prepareEnqueue(qpid::broker::TxnHandle& th);
@@ -63,6 +66,7 @@ public:
private:
SimpleQueue* m_queue;
boost::intrusive_ptr<SimpleMessage> m_msg;
+ qpid::broker::EnqueueHandle m_enqHandle;
};
}}} // namespace tests::storePerfTools
Modified: qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp?rev=1365545&r1=1365544&r2=1365545&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp Wed Jul 25 12:03:34 2012
@@ -26,7 +26,7 @@
#include "DeliveryRecord.h"
#include "MessageConsumer.h"
#include "MessageDeque.h"
-#include "PersistableQueuedMessage.h"
+#include "QueuedMessage.h"
#include "SimpleMessage.h"
#include "qpid/broker/AsyncResultHandle.h"
@@ -153,12 +153,7 @@ SimpleQueue::handleAsyncDestroyResult(co
void
SimpleQueue::deliver(boost::intrusive_ptr<SimpleMessage> msg)
{
- boost::shared_ptr<QueuedMessage> qm;
- if (msg->isPersistent() && m_store) {
- qm = boost::shared_ptr<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg));
- } else {
- qm = boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg));
- }
+ boost::shared_ptr<QueuedMessage> qm(boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg)));
enqueue(s_nullTxnHandle, qm);
push(qm);
}
@@ -191,7 +186,7 @@ SimpleQueue::enqueue(qpid::broker::TxnHa
}
if (qm->payload()->isPersistent() && m_store) {
qm->payload()->enqueueAsync(shared_from_this(), m_store);
- return asyncEnqueue(th, boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm));
+ return asyncEnqueue(th, qm);
}
return false;
}
@@ -212,7 +207,7 @@ SimpleQueue::dequeue(qpid::broker::TxnHa
}
if (qm->payload()->isPersistent() && m_store) {
qm->payload()->dequeueAsync(shared_from_this(), m_store);
- return asyncDequeue(th, boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm));
+ return asyncDequeue(th, qm);
}
return true;
}
@@ -220,13 +215,7 @@ SimpleQueue::dequeue(qpid::broker::TxnHa
void
SimpleQueue::process(boost::intrusive_ptr<SimpleMessage> msg)
{
- boost::shared_ptr<QueuedMessage> qm;
- if (msg->isPersistent() && m_store) {
- qm = boost::shared_ptr<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg));
- } else {
- qm = boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg));
- }
- push(qm);
+ push(boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg)));
}
void
@@ -356,12 +345,12 @@ SimpleQueue::push(boost::shared_ptr<Queu
// private
bool
SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th,
- boost::shared_ptr<PersistableQueuedMessage> pqm)
+ boost::shared_ptr<QueuedMessage> qm)
{
- assert(pqm.get());
+ assert(qm.get());
// qm.payload()->setPersistenceId(m_store->getNextRid()); // TODO: rid is set by store itself - find way to do this
boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(),
- pqm->payload(),
+ qm->payload(),
th,
&handleAsyncEnqueueResult,
&m_resultQueue));
@@ -369,7 +358,7 @@ SimpleQueue::asyncEnqueue(qpid::broker::
if (th.isValid()) {
th.incrOpCnt();
}
- m_store->submitEnqueue(pqm->enqHandle(), th, qac);
+ m_store->submitEnqueue(qm->enqHandle(), th, qac);
++m_asyncOpCounter;
return true;
}
@@ -394,11 +383,11 @@ SimpleQueue::handleAsyncEnqueueResult(co
// private
bool
SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th,
- boost::shared_ptr<PersistableQueuedMessage> pqm)
+ boost::shared_ptr<QueuedMessage> qm)
{
- assert(pqm.get());
+ assert(qm.get());
boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(),
- pqm->payload(),
+ qm->payload(),
th,
&handleAsyncDequeueResult,
&m_resultQueue));
@@ -406,7 +395,7 @@ SimpleQueue::asyncDequeue(qpid::broker::
if (th.isValid()) {
th.incrOpCnt();
}
- m_store->submitDequeue(pqm->enqHandle(),
+ m_store->submitDequeue(qm->enqHandle(),
th,
qac);
++m_asyncOpCounter;
Modified: qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h?rev=1365545&r1=1365544&r2=1365545&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h (original)
+++ qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h Wed Jul 25 12:03:34 2012
@@ -49,7 +49,6 @@ namespace asyncPerf {
class MessageConsumer;
class Messages;
-class PersistableQueuedMessage;
class QueuedMessage;
class SimpleMessage;
@@ -136,10 +135,10 @@ private:
// -- Async ops ---
bool asyncEnqueue(qpid::broker::TxnHandle& th,
- boost::shared_ptr<PersistableQueuedMessage> pqm);
+ boost::shared_ptr<QueuedMessage> qm);
static void handleAsyncEnqueueResult(const qpid::broker::AsyncResultHandle* const arh);
bool asyncDequeue(qpid::broker::TxnHandle& th,
- boost::shared_ptr<PersistableQueuedMessage> pqm);
+ boost::shared_ptr<QueuedMessage> qm);
static void handleAsyncDequeueResult(const qpid::broker::AsyncResultHandle* const arh);
// --- Async op counter ---
Modified: qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp?rev=1365545&r1=1365544&r2=1365545&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp Wed Jul 25 12:03:34 2012
@@ -23,7 +23,6 @@
#include "TxnPublish.h"
-#include "PersistableQueuedMessage.h"
#include "QueuedMessage.h"
#include "SimpleMessage.h"
#include "SimpleQueue.h"
@@ -97,13 +96,7 @@ TxnPublish::contentSize()
void
TxnPublish::deliverTo(const boost::shared_ptr<SimpleQueue>& queue)
{
- boost::shared_ptr<QueuedMessage> qm;
- if (m_msg->isPersistent() && queue->getStore()) {
- qm = boost::shared_ptr<PersistableQueuedMessage>(new PersistableQueuedMessage(queue.get(), m_msg));
- } else {
- qm = boost::shared_ptr<QueuedMessage>(new QueuedMessage(queue.get(), m_msg));
- }
- m_queues.push_back(qm);
+ m_queues.push_back(boost::shared_ptr<QueuedMessage>(new QueuedMessage(queue.get(), m_msg)));
m_delivered = true;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org