You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2009/10/30 23:30:35 UTC

svn commit: r831476 - in /qpid/trunk/qpid/cpp/src/qpid: broker/ store/ store/ms-sql/

Author: shuston
Date: Fri Oct 30 22:30:34 2009
New Revision: 831476

URL: http://svn.apache.org/viewvc?rev=831476&view=rev
Log:
In the broker, change the string parameters of RecoverableExchange[Impl]::bind() to const references; they are never modified and are only passed on to other methods that expect const strings. This makes bind() easier to call from the store.

In the SQL store, fix the removal of multiple bindings in one operation (matching records are now deleted one by one). Also, change the tblBinding table to store queue persistenceIds instead of queue names; this helps keep referential integrity with the queue table. Restoring bindings then only requires looking up the queue name from its ID before restoring each binding.

Fixes QPID-2169, and probably QPID-2170.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableExchange.h
    qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp
    qpid/trunk/qpid/cpp/src/qpid/store/StorageProvider.h
    qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.cpp
    qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.h
    qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableExchange.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableExchange.h?rev=831476&r1=831475&r2=831476&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableExchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableExchange.h Fri Oct 30 22:30:34 2009
@@ -40,7 +40,9 @@
     /**
      * Recover binding. Nb: queue must have been recovered earlier.
      */
-    virtual void bind(std::string& queue, std::string& routingKey, qpid::framing::FieldTable& args) = 0;
+    virtual void bind(const std::string& queue,
+                      const std::string& routingKey,
+                      qpid::framing::FieldTable& args) = 0;
     virtual ~RecoverableExchange() {};
 };
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp?rev=831476&r1=831475&r2=831476&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp Fri Oct 30 22:30:34 2009
@@ -78,7 +78,7 @@
 public:
     RecoverableExchangeImpl(Exchange::shared_ptr _exchange, QueueRegistry& _queues) : exchange(_exchange), queues(_queues) {}
     void setPersistenceId(uint64_t id);
-    void bind(std::string& queue, std::string& routingKey, qpid::framing::FieldTable& args);
+    void bind(const std::string& queue, const std::string& routingKey, qpid::framing::FieldTable& args);
 };
 
 class RecoverableConfigImpl : public RecoverableConfig
@@ -230,7 +230,9 @@
         bridge->setPersistenceId(id);
 }
 
-void RecoverableExchangeImpl::bind(string& queueName, string& key, framing::FieldTable& args)
+void RecoverableExchangeImpl::bind(const string& queueName,
+                                   const string& key,
+                                   framing::FieldTable& args)
 {
     Queue::shared_ptr queue = queues.find(queueName);
     exchange->bind(queue, key, &args);

Modified: qpid/trunk/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp?rev=831476&r1=831475&r2=831476&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp Fri Oct 30 22:30:34 2009
@@ -409,7 +409,7 @@
     provider->second->recoverConfigs(recoverer);
     provider->second->recoverExchanges(recoverer, exchanges);
     provider->second->recoverQueues(recoverer, queues);
-    provider->second->recoverBindings(recoverer, exchanges);
+    provider->second->recoverBindings(recoverer, exchanges, queues);
     provider->second->recoverMessages(recoverer, messages, messageQueueMap);
     // Enqueue msgs where needed.
     for (MessageQueueMap::const_iterator i = messageQueueMap.begin();

Modified: qpid/trunk/qpid/cpp/src/qpid/store/StorageProvider.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/StorageProvider.h?rev=831476&r1=831475&r2=831476&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/StorageProvider.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/StorageProvider.h Fri Oct 30 22:30:34 2009
@@ -311,7 +311,8 @@
     virtual void recoverQueues(qpid::broker::RecoveryManager& recoverer,
                                QueueMap& queueMap) = 0;
     virtual void recoverBindings(qpid::broker::RecoveryManager& recoverer,
-                                 const ExchangeMap& exchangeMap) = 0;
+                                 const ExchangeMap& exchangeMap,
+                                 const QueueMap& queueMap) = 0;
     virtual void recoverMessages(qpid::broker::RecoveryManager& recoverer,
                                  MessageMap& messageMap,
                                  MessageQueueMap& messageQueueMap) = 0;

Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.cpp?rev=831476&r1=831475&r2=831476&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.cpp Fri Oct 30 22:30:34 2009
@@ -32,17 +32,32 @@
 namespace ms_sql {
 
 void
+BindingRecordset::removeFilter(const std::string& filter)
+{
+    rs->PutFilter (VariantHelper<std::string>(filter));
+    long recs = rs->GetRecordCount();
+    if (recs == 0)
+        return;   // Nothing to do
+    while (recs > 0) {
+        // Deleting adAffectAll doesn't work as documented; go one by one.
+        rs->Delete(adAffectCurrent);
+        if (--recs > 0)
+            rs->MoveNext();
+    }
+    rs->Update();
+}
+
+void
 BindingRecordset::add(uint64_t exchangeId,
-                      const std::string& queueName,
+                      uint64_t queueId,
                       const std::string& routingKey,
                       const qpid::framing::FieldTable& args)
 {
-    VariantHelper<std::string> queueNameStr(queueName);
     VariantHelper<std::string> routingKeyStr(routingKey);
     BlobEncoder blob (args);   // Marshall field table to a blob
     rs->AddNew();
     rs->Fields->GetItem("exchangeId")->Value = exchangeId;
-    rs->Fields->GetItem("queueName")->Value = queueNameStr;
+    rs->Fields->GetItem("queueId")->Value = queueId;
     rs->Fields->GetItem("routingKey")->Value = routingKeyStr;
     rs->Fields->GetItem("fieldTableBlob")->AppendChunk(blob);
     rs->Update();
@@ -50,57 +65,40 @@
 
 void
 BindingRecordset::remove(uint64_t exchangeId,
-                         const std::string& queueName,
+                         uint64_t queueId,
                          const std::string& routingKey,
                          const qpid::framing::FieldTable& /*args*/)
 {
     // Look up the affected binding.
     std::ostringstream filter;
     filter << "exchangeId = " << exchangeId
-           << " AND queueName = '" << queueName << "'"
+           << " AND queueId = " << queueId
            << " AND routingKey = '" << routingKey << "'" << std::ends;
-    rs->PutFilter (VariantHelper<std::string>(filter.str()));
-    if (rs->RecordCount != 0) {
-        // Delete the records
-        rs->Delete(adAffectGroup);
-        rs->Update();
-    }
-    requery();
+    removeFilter(filter.str());
 }
 
 void
-BindingRecordset::remove(uint64_t exchangeId)
+BindingRecordset::removeForExchange(uint64_t exchangeId)
 {
     // Look up the affected bindings by the exchange ID
     std::ostringstream filter;
     filter << "exchangeId = " << exchangeId << std::ends;
-    rs->PutFilter (VariantHelper<std::string>(filter.str()));
-    if (rs->RecordCount != 0) {
-        // Delete the records
-        rs->Delete(adAffectGroup);
-        rs->Update();
-    }
-    requery();
+    removeFilter(filter.str());
 }
 
 void
-BindingRecordset::remove(const std::string& queueName)
+BindingRecordset::removeForQueue(uint64_t queueId)
 {
-    // Look up the affected bindings by the exchange ID
+    // Look up the affected bindings by the queue ID
     std::ostringstream filter;
-    filter << "queueName = '" << queueName << "'" << std::ends;
-    rs->PutFilter (VariantHelper<std::string>(filter.str()));
-    if (rs->RecordCount != 0) {
-        // Delete the records
-        rs->Delete(adAffectGroup);
-        rs->Update();
-    }
-    requery();
+    filter << "queueId = " << queueId << std::ends;
+    removeFilter(filter.str());
 }
 
 void
-BindingRecordset::recover(qpid::broker::RecoveryManager& recoverer,
-                          std::map<uint64_t, broker::RecoverableExchange::shared_ptr> exchMap)
+BindingRecordset::recover(broker::RecoveryManager& recoverer,
+                          const store::ExchangeMap& exchMap,
+                          const store::QueueMap& queueMap)
 {
     if (rs->BOF && rs->EndOfFile)
         return;   // Nothing to do
@@ -114,9 +112,26 @@
         long blobSize = rs->Fields->Item["fieldTableBlob"]->ActualSize;
         BlobAdapter blob(blobSize);
         blob = rs->Fields->Item["fieldTableBlob"]->GetChunk(blobSize);
-        broker::RecoverableExchange::shared_ptr exch = exchMap[b.exchangeId];
-        std::string q(b.queueName), k(b.routingKey);
-        exch->bind(q, k, blob);
+        store::ExchangeMap::const_iterator exch = exchMap.find(b.exchangeId);
+        if (exch == exchMap.end()) {
+            std::ostringstream msg;
+            msg << "Error recovering bindings; exchange ID " << b.exchangeId
+                << " not found in exchange map";
+            throw qpid::Exception(msg.str());
+        }
+        broker::RecoverableExchange::shared_ptr exchPtr = exch->second;
+        store::QueueMap::const_iterator q = queueMap.find(b.queueId);
+        if (q == queueMap.end()) {
+            std::ostringstream msg;
+            msg << "Error recovering bindings; queue ID " << b.queueId
+                << " not found in queue map";
+            throw qpid::Exception(msg.str());
+        }
+        broker::RecoverableQueue::shared_ptr qPtr = q->second;
+        // The recovery manager wants the queue name, so get it from the
+        // RecoverableQueue.
+        std::string key(b.routingKey);
+        exchPtr->bind(qPtr->getName(), key, blob);
         rs->MoveNext();
     }
 
@@ -138,8 +153,8 @@
     piAdoRecordBinding->BindToRecordset(&b);
    
     while (VARIANT_FALSE == rs->EndOfFile) {
-      QPID_LOG(notice, "exch " << b.exchangeId
-                       << ", q: " << b.queueName
+      QPID_LOG(notice, "exch Id " << b.exchangeId
+                       << ", q Id " << b.queueId
                        << ", k: " << b.routingKey);
       rs->MoveNext();
     }

Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.h?rev=831476&r1=831475&r2=831476&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/BindingRecordset.h Fri Oct 30 22:30:34 2009
@@ -24,6 +24,7 @@
 
 #include <icrsint.h>
 #include "Recordset.h"
+#include <qpid/store/StorageProvider.h>
 #include <qpid/broker/RecoveryManager.h>
 
 namespace qpid {
@@ -40,40 +41,43 @@
     class Binding : public CADORecordBinding {
         BEGIN_ADO_BINDING(Binding)
           ADO_FIXED_LENGTH_ENTRY2(1, adBigInt, exchangeId, FALSE)
-          ADO_VARIABLE_LENGTH_ENTRY4(2, adVarChar, queueName, 
-                                     sizeof(queueName), FALSE)
+          ADO_FIXED_LENGTH_ENTRY2(2, adBigInt, queueId, FALSE)
           ADO_VARIABLE_LENGTH_ENTRY4(3, adVarChar, routingKey, 
                                      sizeof(routingKey), FALSE)
         END_ADO_BINDING()
 
     public:
         uint64_t exchangeId;
-        char queueName[256];
+        uint64_t queueId;
         char routingKey[256];
     };
 
+    // Remove all records matching the specified filter/query.
+    void removeFilter(const std::string& filter);
+
 public:
     // Add a new binding
     void add(uint64_t exchangeId,
-             const std::string& queueName,
+             uint64_t queueId,
              const std::string& routingKey,
              const qpid::framing::FieldTable& args);
 
     // Remove a specific binding
     void remove(uint64_t exchangeId,
-                const std::string& queueName,
+                uint64_t queueId,
                 const std::string& routingKey,
                 const qpid::framing::FieldTable& args);
 
     // Remove all bindings for the specified exchange
-    void remove(uint64_t exchangeId);
+    void removeForExchange(uint64_t exchangeId);
 
     // Remove all bindings for the specified queue
-    void remove(const std::string& queueName);
+    void removeForQueue(uint64_t queueId);
 
     // Recover bindings set using exchMap to get from Id to RecoverableExchange.
     void recover(qpid::broker::RecoveryManager& recoverer,
-                 std::map<uint64_t, broker::RecoverableExchange::shared_ptr> exchMap);
+                 const qpid::store::ExchangeMap& exchMap,
+                 const qpid::store::QueueMap& queueMap);
 
     // Dump table contents; useful for debugging.
     void dump();

Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp?rev=831476&r1=831475&r2=831476&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp Fri Oct 30 22:30:34 2009
@@ -267,7 +267,8 @@
     virtual void recoverQueues(qpid::broker::RecoveryManager& recoverer,
                                QueueMap& queueMap);
     virtual void recoverBindings(qpid::broker::RecoveryManager& recoverer,
-                                 const ExchangeMap& exchangeMap);
+                                 const ExchangeMap& exchangeMap,
+                                 const QueueMap& queueMap);
     virtual void recoverMessages(qpid::broker::RecoveryManager& recoverer,
                                  MessageMap& messageMap,
                                  MessageQueueMap& messageQueueMap);
@@ -424,8 +425,10 @@
         db->beginTransaction();
         rsQueues.open(db, TblQueue);
         rsBindings.open(db, TblBinding);
+        // Remove bindings first; the queue IDs can't be ripped out from
+        // under the references in the bindings table.
+        rsBindings.removeForQueue(queue.getPersistenceId());
         rsQueues.remove(queue);
-        rsBindings.remove(queue.getName());
         db->commitTransaction();
     }
     catch(_com_error &e) {
@@ -468,8 +471,10 @@
         db->beginTransaction();
         rsExchanges.open(db, TblExchange);
         rsBindings.open(db, TblBinding);
+        // Remove bindings first; the exchange IDs can't be ripped out from
+        // under the references in the bindings table.
+        rsBindings.removeForExchange(exchange.getPersistenceId());
         rsExchanges.remove(exchange);
-        rsBindings.remove(exchange.getPersistenceId());
         db->commitTransaction();
     }
     catch(_com_error &e) {
@@ -492,7 +497,10 @@
         BindingRecordset rsBindings;
         db->beginTransaction();
         rsBindings.open(db, TblBinding);
-        rsBindings.add(exchange.getPersistenceId(), queue.getName(), key, args);
+        rsBindings.add(exchange.getPersistenceId(),
+                       queue.getPersistenceId(),
+                       key,
+                       args);
         db->commitTransaction();
     }
     catch(_com_error &e) {
@@ -517,7 +525,7 @@
         db->beginTransaction();
         rsBindings.open(db, TblBinding);
         rsBindings.remove(exchange.getPersistenceId(),
-                          queue.getName(),
+                          queue.getPersistenceId(),
                           key,
                           args);
         db->commitTransaction();
@@ -888,12 +896,13 @@
 
 void
 MSSqlProvider::recoverBindings(qpid::broker::RecoveryManager& recoverer,
-                               const ExchangeMap& exchangeMap)
+                               const ExchangeMap& exchangeMap,
+                               const QueueMap& queueMap)
 {
     DatabaseConnection *db = initConnection();
     BindingRecordset rsBindings;
     rsBindings.open(db, TblBinding);
-    rsBindings.recover(recoverer, exchangeMap);
+    rsBindings.recover(recoverer, exchangeMap, queueMap);
 }
 
 void
@@ -947,7 +956,7 @@
         "  fieldTableBlob varbinary(MAX) NOT NULL)";
     const std::string bindingSpecs =
         " (exchangeId bigint REFERENCES tblExchange(persistenceId) NOT NULL,"
-        "  queueName varchar(255) NOT NULL,"
+        "  queueId bigint REFERENCES tblQueue(persistenceId) NOT NULL,"
         "  routingKey varchar(255),"
         "  fieldTableBlob varbinary(MAX))";
     const std::string messageMapSpecs =



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org