You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2009/11/06 00:13:26 UTC

svn commit: r833230 - in /qpid/trunk/qpid/cpp/src/qpid/store/ms-sql: BlobRecordset.cpp BlobRecordset.h MSSqlProvider.cpp MessageMapRecordset.cpp MessageMapRecordset.h MessageRecordset.cpp MessageRecordset.h

Author: shuston
Date: Thu Nov  5 23:13:25 2009
New Revision: 833230

URL: http://svn.apache.org/viewvc?rev=833230&view=rev
Log:
Fix restoration of durable messages at startup. Remove all references to a queue that's about to be deleted from the database. Fixes QPID-2183.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.cpp
    qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.h
    qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
    qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp
    qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h
    qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp
    qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.h

Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.cpp?rev=833230&r1=833229&r2=833230&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.cpp Thu Nov  5 23:13:25 2009
@@ -31,6 +31,17 @@
 namespace ms_sql {
 
 void
+BlobRecordset::add(const qpid::broker::Persistable& item)
+{
+    BlobEncoder blob (item);   // Marshall item info to a blob
+    rs->AddNew();
+    rs->Fields->GetItem("fieldTableBlob")->AppendChunk(blob);
+    rs->Update();
+    uint64_t id = rs->Fields->Item["persistenceId"]->Value;
+    item.setPersistenceId(id);
+}
+
+void
 BlobRecordset::remove(uint64_t id)
 {
     // Look up the item by its persistenceId
@@ -45,17 +56,6 @@
 }
 
 void
-BlobRecordset::add(const qpid::broker::Persistable& item)
-{
-    BlobEncoder blob (item);   // Marshall item info to a blob
-    rs->AddNew();
-    rs->Fields->GetItem("fieldTableBlob")->AppendChunk(blob);
-    rs->Update();
-    uint64_t id = rs->Fields->Item["persistenceId"]->Value;
-    item.setPersistenceId(id);
-}
-
-void
 BlobRecordset::remove(const qpid::broker::Persistable& item)
 {
     remove(item.getPersistenceId());

Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.h?rev=833230&r1=833229&r2=833230&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/BlobRecordset.h Thu Nov  5 23:13:25 2009
@@ -37,11 +37,12 @@
  */
 class BlobRecordset : public Recordset {
 protected:
-    // Remove a record given its Id.
-    void remove(uint64_t id);
 
 public:
     void add(const qpid::broker::Persistable& item);
+
+    // Remove a record given its Id.
+    void remove(uint64_t id);
     void remove(const qpid::broker::Persistable& item);
 
     // Dump table contents; useful for debugging.

Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp?rev=833230&r1=833229&r2=833230&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp Thu Nov  5 23:13:25 2009
@@ -399,8 +399,8 @@
                       const qpid::framing::FieldTable& /*args needed for jrnl*/)
 {
     DatabaseConnection *db = initConnection();
+    BlobRecordset rsQueues;
     try {
-        BlobRecordset rsQueues;
         db->beginTransaction();
         rsQueues.open(db, TblQueue);
         rsQueues.add(queue);
@@ -418,16 +418,36 @@
 void
 MSSqlProvider::destroy(PersistableQueue& queue)
 {
+    // MessageDeleter class for use with for_each, below.
+    class MessageDeleter {
+        BlobRecordset& msgs;
+    public:
+        explicit MessageDeleter(BlobRecordset& _msgs) : msgs(_msgs) {}
+        void operator()(uint64_t msgId) { msgs.remove(msgId); }
+    };
+
     DatabaseConnection *db = initConnection();
+    BlobRecordset rsQueues;
+    BindingRecordset rsBindings;
+    MessageRecordset rsMessages;
+    MessageMapRecordset rsMessageMaps;
     try {
-        BlobRecordset rsQueues;
-        BindingRecordset rsBindings;
         db->beginTransaction();
         rsQueues.open(db, TblQueue);
         rsBindings.open(db, TblBinding);
+        rsMessages.open(db, TblMessage);
+        rsMessageMaps.open(db, TblMessageMap);
         // Remove bindings first; the queue IDs can't be ripped out from
-        // under the references in the bindings table.
+        // under the references in the bindings table. Then remove the
+        // message->queue entries for the queue, also because the queue can't
+        // be deleted while there are references to it. If there are messages
+        // orphaned by removing the queue references, those messages can
+        // also be deleted. Lastly, the queue record can be removed.
         rsBindings.removeForQueue(queue.getPersistenceId());
+        std::vector<uint64_t> orphans;
+        rsMessageMaps.removeForQueue(queue.getPersistenceId(), orphans);
+        std::for_each(orphans.begin(), orphans.end(),
+                      MessageDeleter(rsMessages));
         rsQueues.remove(queue);
         db->commitTransaction();
     }
@@ -445,8 +465,8 @@
                       const qpid::framing::FieldTable& args)
 {
     DatabaseConnection *db = initConnection();
+    BlobRecordset rsExchanges;
     try {
-        BlobRecordset rsExchanges;
         db->beginTransaction();
         rsExchanges.open(db, TblExchange);
         rsExchanges.add(exchange);
@@ -465,9 +485,9 @@
 MSSqlProvider::destroy(const PersistableExchange& exchange)
 {
     DatabaseConnection *db = initConnection();
+    BlobRecordset rsExchanges;
+    BindingRecordset rsBindings;
     try {
-        BlobRecordset rsExchanges;
-        BindingRecordset rsBindings;
         db->beginTransaction();
         rsExchanges.open(db, TblExchange);
         rsBindings.open(db, TblBinding);
@@ -493,8 +513,8 @@
                     const qpid::framing::FieldTable& args)
 {
     DatabaseConnection *db = initConnection();
+    BindingRecordset rsBindings;
     try {
-        BindingRecordset rsBindings;
         db->beginTransaction();
         rsBindings.open(db, TblBinding);
         rsBindings.add(exchange.getPersistenceId(),
@@ -520,8 +540,8 @@
                       const qpid::framing::FieldTable& args)
 {
     DatabaseConnection *db = initConnection();
+    BindingRecordset rsBindings;
     try {
-        BindingRecordset rsBindings;
         db->beginTransaction();
         rsBindings.open(db, TblBinding);
         rsBindings.remove(exchange.getPersistenceId(),
@@ -544,8 +564,8 @@
 MSSqlProvider::create(const PersistableConfig& config)
 {
     DatabaseConnection *db = initConnection();
+    BlobRecordset rsConfigs;
     try {
-        BlobRecordset rsConfigs;
         db->beginTransaction();
         rsConfigs.open(db, TblConfig);
         rsConfigs.add(config);
@@ -564,8 +584,8 @@
 MSSqlProvider::destroy(const PersistableConfig& config)
 {
     DatabaseConnection *db = initConnection();
+    BlobRecordset rsConfigs;
     try {
-        BlobRecordset rsConfigs;
         db->beginTransaction();
         rsConfigs.open(db, TblConfig);
         rsConfigs.remove(config);
@@ -590,8 +610,8 @@
 MSSqlProvider::stage(const boost::intrusive_ptr<PersistableMessage>& msg)
 {
     DatabaseConnection *db = initConnection();
+    MessageRecordset rsMessages;
     try {
-        MessageRecordset rsMessages;
         db->beginTransaction();
         rsMessages.open(db, TblMessage);
         rsMessages.add(msg);
@@ -613,8 +633,8 @@
 MSSqlProvider::destroy(PersistableMessage& msg)
 {
     DatabaseConnection *db = initConnection();
+    BlobRecordset rsMessages;
     try {
-        BlobRecordset rsMessages;
         db->beginTransaction();
         rsMessages.open(db, TblMessage);
         rsMessages.remove(msg);
@@ -634,8 +654,8 @@
                              const std::string& data)
 {
     DatabaseConnection *db = initConnection();
+    MessageRecordset rsMessages;
     try {
-        MessageRecordset rsMessages;
         db->beginTransaction();
         rsMessages.open(db, TblMessage);
         rsMessages.append(msg, data);
@@ -665,8 +685,8 @@
     // SQL store keeps all messages in one table, so we don't need the
     // queue reference.
     DatabaseConnection *db = initConnection();
+    MessageRecordset rsMessages;
     try {
-        MessageRecordset rsMessages;
         rsMessages.open(db, TblMessage);
         rsMessages.loadContent(msg, data, offset, length);
     }
@@ -710,13 +730,13 @@
         }
     }
 
+    MessageRecordset rsMessages;
+    MessageMapRecordset rsMap;
     try {
         if (msg->getPersistenceId() == 0) {    // Message itself not yet saved
-            MessageRecordset rsMessages;
             rsMessages.open(db, TblMessage);
             rsMessages.add(msg);
         }
-        MessageMapRecordset rsMap;
         rsMap.open(db, TblMessageMap);
         rsMap.add(msg->getPersistenceId(), queue.getPersistenceId());
         if (atxn)
@@ -769,13 +789,13 @@
         }
     }
 
+    MessageMapRecordset rsMap;
+    MessageRecordset rsMessages;
     try {
-        MessageMapRecordset rsMap;
         rsMap.open(db, TblMessageMap);
         bool more = rsMap.remove(msg->getPersistenceId(),
                                  queue.getPersistenceId());
         if (!more) {
-            MessageRecordset rsMessages;
             rsMessages.open(db, TblMessage);
             rsMessages.remove(msg);
         }

Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp?rev=833230&r1=833229&r2=833230&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp Thu Nov  5 23:13:25 2009
@@ -48,12 +48,15 @@
     std::ostringstream filter;
     filter << "messageId = " << messageId << std::ends;
     rs->PutFilter (VariantHelper<std::string>(filter.str()));
+    if (rs->RecordCount == 0)
+        return false;
     MessageMap m;
     IADORecordBinding *piAdoRecordBinding;
-    rs->QueryInterface(__uuidof(IADORecordBinding), 
+    rs->QueryInterface(__uuidof(IADORecordBinding),
                        (LPVOID *)&piAdoRecordBinding);
     piAdoRecordBinding->BindToRecordset(&m);
     bool moreEntries = false, deleted = false;
+    rs->MoveFirst();
     // If the desired mapping gets deleted, and we already know there are
     // other mappings for the message, don't bother finishing the scan.
     while (!rs->EndOfFile && !(deleted && moreEntries)) {
@@ -68,10 +71,47 @@
         rs->MoveNext();
     }
     piAdoRecordBinding->Release();
+    rs->Filter = "";
     return moreEntries;
 }
 
 void
+MessageMapRecordset::removeForQueue(uint64_t queueId,
+                                    std::vector<uint64_t>& orphaned)
+{
+    // Read all the messages queued on queueId and add them to the orphaned
+    // list. Then remove each one and learn if there are references to it
+    // from other queues. The ones without references are left in the
+    // orphaned list, others are removed.
+    std::ostringstream filter;
+    filter << "queueId = " << queueId << std::ends;
+    rs->PutFilter (VariantHelper<std::string>(filter.str()));
+    MessageMap m;
+    IADORecordBinding *piAdoRecordBinding;
+    rs->QueryInterface(__uuidof(IADORecordBinding), 
+                       (LPVOID *)&piAdoRecordBinding);
+    piAdoRecordBinding->BindToRecordset(&m);
+    while (!rs->EndOfFile) {
+        orphaned.push_back(m.messageId);
+        rs->MoveNext();
+    }
+    piAdoRecordBinding->Release();
+    rs->Filter = "";     // Remove filter on queueId
+    rs->Requery(adOptionUnspecified);  // Get the entire map again
+
+    // Now delete all the messages on this queue; any message that still has
+    // references from other queue(s) is removed from orphaned.
+    for (std::vector<uint64_t>::iterator i = orphaned.begin();
+         i != orphaned.end();
+         ) {
+        if (remove(*i, queueId))
+            i = orphaned.erase(i);     // There are other refs to message *i
+        else
+            ++i;
+    }
+}
+
+void
 MessageMapRecordset::recover(MessageQueueMap& msgMap)
 {
     if (rs->BOF && rs->EndOfFile)

Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h?rev=833230&r1=833229&r2=833230&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h Thu Nov  5 23:13:25 2009
@@ -23,6 +23,7 @@
  */
 
 #include <icrsint.h>
+#include <vector>
 #include "Recordset.h"
 #include <qpid/broker/RecoveryManager.h>
 
@@ -40,7 +41,7 @@
     class MessageMap : public CADORecordBinding {
         BEGIN_ADO_BINDING(MessageMap)
           ADO_FIXED_LENGTH_ENTRY2(1, adBigInt, messageId, FALSE)
-          ADO_FIXED_LENGTH_ENTRY2(1, adBigInt, queueId, FALSE)
+          ADO_FIXED_LENGTH_ENTRY2(2, adBigInt, queueId, FALSE)
         END_ADO_BINDING()
 
     public:
@@ -57,6 +58,13 @@
     // exists on any other queues.
     bool remove(uint64_t messageId, uint64_t queueId);
 
+    // Remove mappings for all messages on a specified queue. If there are
+    // messages that were only on the specified queue and are, therefore,
+    // orphaned now, return them in the orphaned vector. The orphaned
+    // messages can be deleted permanently as they are not referenced on
+    // any other queues.
+    void removeForQueue(uint64_t queueId, std::vector<uint64_t>& orphaned);
+
     // Recover the mappings of message ID -> vector<queue ID>.
     void recover(MessageQueueMap& msgMap);
 

Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp?rev=833230&r1=833229&r2=833230&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp Thu Nov  5 23:13:25 2009
@@ -113,7 +113,7 @@
 
 void
 MessageRecordset::recover(qpid::broker::RecoveryManager& recoverer,
-                          std::map<uint64_t, broker::RecoverableMessage::shared_ptr> messageMap)
+                          std::map<uint64_t, broker::RecoverableMessage::shared_ptr>& messageMap)
 {
     if (rs->BOF && rs->EndOfFile)
         return;   // Nothing to do

Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.h?rev=833230&r1=833229&r2=833230&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.h Thu Nov  5 23:13:25 2009
@@ -74,7 +74,7 @@
 
     // Recover messages and save a map of those recovered.
     void recover(qpid::broker::RecoveryManager& recoverer,
-                 std::map<uint64_t, broker::RecoverableMessage::shared_ptr> messageMap);
+                 std::map<uint64_t, broker::RecoverableMessage::shared_ptr>& messageMap);
 
     // Dump table contents; useful for debugging.
     void dump();



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org