You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/10/28 16:31:05 UTC
svn commit: r830591 - in /qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker:
Queue.cpp Queue.h RecoveredDequeue.cpp RecoveredEnqueue.cpp
Author: ritchiem
Date: Wed Oct 28 15:31:04 2009
New Revision: 830591
URL: http://svn.apache.org/viewvc?rev=830591&view=rev
Log:
r817742 (the fix for QPID-2102) did not cover the case for 2pc transactions recovered in the prepared state; this fixes that case.
Modified:
qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/Queue.h
qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp
qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp
Modified: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/Queue.cpp?rev=830591&r1=830590&r2=830591&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/Queue.cpp Wed Oct 28 15:31:04 2009
@@ -179,6 +179,10 @@
}
}
+void Queue::recoverPrepared(boost::intrusive_ptr<Message>& msg)
+{
+ if (policy.get()) policy->recoverEnqueued(msg);
+}
void Queue::recover(boost::intrusive_ptr<Message>& msg){
if (policy.get()) policy->recoverEnqueued(msg);
Modified: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/Queue.h?rev=830591&r1=830590&r2=830591&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/Queue.h Wed Oct 28 15:31:04 2009
@@ -336,6 +336,12 @@
// For cluster update
QueueListeners& getListeners();
+
+ /**
+ * Reserve space in policy for an enqueued message that
+ * has been recovered in the prepared state (dtx only)
+ */
+ void recoverPrepared(boost::intrusive_ptr<Message>& msg);
};
}
}
Modified: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp?rev=830591&r1=830590&r2=830591&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp (original)
+++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp Wed Oct 28 15:31:04 2009
@@ -23,17 +23,24 @@
using boost::intrusive_ptr;
using namespace qpid::broker;
-RecoveredDequeue::RecoveredDequeue(Queue::shared_ptr _queue, intrusive_ptr<Message> _msg) : queue(_queue), msg(_msg) {}
+RecoveredDequeue::RecoveredDequeue(Queue::shared_ptr _queue, intrusive_ptr<Message> _msg) : queue(_queue), msg(_msg)
+{
+ queue->recoverPrepared(msg);
+}
-bool RecoveredDequeue::prepare(TransactionContext*) throw(){
+bool RecoveredDequeue::prepare(TransactionContext*) throw()
+{
//should never be called; transaction has already prepared if an enqueue is recovered
return false;
}
-void RecoveredDequeue::commit() throw(){
+void RecoveredDequeue::commit() throw()
+{
+ queue->enqueueAborted(msg);
}
-void RecoveredDequeue::rollback() throw(){
+void RecoveredDequeue::rollback() throw()
+{
msg->enqueueComplete();
queue->process(msg);
}
Modified: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp?rev=830591&r1=830590&r2=830591&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp (original)
+++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp Wed Oct 28 15:31:04 2009
@@ -23,7 +23,10 @@
using boost::intrusive_ptr;
using namespace qpid::broker;
-RecoveredEnqueue::RecoveredEnqueue(Queue::shared_ptr _queue, intrusive_ptr<Message> _msg) : queue(_queue), msg(_msg) {}
+RecoveredEnqueue::RecoveredEnqueue(Queue::shared_ptr _queue, intrusive_ptr<Message> _msg) : queue(_queue), msg(_msg)
+{
+ queue->recoverPrepared(msg);
+}
bool RecoveredEnqueue::prepare(TransactionContext*) throw(){
//should never be called; transaction has already prepared if an enqueue is recovered
@@ -36,5 +39,6 @@
}
void RecoveredEnqueue::rollback() throw(){
+ queue->enqueueAborted(msg);
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org