You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/05/17 13:03:56 UTC

svn commit: r538872 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/broker/

Author: gsim
Date: Thu May 17 04:03:55 2007
New Revision: 538872

URL: http://svn.apache.org/viewvc?view=rev&rev=538872
Log:
Changes to support durable exchanges.


Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerExchange.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableExchange.h   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerExchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableExchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManager.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Thu May 17 04:03:55 2007
@@ -124,6 +124,7 @@
   qpid/broker/BrokerAdapter.cpp \
   qpid/broker/BrokerSingleton.cpp \
   qpid/broker/BrokerChannel.cpp \
+  qpid/broker/BrokerExchange.cpp \
   qpid/broker/BrokerMessage.cpp \
   qpid/broker/BrokerMessageMessage.cpp \
   qpid/broker/BrokerQueue.cpp \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp Thu May 17 04:03:55 2007
@@ -118,8 +118,8 @@
 
 
 void BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, uint16_t /*ticket*/, const string& exchange, const string& type, 
-                                                 bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait, 
-                                                 const FieldTable& /*arguments*/){
+                                                 bool passive, bool durable, bool /*autoDelete*/, bool /*internal*/, bool nowait, 
+                                                 const FieldTable& args){
 
     if(passive){
         if(!broker.getExchanges().get(exchange)) {
@@ -127,8 +127,10 @@
         }
     }else{        
         try{
-            std::pair<Exchange::shared_ptr, bool> response = broker.getExchanges().declare(exchange, type);
-            if(!response.second && response.first->getType() != type){
+            std::pair<Exchange::shared_ptr, bool> response = broker.getExchanges().declare(exchange, type, durable, args);
+            if (response.second) {
+                if (durable) broker.getStore().create(*response.first);
+            } else if (response.first->getType() != type) {
                 throw ConnectionException(
                     530,
                     "Exchange already declared to be of type "
@@ -145,10 +147,12 @@
 }
                 
 void BrokerAdapter::ExchangeHandlerImpl::delete_(const MethodContext& context, uint16_t /*ticket*/, 
-                                                 const string& exchange, bool /*ifUnused*/, bool nowait){
+                                                 const string& name, bool /*ifUnused*/, bool nowait){
 
     //TODO: implement unused
-    broker.getExchanges().destroy(exchange);
+    Exchange::shared_ptr exchange(broker.getExchanges().get(name));
+    if (exchange->isDurable()) broker.getStore().destroy(*exchange);
+    broker.getExchanges().destroy(name);
     if(!nowait) client.deleteOk(context.getRequestId());
 } 
 
@@ -174,6 +178,8 @@
 
 	    //add default binding:
 	    broker.getExchanges().getDefault()->bind(queue, name, 0);
+
+            //handle automatic cleanup:
 	    if (exclusive) {
 		connection.exclusiveQueues.push_back(queue);
 	    } else if(autoDelete){
@@ -202,7 +208,9 @@
     Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
     if(exchange){
         string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
-        exchange->bind(queue, exchangeRoutingKey, &arguments);
+        if (exchange->bind(queue, exchangeRoutingKey, &arguments) && exchange->isDurable() && queue->isDurable()) {
+            broker.getStore().bind(*exchange, *queue, routingKey, arguments);
+        }
         if(!nowait) client.bindOk(context.getRequestId());    
     }else{
         throw ChannelException(
@@ -225,7 +233,9 @@
     Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
     if (!exchange.get()) throw ChannelException(404, "Unbind failed. No such exchange: " + exchangeName);
 
-    exchange->unbind(queue, routingKey, &arguments);
+    if (exchange->unbind(queue, routingKey, &arguments) && exchange->isDurable() && queue->isDurable()) {
+        broker.getStore().unbind(*exchange, *queue, routingKey, arguments);
+    }
 
     client.unbindOk(context.getRequestId());    
 }

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerExchange.cpp?view=auto&rev=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerExchange.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerExchange.cpp Thu May 17 04:03:55 2007
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "BrokerExchange.h"
+#include "ExchangeRegistry.h"
+
+using namespace qpid::broker;
+using qpid::framing::Buffer;
+using qpid::framing::FieldTable;
+
+Exchange::shared_ptr Exchange::decode(ExchangeRegistry& exchanges, Buffer& buffer)
+{
+    string name;
+    string type;
+    FieldTable args;
+    
+    buffer.getShortString(name);
+    bool durable(buffer.getOctet());
+    buffer.getShortString(type);
+    buffer.getFieldTable(args);
+
+    return exchanges.declare(name, type, durable, args).first;
+}
+
+void Exchange::encode(Buffer& buffer) const 
+{
+    buffer.putShortString(name);
+    buffer.putOctet(durable);
+    buffer.putShortString(getType());
+    buffer.putFieldTable(args);
+}
+
+uint32_t Exchange::encodedSize() const 
+{ 
+    return name.size() + 1/*short string size*/
+        + 1 /*durable*/
+        + getType().size() + 1/*short string size*/
+        + args.size(); 
+}
+
+
+

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerExchange.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerExchange.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerExchange.h?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerExchange.h Thu May 17 04:03:55 2007
@@ -25,24 +25,47 @@
 #include <boost/shared_ptr.hpp>
 #include "Deliverable.h"
 #include "BrokerQueue.h"
+#include "MessageStore.h"
+#include "PersistableExchange.h"
 #include "qpid/framing/FieldTable.h"
 
 namespace qpid {
     namespace broker {
         using std::string;
+        class ExchangeRegistry;
 
-        class Exchange{
+        class Exchange : public PersistableExchange{
+        private:
             const string name;
+            const bool durable;
+            qpid::framing::FieldTable args;
+            mutable uint64_t persistenceId;
+
         public:
             typedef boost::shared_ptr<Exchange> shared_ptr;
 
-            explicit Exchange(const string& _name) : name(_name){}
+            explicit Exchange(const string& _name) : name(_name), durable(false), persistenceId(0){}
+            Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args) 
+                : name(_name), durable(_durable), args(_args), persistenceId(0){}
             virtual ~Exchange(){}
-            string getName() { return name; }
-            virtual string getType() = 0;
-            virtual void bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0;
-            virtual void unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0;
+
+            string getName() const { return name; }
+            bool isDurable() { return durable; }
+            qpid::framing::FieldTable& getArgs() { return args; }
+
+            virtual string getType() const = 0;
+            virtual bool bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0;
+            virtual bool unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0;
             virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args) = 0;
+
+            //PersistableExchange:
+            void setPersistenceId(uint64_t id) const { persistenceId = id; }
+            uint64_t getPersistenceId() const { return persistenceId; }
+            uint32_t encodedSize() const;
+            void encode(framing::Buffer& buffer) const; 
+
+            static Exchange::shared_ptr decode(ExchangeRegistry& exchanges, framing::Buffer& buffer);
+
         };
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h Thu May 17 04:03:55 2007
@@ -122,6 +122,7 @@
             inline const string& getName() const { return name; }
             inline const bool isExclusiveOwner(const ConnectionToken* const o) const { return o == owner; }
             inline bool hasExclusiveConsumer() const { return exclusive; }
+            inline bool isDurable() const { return store != 0; }
 
             bool canAutoDelete() const;
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp Thu May 17 04:03:55 2007
@@ -25,29 +25,34 @@
 using namespace qpid::framing;
 using namespace qpid::sys;
 
-DirectExchange::DirectExchange(const string& _name) : Exchange(_name) {
+DirectExchange::DirectExchange(const string& _name) : Exchange(_name) {}
+DirectExchange::DirectExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {}
 
-}
-
-void DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable*){
+bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable*){
     Mutex::ScopedLock l(lock);
     std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
     std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
-    if(i == queues.end()){
+    if (i == queues.end()) {
         bindings[routingKey].push_back(queue);
+        return true;
+    } else{
+        return false;
     }
 }
 
-void DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
+bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
     Mutex::ScopedLock l(lock);
     std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
 
     std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
-    if(i < queues.end()){
+    if (i < queues.end()) {
         queues.erase(i);
         if(queues.empty()){
             bindings.erase(routingKey);
         }
+        return true;
+    } else {
+        return false;
     }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h Thu May 17 04:03:55 2007
@@ -39,12 +39,14 @@
         static const std::string typeName;
         
         DirectExchange(const std::string& name);
+        DirectExchange(const string& _name, bool _durable, 
+                       const qpid::framing::FieldTable& _args);
 
-        virtual std::string getType(){ return typeName; }            
+        virtual std::string getType() const { return typeName; }            
         
-        virtual void bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+        virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
 
-        virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+        virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
 
         virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args);
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp Thu May 17 04:03:55 2007
@@ -27,21 +27,30 @@
 using namespace qpid::broker;
 using namespace qpid::sys;
 using std::pair;
+using qpid::framing::FieldTable;
 
-pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type) throw(UnknownExchangeTypeException){
+pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type) 
+    throw(UnknownExchangeTypeException){
+
+    return declare(name, type, false, FieldTable());
+}
+
+pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type, 
+                                                           bool durable, const FieldTable& args) 
+    throw(UnknownExchangeTypeException){
     Mutex::ScopedLock locker(lock);
     ExchangeMap::iterator i =  exchanges.find(name);
     if (i == exchanges.end()) {
 	Exchange::shared_ptr exchange;
 
         if(type == TopicExchange::typeName){
-            exchange = Exchange::shared_ptr(new TopicExchange(name));
+            exchange = Exchange::shared_ptr(new TopicExchange(name, durable, args));
         }else if(type == DirectExchange::typeName){
-            exchange = Exchange::shared_ptr(new DirectExchange(name));
+            exchange = Exchange::shared_ptr(new DirectExchange(name, durable, args));
         }else if(type == FanOutExchange::typeName){
-            exchange = Exchange::shared_ptr(new FanOutExchange(name));
+            exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, args));
         }else if (type == HeadersExchange::typeName) {
-            exchange = Exchange::shared_ptr(new HeadersExchange(name));
+            exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args));
         }else{
             throw UnknownExchangeTypeException();    
         }
@@ -54,7 +63,10 @@
 
 void ExchangeRegistry::destroy(const string& name){
     Mutex::ScopedLock locker(lock);
-    exchanges.erase(name);
+    ExchangeMap::iterator i =  exchanges.find(name);
+    if (i != exchanges.end()) {
+        exchanges.erase(i);
+    }
 }
 
 Exchange::shared_ptr ExchangeRegistry::get(const string& name){

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h Thu May 17 04:03:55 2007
@@ -24,6 +24,8 @@
 
 #include <map>
 #include "BrokerExchange.h"
+#include "MessageStore.h"
+#include "qpid/framing/FieldTable.h"
 #include "qpid/sys/Monitor.h"
 
 namespace qpid {
@@ -34,8 +36,12 @@
         typedef std::map<std::string, Exchange::shared_ptr> ExchangeMap;
         ExchangeMap exchanges;
         qpid::sys::Mutex lock;
-    public:
-        std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type) throw(UnknownExchangeTypeException);
+     public:
+        std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type)
+            throw(UnknownExchangeTypeException);
+        std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type, 
+                                                      bool durable, const qpid::framing::FieldTable& args)
+            throw(UnknownExchangeTypeException);
         void destroy(const std::string& name);
         Exchange::shared_ptr get(const std::string& name);
         Exchange::shared_ptr getDefault();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp Thu May 17 04:03:55 2007
@@ -26,21 +26,28 @@
 using namespace qpid::sys;
 
 FanOutExchange::FanOutExchange(const std::string& _name) : Exchange(_name) {}
+FanOutExchange::FanOutExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {}
 
-void FanOutExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){
+bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){
     Mutex::ScopedLock locker(lock);
     // Add if not already present.
     Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue);
     if (i == bindings.end()) {
         bindings.push_back(queue);
+        return true;
+    } else {
+        return false;
     }
 }
 
-void FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){
+bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){
     Mutex::ScopedLock locker(lock);
     Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue);
     if (i != bindings.end()) {
         bindings.erase(i);
+        return true;
+    } else {
+        return false;
     }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h Thu May 17 04:03:55 2007
@@ -40,12 +40,14 @@
     static const std::string typeName;
         
     FanOutExchange(const std::string& name);
+    FanOutExchange(const string& _name, bool _durable, 
+                   const qpid::framing::FieldTable& _args);
 
-    virtual std::string getType(){ return typeName; }            
+    virtual std::string getType() const { return typeName; }            
         
-    virtual void bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+    virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
 
-    virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+    virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
 
     virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args);
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp Thu May 17 04:03:55 2007
@@ -41,21 +41,35 @@
 }
 
 HeadersExchange::HeadersExchange(const string& _name) : Exchange(_name) { }
+HeadersExchange::HeadersExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {}
 
-void HeadersExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){
+bool HeadersExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){
     Mutex::ScopedLock locker(lock);
     std::string what = args->getString("x-match");
     if (what != all && what != any) {
         THROW_QPID_ERROR(PROTOCOL_ERROR, "Invalid x-match value binding to headers exchange.");
     }
-    bindings.push_back(Binding(*args, queue));
+    Binding binding(*args, queue);
+    Bindings::iterator i =
+        std::find(bindings.begin(),bindings.end(), binding);
+    if (i == bindings.end()) {
+        bindings.push_back(binding);
+        return true;
+    } else {
+        return false;
+    }
 }
 
-void HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){
+bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){
     Mutex::ScopedLock locker(lock);
     Bindings::iterator i =
         std::find(bindings.begin(),bindings.end(), Binding(*args, queue));
-    if (i != bindings.end()) bindings.erase(i);
+    if (i != bindings.end()) {
+        bindings.erase(i);
+        return true;
+    } else {
+        return false;
+    }
 }
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h Thu May 17 04:03:55 2007
@@ -43,12 +43,14 @@
     static const std::string typeName;
 
     HeadersExchange(const string& name);
+    HeadersExchange(const string& _name, bool _durable, 
+                    const qpid::framing::FieldTable& _args);
     
-    virtual std::string getType(){ return typeName; }            
+    virtual std::string getType() const { return typeName; }            
         
-    virtual void bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
+    virtual bool bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
 
-    virtual void unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
+    virtual bool unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
 
     virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args);
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h Thu May 17 04:03:55 2007
@@ -56,9 +56,21 @@
     virtual void destroy(const PersistableExchange& exchange) = 0;
     
     /**
+     * Record a binding
+     */
+    virtual void bind(const PersistableExchange& exchange, const PersistableQueue& queue, 
+                      const std::string& key, const framing::FieldTable& args) = 0;
+
+    /**
+     * Forget a binding
+     */
+    virtual void unbind(const PersistableExchange& exchange, const PersistableQueue& queue, 
+                        const std::string& key, const framing::FieldTable& args) = 0;
+
+    /**
      * Request recovery of queue and message state from store
      */
-    virtual void recover(RecoveryManager& queues) = 0;
+    virtual void recover(RecoveryManager& recoverer) = 0;
     
     /**
      * Stores a messages before it has been enqueued

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp Thu May 17 04:03:55 2007
@@ -48,6 +48,18 @@
     store->destroy(exchange);
 }
 
+void MessageStoreModule::bind(const PersistableExchange& e, const PersistableQueue& q, 
+                              const std::string& k, const framing::FieldTable& a)
+{
+    store->bind(e, q, k, a);
+}
+
+void MessageStoreModule::unbind(const PersistableExchange& e, const PersistableQueue& q, 
+                                const std::string& k, const framing::FieldTable& a)
+{
+    store->unbind(e, q, k, a);
+}
+
 void MessageStoreModule::recover(RecoveryManager& registry)
 {
     store->recover(registry);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h Thu May 17 04:03:55 2007
@@ -50,6 +50,10 @@
     void destroy(const PersistableQueue& queue);
     void create(const PersistableExchange& exchange);
     void destroy(const PersistableExchange& exchange);
+    void bind(const PersistableExchange& exchange, const PersistableQueue& queue, 
+              const std::string& key, const framing::FieldTable& args);
+    void unbind(const PersistableExchange& exchange, const PersistableQueue& queue, 
+                const std::string& key, const framing::FieldTable& args);
     void recover(RecoveryManager& queues);
     void stage(PersistableMessage& msg);
     void destroy(PersistableMessage& msg);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp Thu May 17 04:03:55 2007
@@ -39,13 +39,18 @@
     if (warn) std::cout << "WARNING: Can't destroy durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl;
 }
 
-void NullMessageStore::create(const PersistableExchange&)
+void NullMessageStore::create(const PersistableExchange& exchange)
 {
+    if (warn) std::cout << "WARNING: Can't create durable exchange '" << exchange.getName() << "'. Persistence not enabled." << std::endl;
 }
 
-void NullMessageStore::destroy(const PersistableExchange&)
+void NullMessageStore::destroy(const PersistableExchange& exchange)
 {
+    if (warn) std::cout << "WARNING: Can't destroy durable exchange '" << exchange.getName() << "'. Persistence not enabled." << std::endl;
 }
+void NullMessageStore::bind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&){}
+
+void NullMessageStore::unbind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&){}
 
 void NullMessageStore::recover(RecoveryManager&)
 {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h Thu May 17 04:03:55 2007
@@ -48,6 +48,11 @@
     virtual void destroy(const PersistableQueue& queue);
     virtual void create(const PersistableExchange& exchange);
     virtual void destroy(const PersistableExchange& exchange);
+
+    virtual void bind(const PersistableExchange& exchange, const PersistableQueue& queue, 
+                      const std::string& key, const framing::FieldTable& args);
+    virtual void unbind(const PersistableExchange& exchange, const PersistableQueue& queue, 
+                        const std::string& key, const framing::FieldTable& args);
     virtual void recover(RecoveryManager& queues);
     virtual void stage(PersistableMessage& msg);
     virtual void destroy(PersistableMessage& msg);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableExchange.h?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableExchange.h Thu May 17 04:03:55 2007
@@ -35,6 +35,7 @@
 class PersistableExchange : public Persistable
 {
 public:
+    virtual  std::string getName() const = 0;
     virtual ~PersistableExchange() {};
 };
 

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableExchange.h?view=auto&rev=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableExchange.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableExchange.h Thu May 17 04:03:55 2007
@@ -0,0 +1,49 @@
+#ifndef _broker_RecoverableExchange_h
+#define _broker_RecoverableExchange_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * The interface through which bindings are recovered.
+ */
+class RecoverableExchange
+{
+public:
+    typedef boost::shared_ptr<RecoverableExchange> shared_ptr;
+
+    virtual void setPersistenceId(uint64_t id) = 0;
+    /**
+     * Recover binding. Nb: queue must have been recovered earlier.
+     */
+    virtual void bind(std::string& queue, std::string& routingKey, qpid::framing::FieldTable& args) = 0;
+    virtual ~RecoverableExchange() {};
+};
+
+}}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableExchange.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableExchange.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManager.h?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManager.h Thu May 17 04:03:55 2007
@@ -21,6 +21,7 @@
 #ifndef _RecoveryManager_
 #define _RecoveryManager_
 
+#include "RecoverableExchange.h"
 #include "RecoverableQueue.h"
 #include "RecoverableMessage.h"
 #include "qpid/framing/Buffer.h"
@@ -31,7 +32,7 @@
     class RecoveryManager{
     public:
         virtual ~RecoveryManager(){}
-        virtual void recoverExchange(framing::Buffer& buffer) = 0;
+        virtual RecoverableExchange::shared_ptr recoverExchange(framing::Buffer& buffer) = 0;
         virtual RecoverableQueue::shared_ptr recoverQueue(framing::Buffer& buffer) = 0;
         virtual RecoverableMessage::shared_ptr recoverMessage(framing::Buffer& buffer) = 0;
         virtual void recoveryComplete() = 0;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp Thu May 17 04:03:55 2007
@@ -61,9 +61,19 @@
     void recover(RecoverableMessage::shared_ptr msg);
 };
 
-void RecoveryManagerImpl::recoverExchange(framing::Buffer&)
+class RecoverableExchangeImpl : public RecoverableExchange
 {
-    //TODO
+    Exchange::shared_ptr exchange;
+    QueueRegistry& queues;
+public:
+    RecoverableExchangeImpl(Exchange::shared_ptr _exchange, QueueRegistry& _queues) : exchange(_exchange), queues(_queues) {}
+    void setPersistenceId(uint64_t id);
+    void bind(std::string& queue, std::string& routingKey, qpid::framing::FieldTable& args);
+};
+
+RecoverableExchange::shared_ptr RecoveryManagerImpl::recoverExchange(framing::Buffer& buffer)
+{
+    return RecoverableExchange::shared_ptr(new RecoverableExchangeImpl(Exchange::decode(exchanges, buffer), queues));
 }
 
 RecoverableQueue::shared_ptr RecoveryManagerImpl::recoverQueue(framing::Buffer& buffer)
@@ -140,4 +150,15 @@
 void RecoverableQueueImpl::setPersistenceId(uint64_t id)
 {
     queue->setPersistenceId(id);
+}
+
+void RecoverableExchangeImpl::setPersistenceId(uint64_t id)
+{
+    exchange->setPersistenceId(id);
+}
+
+void RecoverableExchangeImpl::bind(string& queueName, string& key, framing::FieldTable& args)
+{
+    Queue::shared_ptr queue = queues.find(queueName);
+    exchange->bind(queue, key, &args);
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h Thu May 17 04:03:55 2007
@@ -37,7 +37,7 @@
         RecoveryManagerImpl(QueueRegistry& queues, ExchangeRegistry& exchanges, uint64_t stagingThreshold);
         ~RecoveryManagerImpl();
 
-        void recoverExchange(framing::Buffer& buffer);
+        RecoverableExchange::shared_ptr recoverExchange(framing::Buffer& buffer);
         RecoverableQueue::shared_ptr recoverQueue(framing::Buffer& buffer);
         RecoverableMessage::shared_ptr recoverMessage(framing::Buffer& buffer);
         void recoveryComplete();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp Thu May 17 04:03:55 2007
@@ -116,24 +116,39 @@
 }
 
 TopicExchange::TopicExchange(const string& _name) : Exchange(_name) { }
+TopicExchange::TopicExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {}
 
-void TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
+
+bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
     Monitor::ScopedLock l(lock);
     TopicPattern routingPattern(routingKey);
-    bindings[routingPattern].push_back(queue);
+    if (isBound(queue, routingPattern)) {
+        return false;
+    } else {
+        bindings[routingPattern].push_back(queue);
+        return true;
+    }
 }
 
-void TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
+bool TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
     Monitor::ScopedLock l(lock);
     BindingMap::iterator bi = bindings.find(TopicPattern(routingKey));
     Queue::vector& qv(bi->second);
-    if (bi == bindings.end()) return;
+    if (bi == bindings.end()) return false;
     Queue::vector::iterator q = find(qv.begin(), qv.end(), queue);
-    if(q == qv.end()) return;
+    if(q == qv.end()) return false;
     qv.erase(q);
     if(qv.empty()) bindings.erase(bi);
+    return true;
 }
 
+bool TopicExchange::isBound(Queue::shared_ptr queue, TopicPattern& pattern)
+{
+    BindingMap::iterator bi = bindings.find(pattern);
+    if (bi == bindings.end()) return false;
+    Queue::vector& qv(bi->second);
+    return find(qv.begin(), qv.end(), queue) != qv.end();
+}
 
 void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
     Monitor::ScopedLock l(lock);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h Thu May 17 04:03:55 2007
@@ -76,16 +76,19 @@
     BindingMap bindings;
     qpid::sys::Mutex lock;
 
+    bool isBound(Queue::shared_ptr queue, TopicPattern& pattern);
   public:
     static const std::string typeName;
 
     TopicExchange(const string& name);
+    TopicExchange(const string& _name, bool _durable, 
+                  const qpid::framing::FieldTable& _args);
 
-    virtual std::string getType(){ return typeName; }            
-        
-    virtual void bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
+    virtual std::string getType() const { return typeName; }            
 
-    virtual void unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
+    virtual bool bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
+
+    virtual bool unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
 
     virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args);