You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2009/11/06 00:13:26 UTC
svn commit: r833230 - in /qpid/trunk/qpid/cpp/src/qpid/store/ms-sql:
BlobRecordset.cpp BlobRecordset.h MSSqlProvider.cpp MessageMapRecordset.cpp
MessageMapRecordset.h MessageRecordset.cpp MessageRecordset.h
Author: shuston
Date: Thu Nov 5 23:13:25 2009
New Revision: 833230
URL: http://svn.apache.org/viewvc?rev=833230&view=rev
Log:
Fix restoration of durable messages at startup. Remove all references to a queue that's about to be deleted from the database. Fixes QPID-2183.
Modified:
qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.cpp
qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.h
qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp
qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h
qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp
qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.h
Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.cpp?rev=833230&r1=833229&r2=833230&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.cpp Thu Nov 5 23:13:25 2009
@@ -31,6 +31,17 @@
namespace ms_sql {
void
+BlobRecordset::add(const qpid::broker::Persistable& item)
+{
+ BlobEncoder blob (item); // Marshall item info to a blob
+ rs->AddNew();
+ rs->Fields->GetItem("fieldTableBlob")->AppendChunk(blob);
+ rs->Update();
+ uint64_t id = rs->Fields->Item["persistenceId"]->Value;
+ item.setPersistenceId(id);
+}
+
+void
BlobRecordset::remove(uint64_t id)
{
// Look up the item by its persistenceId
@@ -45,17 +56,6 @@
}
void
-BlobRecordset::add(const qpid::broker::Persistable& item)
-{
- BlobEncoder blob (item); // Marshall item info to a blob
- rs->AddNew();
- rs->Fields->GetItem("fieldTableBlob")->AppendChunk(blob);
- rs->Update();
- uint64_t id = rs->Fields->Item["persistenceId"]->Value;
- item.setPersistenceId(id);
-}
-
-void
BlobRecordset::remove(const qpid::broker::Persistable& item)
{
remove(item.getPersistenceId());
Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.h?rev=833230&r1=833229&r2=833230&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.h Thu Nov 5 23:13:25 2009
@@ -37,11 +37,12 @@
*/
class BlobRecordset : public Recordset {
protected:
- // Remove a record given its Id.
- void remove(uint64_t id);
public:
void add(const qpid::broker::Persistable& item);
+
+ // Remove a record given its Id.
+ void remove(uint64_t id);
void remove(const qpid::broker::Persistable& item);
// Dump table contents; useful for debugging.
Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp?rev=833230&r1=833229&r2=833230&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp Thu Nov 5 23:13:25 2009
@@ -399,8 +399,8 @@
const qpid::framing::FieldTable& /*args needed for jrnl*/)
{
DatabaseConnection *db = initConnection();
+ BlobRecordset rsQueues;
try {
- BlobRecordset rsQueues;
db->beginTransaction();
rsQueues.open(db, TblQueue);
rsQueues.add(queue);
@@ -418,16 +418,36 @@
void
MSSqlProvider::destroy(PersistableQueue& queue)
{
+ // MessageDeleter class for use with for_each, below.
+ class MessageDeleter {
+ BlobRecordset& msgs;
+ public:
+ explicit MessageDeleter(BlobRecordset& _msgs) : msgs(_msgs) {}
+ void operator()(uint64_t msgId) { msgs.remove(msgId); }
+ };
+
DatabaseConnection *db = initConnection();
+ BlobRecordset rsQueues;
+ BindingRecordset rsBindings;
+ MessageRecordset rsMessages;
+ MessageMapRecordset rsMessageMaps;
try {
- BlobRecordset rsQueues;
- BindingRecordset rsBindings;
db->beginTransaction();
rsQueues.open(db, TblQueue);
rsBindings.open(db, TblBinding);
+ rsMessages.open(db, TblMessage);
+ rsMessageMaps.open(db, TblMessageMap);
// Remove bindings first; the queue IDs can't be ripped out from
- // under the references in the bindings table.
+ // under the references in the bindings table. Then remove the
+ // message->queue entries for the queue, also because the queue can't
+ // be deleted while there are references to it. If there are messages
+ // orphaned by removing the queue references, those messages can
+ // also be deleted. Lastly, the queue record can be removed.
rsBindings.removeForQueue(queue.getPersistenceId());
+ std::vector<uint64_t> orphans;
+ rsMessageMaps.removeForQueue(queue.getPersistenceId(), orphans);
+ std::for_each(orphans.begin(), orphans.end(),
+ MessageDeleter(rsMessages));
rsQueues.remove(queue);
db->commitTransaction();
}
@@ -445,8 +465,8 @@
const qpid::framing::FieldTable& args)
{
DatabaseConnection *db = initConnection();
+ BlobRecordset rsExchanges;
try {
- BlobRecordset rsExchanges;
db->beginTransaction();
rsExchanges.open(db, TblExchange);
rsExchanges.add(exchange);
@@ -465,9 +485,9 @@
MSSqlProvider::destroy(const PersistableExchange& exchange)
{
DatabaseConnection *db = initConnection();
+ BlobRecordset rsExchanges;
+ BindingRecordset rsBindings;
try {
- BlobRecordset rsExchanges;
- BindingRecordset rsBindings;
db->beginTransaction();
rsExchanges.open(db, TblExchange);
rsBindings.open(db, TblBinding);
@@ -493,8 +513,8 @@
const qpid::framing::FieldTable& args)
{
DatabaseConnection *db = initConnection();
+ BindingRecordset rsBindings;
try {
- BindingRecordset rsBindings;
db->beginTransaction();
rsBindings.open(db, TblBinding);
rsBindings.add(exchange.getPersistenceId(),
@@ -520,8 +540,8 @@
const qpid::framing::FieldTable& args)
{
DatabaseConnection *db = initConnection();
+ BindingRecordset rsBindings;
try {
- BindingRecordset rsBindings;
db->beginTransaction();
rsBindings.open(db, TblBinding);
rsBindings.remove(exchange.getPersistenceId(),
@@ -544,8 +564,8 @@
MSSqlProvider::create(const PersistableConfig& config)
{
DatabaseConnection *db = initConnection();
+ BlobRecordset rsConfigs;
try {
- BlobRecordset rsConfigs;
db->beginTransaction();
rsConfigs.open(db, TblConfig);
rsConfigs.add(config);
@@ -564,8 +584,8 @@
MSSqlProvider::destroy(const PersistableConfig& config)
{
DatabaseConnection *db = initConnection();
+ BlobRecordset rsConfigs;
try {
- BlobRecordset rsConfigs;
db->beginTransaction();
rsConfigs.open(db, TblConfig);
rsConfigs.remove(config);
@@ -590,8 +610,8 @@
MSSqlProvider::stage(const boost::intrusive_ptr<PersistableMessage>& msg)
{
DatabaseConnection *db = initConnection();
+ MessageRecordset rsMessages;
try {
- MessageRecordset rsMessages;
db->beginTransaction();
rsMessages.open(db, TblMessage);
rsMessages.add(msg);
@@ -613,8 +633,8 @@
MSSqlProvider::destroy(PersistableMessage& msg)
{
DatabaseConnection *db = initConnection();
+ BlobRecordset rsMessages;
try {
- BlobRecordset rsMessages;
db->beginTransaction();
rsMessages.open(db, TblMessage);
rsMessages.remove(msg);
@@ -634,8 +654,8 @@
const std::string& data)
{
DatabaseConnection *db = initConnection();
+ MessageRecordset rsMessages;
try {
- MessageRecordset rsMessages;
db->beginTransaction();
rsMessages.open(db, TblMessage);
rsMessages.append(msg, data);
@@ -665,8 +685,8 @@
// SQL store keeps all messages in one table, so we don't need the
// queue reference.
DatabaseConnection *db = initConnection();
+ MessageRecordset rsMessages;
try {
- MessageRecordset rsMessages;
rsMessages.open(db, TblMessage);
rsMessages.loadContent(msg, data, offset, length);
}
@@ -710,13 +730,13 @@
}
}
+ MessageRecordset rsMessages;
+ MessageMapRecordset rsMap;
try {
if (msg->getPersistenceId() == 0) { // Message itself not yet saved
- MessageRecordset rsMessages;
rsMessages.open(db, TblMessage);
rsMessages.add(msg);
}
- MessageMapRecordset rsMap;
rsMap.open(db, TblMessageMap);
rsMap.add(msg->getPersistenceId(), queue.getPersistenceId());
if (atxn)
@@ -769,13 +789,13 @@
}
}
+ MessageMapRecordset rsMap;
+ MessageRecordset rsMessages;
try {
- MessageMapRecordset rsMap;
rsMap.open(db, TblMessageMap);
bool more = rsMap.remove(msg->getPersistenceId(),
queue.getPersistenceId());
if (!more) {
- MessageRecordset rsMessages;
rsMessages.open(db, TblMessage);
rsMessages.remove(msg);
}
Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp?rev=833230&r1=833229&r2=833230&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp Thu Nov 5 23:13:25 2009
@@ -48,12 +48,15 @@
std::ostringstream filter;
filter << "messageId = " << messageId << std::ends;
rs->PutFilter (VariantHelper<std::string>(filter.str()));
+ if (rs->RecordCount == 0)
+ return false;
MessageMap m;
IADORecordBinding *piAdoRecordBinding;
- rs->QueryInterface(__uuidof(IADORecordBinding),
+ rs->QueryInterface(__uuidof(IADORecordBinding),
(LPVOID *)&piAdoRecordBinding);
piAdoRecordBinding->BindToRecordset(&m);
bool moreEntries = false, deleted = false;
+ rs->MoveFirst();
// If the desired mapping gets deleted, and we already know there are
// other mappings for the message, don't bother finishing the scan.
while (!rs->EndOfFile && !(deleted && moreEntries)) {
@@ -68,10 +71,47 @@
rs->MoveNext();
}
piAdoRecordBinding->Release();
+ rs->Filter = "";
return moreEntries;
}
void
+MessageMapRecordset::removeForQueue(uint64_t queueId,
+ std::vector<uint64_t>& orphaned)
+{
+ // Read all the messages queued on queueId and add them to the orphaned
+ // list. Then remove each one and learn if there are references to it
+ // from other queues. The ones without references are left in the
+ // orphaned list, others are removed.
+ std::ostringstream filter;
+ filter << "queueId = " << queueId << std::ends;
+ rs->PutFilter (VariantHelper<std::string>(filter.str()));
+ MessageMap m;
+ IADORecordBinding *piAdoRecordBinding;
+ rs->QueryInterface(__uuidof(IADORecordBinding),
+ (LPVOID *)&piAdoRecordBinding);
+ piAdoRecordBinding->BindToRecordset(&m);
+ while (!rs->EndOfFile) {
+ orphaned.push_back(m.messageId);
+ rs->MoveNext();
+ }
+ piAdoRecordBinding->Release();
+ rs->Filter = ""; // Remove filter on queueId
+ rs->Requery(adOptionUnspecified); // Get the entire map again
+
+ // Now delete all the messages on this queue; any message that still has
+ // references from other queue(s) is removed from orphaned.
+ for (std::vector<uint64_t>::iterator i = orphaned.begin();
+ i != orphaned.end();
+ ) {
+ if (remove(*i, queueId))
+ i = orphaned.erase(i); // There are other refs to message *i
+ else
+ ++i;
+ }
+}
+
+void
MessageMapRecordset::recover(MessageQueueMap& msgMap)
{
if (rs->BOF && rs->EndOfFile)
Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h?rev=833230&r1=833229&r2=833230&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h Thu Nov 5 23:13:25 2009
@@ -23,6 +23,7 @@
*/
#include <icrsint.h>
+#include <vector>
#include "Recordset.h"
#include <qpid/broker/RecoveryManager.h>
@@ -40,7 +41,7 @@
class MessageMap : public CADORecordBinding {
BEGIN_ADO_BINDING(MessageMap)
ADO_FIXED_LENGTH_ENTRY2(1, adBigInt, messageId, FALSE)
- ADO_FIXED_LENGTH_ENTRY2(1, adBigInt, queueId, FALSE)
+ ADO_FIXED_LENGTH_ENTRY2(2, adBigInt, queueId, FALSE)
END_ADO_BINDING()
public:
@@ -57,6 +58,13 @@
// exists on any other queues.
bool remove(uint64_t messageId, uint64_t queueId);
+ // Remove mappings for all messages on a specified queue. If there are
+ // messages that were only on the specified queue and are, therefore,
+ // orphaned now, return them in the orphaned vector. The orphaned
+ // messages can be deleted permanently as they are not referenced on
+ // any other queues.
+ void removeForQueue(uint64_t queueId, std::vector<uint64_t>& orphaned);
+
// Recover the mappings of message ID -> vector<queue ID>.
void recover(MessageQueueMap& msgMap);
Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp?rev=833230&r1=833229&r2=833230&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp Thu Nov 5 23:13:25 2009
@@ -113,7 +113,7 @@
void
MessageRecordset::recover(qpid::broker::RecoveryManager& recoverer,
- std::map<uint64_t, broker::RecoverableMessage::shared_ptr> messageMap)
+ std::map<uint64_t, broker::RecoverableMessage::shared_ptr>& messageMap)
{
if (rs->BOF && rs->EndOfFile)
return; // Nothing to do
Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.h?rev=833230&r1=833229&r2=833230&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.h Thu Nov 5 23:13:25 2009
@@ -74,7 +74,7 @@
// Recover messages and save a map of those recovered.
void recover(qpid::broker::RecoveryManager& recoverer,
- std::map<uint64_t, broker::RecoverableMessage::shared_ptr> messageMap);
+ std::map<uint64_t, broker::RecoverableMessage::shared_ptr>& messageMap);
// Dump table contents; useful for debugging.
void dump();
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org