You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/05/17 13:03:56 UTC
svn commit: r538872 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/broker/
Author: gsim
Date: Thu May 17 04:03:55 2007
New Revision: 538872
URL: http://svn.apache.org/viewvc?view=rev&rev=538872
Log:
Changes to support durable exchanges.
Added:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerExchange.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableExchange.h (with props)
Modified:
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerExchange.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableExchange.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManager.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Thu May 17 04:03:55 2007
@@ -124,6 +124,7 @@
qpid/broker/BrokerAdapter.cpp \
qpid/broker/BrokerSingleton.cpp \
qpid/broker/BrokerChannel.cpp \
+ qpid/broker/BrokerExchange.cpp \
qpid/broker/BrokerMessage.cpp \
qpid/broker/BrokerMessageMessage.cpp \
qpid/broker/BrokerQueue.cpp \
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp Thu May 17 04:03:55 2007
@@ -118,8 +118,8 @@
void BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, uint16_t /*ticket*/, const string& exchange, const string& type,
- bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait,
- const FieldTable& /*arguments*/){
+ bool passive, bool durable, bool /*autoDelete*/, bool /*internal*/, bool nowait,
+ const FieldTable& args){
if(passive){
if(!broker.getExchanges().get(exchange)) {
@@ -127,8 +127,10 @@
}
}else{
try{
- std::pair<Exchange::shared_ptr, bool> response = broker.getExchanges().declare(exchange, type);
- if(!response.second && response.first->getType() != type){
+ std::pair<Exchange::shared_ptr, bool> response = broker.getExchanges().declare(exchange, type, durable, args);
+ if (response.second) {
+ if (durable) broker.getStore().create(*response.first);
+ } else if (response.first->getType() != type) {
throw ConnectionException(
530,
"Exchange already declared to be of type "
@@ -145,10 +147,12 @@
}
void BrokerAdapter::ExchangeHandlerImpl::delete_(const MethodContext& context, uint16_t /*ticket*/,
- const string& exchange, bool /*ifUnused*/, bool nowait){
+ const string& name, bool /*ifUnused*/, bool nowait){ // removes the named exchange from the store (if durable) and from the registry
//TODO: implement unused
- broker.getExchanges().destroy(exchange);
+ Exchange::shared_ptr exchange(broker.getExchanges().get(name)); // may be empty: other callers in this file null-check the result of get()
+ if (exchange && exchange->isDurable()) broker.getStore().destroy(*exchange); // only a known, durable exchange has a stored record to drop
+ broker.getExchanges().destroy(name); // ExchangeRegistry::destroy ignores names it does not hold
if(!nowait) client.deleteOk(context.getRequestId());
}
@@ -174,6 +178,8 @@
//add default binding:
broker.getExchanges().getDefault()->bind(queue, name, 0);
+
+ //handle automatic cleanup:
if (exclusive) {
connection.exclusiveQueues.push_back(queue);
} else if(autoDelete){
@@ -202,7 +208,9 @@
Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
if(exchange){
string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
- exchange->bind(queue, exchangeRoutingKey, &arguments);
+ if (exchange->bind(queue, exchangeRoutingKey, &arguments) && exchange->isDurable() && queue->isDurable()) { // persist only a newly added binding between a durable exchange and a durable queue
+ broker.getStore().bind(*exchange, *queue, exchangeRoutingKey, arguments); // store the key the exchange was actually bound with, not the raw request key
+ }
if(!nowait) client.bindOk(context.getRequestId());
}else{
throw ChannelException(
@@ -225,7 +233,9 @@
Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
if (!exchange.get()) throw ChannelException(404, "Unbind failed. No such exchange: " + exchangeName);
- exchange->unbind(queue, routingKey, &arguments);
+ if (exchange->unbind(queue, routingKey, &arguments) && exchange->isDurable() && queue->isDurable()) {
+ broker.getStore().unbind(*exchange, *queue, routingKey, arguments);
+ }
client.unbindOk(context.getRequestId());
}
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerExchange.cpp?view=auto&rev=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerExchange.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerExchange.cpp Thu May 17 04:03:55 2007
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "BrokerExchange.h"
+#include "ExchangeRegistry.h"
+
+using namespace qpid::broker;
+using qpid::framing::Buffer;
+using qpid::framing::FieldTable;
+
+Exchange::shared_ptr Exchange::decode(ExchangeRegistry& exchanges, Buffer& buffer) // reads an encoded exchange from buffer and declares it in the given registry
+{
+ string name;
+ string type;
+ FieldTable args;
+ // Fields are read in the order encode() writes them: name, durable flag, type, arguments.
+ buffer.getShortString(name);
+ bool durable(buffer.getOctet());
+ buffer.getShortString(type);
+ buffer.getFieldTable(args);
+ // declare() returns an (exchange, bool) pair; only the exchange pointer is handed back.
+ return exchanges.declare(name, type, durable, args).first;
+}
+
+void Exchange::encode(Buffer& buffer) const // writes this exchange to buffer in the layout decode() reads back
+{
+ buffer.putShortString(name);
+ buffer.putOctet(durable); // the durable flag is written as a single octet
+ buffer.putShortString(getType());
+ buffer.putFieldTable(args);
+}
+
+uint32_t Exchange::encodedSize() const // sums the sizes of the fields encode() writes; keep the two in step
+{
+ return name.size() + 1/*short string size*/
+ + 1 /*durable*/
+ + getType().size() + 1/*short string size*/
+ + args.size(); // NOTE(review): presumably FieldTable::size() is the table's encoded byte length - confirm
+}
+
+
+
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerExchange.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerExchange.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerExchange.h?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerExchange.h Thu May 17 04:03:55 2007
@@ -25,24 +25,47 @@
#include <boost/shared_ptr.hpp>
#include "Deliverable.h"
#include "BrokerQueue.h"
+#include "MessageStore.h"
+#include "PersistableExchange.h"
#include "qpid/framing/FieldTable.h"
namespace qpid {
namespace broker {
using std::string;
+ class ExchangeRegistry;
- class Exchange{
+ class Exchange : public PersistableExchange{ // abstract base of the exchange types; implements the PersistableExchange members declared below
+ private:
const string name;
+ const bool durable;
+ qpid::framing::FieldTable args;
+ mutable uint64_t persistenceId; // mutable because setPersistenceId() is a const member
+
public:
typedef boost::shared_ptr<Exchange> shared_ptr;
- explicit Exchange(const string& _name) : name(_name){}
+ explicit Exchange(const string& _name) : name(_name), durable(false), persistenceId(0){}
+ Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args)
+ : name(_name), durable(_durable), args(_args), persistenceId(0){}
virtual ~Exchange(){}
- string getName() { return name; }
- virtual string getType() = 0;
- virtual void bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0;
- virtual void unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0;
+
+ string getName() const { return name; }
+ bool isDurable() const { return durable; } // const, like getName()/getType(), so it can be queried through a const reference
+ qpid::framing::FieldTable& getArgs() { return args; }
+
+ virtual string getType() const = 0;
+ virtual bool bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0; // the concrete exchanges in this commit return true only when a new binding was added
+ virtual bool unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0; // the concrete exchanges in this commit return true only when a binding was removed
virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args) = 0;
+
+ //PersistableExchange:
+ void setPersistenceId(uint64_t id) const { persistenceId = id; }
+ uint64_t getPersistenceId() const { return persistenceId; }
+ uint32_t encodedSize() const;
+ void encode(framing::Buffer& buffer) const;
+
+ static Exchange::shared_ptr decode(ExchangeRegistry& exchanges, framing::Buffer& buffer);
+
};
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h Thu May 17 04:03:55 2007
@@ -122,6 +122,7 @@
inline const string& getName() const { return name; }
inline const bool isExclusiveOwner(const ConnectionToken* const o) const { return o == owner; }
inline bool hasExclusiveConsumer() const { return exclusive; }
+ inline bool isDurable() const { return store != 0; }
bool canAutoDelete() const;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp Thu May 17 04:03:55 2007
@@ -25,29 +25,34 @@
using namespace qpid::framing;
using namespace qpid::sys;
-DirectExchange::DirectExchange(const string& _name) : Exchange(_name) {
+DirectExchange::DirectExchange(const string& _name) : Exchange(_name) {}
+DirectExchange::DirectExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {}
-}
-
-void DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable*){
+bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable*){
Mutex::ScopedLock l(lock);
std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
- if(i == queues.end()){
+ if (i == queues.end()) {
bindings[routingKey].push_back(queue);
+ return true;
+ } else{
+ return false;
}
}
-void DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
+bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){ // returns true only if the queue was bound under routingKey and has been removed
Mutex::ScopedLock l(lock);
std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
- if(i < queues.end()){
+ if (i != queues.end()) { // inequality with end() is the idiomatic "found" test for the result of find()
queues.erase(i);
if(queues.empty()){
bindings.erase(routingKey);
}
+ return true;
+ } else {
+ return false;
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h Thu May 17 04:03:55 2007
@@ -39,12 +39,14 @@
static const std::string typeName;
DirectExchange(const std::string& name);
+ DirectExchange(const string& _name, bool _durable,
+ const qpid::framing::FieldTable& _args);
- virtual std::string getType(){ return typeName; }
+ virtual std::string getType() const { return typeName; }
- virtual void bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+ virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
- virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+ virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp Thu May 17 04:03:55 2007
@@ -27,21 +27,30 @@
using namespace qpid::broker;
using namespace qpid::sys;
using std::pair;
+using qpid::framing::FieldTable;
-pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type) throw(UnknownExchangeTypeException){
+pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type)
+ throw(UnknownExchangeTypeException){
+
+ return declare(name, type, false, FieldTable());
+}
+
+pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type,
+ bool durable, const FieldTable& args)
+ throw(UnknownExchangeTypeException){
Mutex::ScopedLock locker(lock);
ExchangeMap::iterator i = exchanges.find(name);
if (i == exchanges.end()) {
Exchange::shared_ptr exchange;
if(type == TopicExchange::typeName){
- exchange = Exchange::shared_ptr(new TopicExchange(name));
+ exchange = Exchange::shared_ptr(new TopicExchange(name, durable, args));
}else if(type == DirectExchange::typeName){
- exchange = Exchange::shared_ptr(new DirectExchange(name));
+ exchange = Exchange::shared_ptr(new DirectExchange(name, durable, args));
}else if(type == FanOutExchange::typeName){
- exchange = Exchange::shared_ptr(new FanOutExchange(name));
+ exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, args));
}else if (type == HeadersExchange::typeName) {
- exchange = Exchange::shared_ptr(new HeadersExchange(name));
+ exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args));
}else{
throw UnknownExchangeTypeException();
}
@@ -54,7 +63,10 @@
void ExchangeRegistry::destroy(const string& name){
Mutex::ScopedLock locker(lock);
- exchanges.erase(name);
+ ExchangeMap::iterator i = exchanges.find(name);
+ if (i != exchanges.end()) {
+ exchanges.erase(i);
+ }
}
Exchange::shared_ptr ExchangeRegistry::get(const string& name){
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h Thu May 17 04:03:55 2007
@@ -24,6 +24,8 @@
#include <map>
#include "BrokerExchange.h"
+#include "MessageStore.h"
+#include "qpid/framing/FieldTable.h"
#include "qpid/sys/Monitor.h"
namespace qpid {
@@ -34,8 +36,12 @@
typedef std::map<std::string, Exchange::shared_ptr> ExchangeMap;
ExchangeMap exchanges;
qpid::sys::Mutex lock;
- public:
- std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type) throw(UnknownExchangeTypeException);
+ public:
+ std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type)
+ throw(UnknownExchangeTypeException);
+ std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type,
+ bool durable, const qpid::framing::FieldTable& args)
+ throw(UnknownExchangeTypeException);
void destroy(const std::string& name);
Exchange::shared_ptr get(const std::string& name);
Exchange::shared_ptr getDefault();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp Thu May 17 04:03:55 2007
@@ -26,21 +26,28 @@
using namespace qpid::sys;
FanOutExchange::FanOutExchange(const std::string& _name) : Exchange(_name) {}
+FanOutExchange::FanOutExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {}
-void FanOutExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){
+bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){
Mutex::ScopedLock locker(lock);
// Add if not already present.
Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue);
if (i == bindings.end()) {
bindings.push_back(queue);
+ return true;
+ } else {
+ return false;
}
}
-void FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){
+bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){
Mutex::ScopedLock locker(lock);
Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue);
if (i != bindings.end()) {
bindings.erase(i);
+ return true;
+ } else {
+ return false;
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h Thu May 17 04:03:55 2007
@@ -40,12 +40,14 @@
static const std::string typeName;
FanOutExchange(const std::string& name);
+ FanOutExchange(const string& _name, bool _durable,
+ const qpid::framing::FieldTable& _args);
- virtual std::string getType(){ return typeName; }
+ virtual std::string getType() const { return typeName; }
- virtual void bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+ virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
- virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+ virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp Thu May 17 04:03:55 2007
@@ -41,21 +41,35 @@
}
HeadersExchange::HeadersExchange(const string& _name) : Exchange(_name) { }
+HeadersExchange::HeadersExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {}
-void HeadersExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){
+bool HeadersExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){ // returns true if the binding was added, false if an equal binding already existed
Mutex::ScopedLock locker(lock);
std::string what = args->getString("x-match");
if (what != all && what != any) {
THROW_QPID_ERROR(PROTOCOL_ERROR, "Invalid x-match value binding to headers exchange.");
}
- bindings.push_back(Binding(*args, queue));
+ Binding binding(*args, queue); // NOTE(review): args is dereferenced here and above without a null check - confirm callers never pass 0
+ Bindings::iterator i =
+ std::find(bindings.begin(),bindings.end(), binding);
+ if (i == bindings.end()) {
+ bindings.push_back(binding);
+ return true;
+ } else {
+ return false;
+ }
}
-void HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){
+bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){
Mutex::ScopedLock locker(lock);
Bindings::iterator i =
std::find(bindings.begin(),bindings.end(), Binding(*args, queue));
- if (i != bindings.end()) bindings.erase(i);
+ if (i != bindings.end()) {
+ bindings.erase(i);
+ return true;
+ } else {
+ return false;
+ }
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h Thu May 17 04:03:55 2007
@@ -43,12 +43,14 @@
static const std::string typeName;
HeadersExchange(const string& name);
+ HeadersExchange(const string& _name, bool _durable,
+ const qpid::framing::FieldTable& _args);
- virtual std::string getType(){ return typeName; }
+ virtual std::string getType() const { return typeName; }
- virtual void bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
+ virtual bool bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
- virtual void unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
+ virtual bool unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h Thu May 17 04:03:55 2007
@@ -56,9 +56,21 @@
virtual void destroy(const PersistableExchange& exchange) = 0;
/**
+ * Record a binding
+ */
+ virtual void bind(const PersistableExchange& exchange, const PersistableQueue& queue,
+ const std::string& key, const framing::FieldTable& args) = 0;
+
+ /**
+ * Forget a binding
+ */
+ virtual void unbind(const PersistableExchange& exchange, const PersistableQueue& queue,
+ const std::string& key, const framing::FieldTable& args) = 0;
+
+ /**
* Request recovery of queue and message state from store
*/
- virtual void recover(RecoveryManager& queues) = 0;
+ virtual void recover(RecoveryManager& recoverer) = 0;
/**
* Stores a message before it has been enqueued
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp Thu May 17 04:03:55 2007
@@ -48,6 +48,18 @@
store->destroy(exchange);
}
+void MessageStoreModule::bind(const PersistableExchange& e, const PersistableQueue& q,
+ const std::string& k, const framing::FieldTable& a)
+{
+ store->bind(e, q, k, a);
+}
+
+void MessageStoreModule::unbind(const PersistableExchange& e, const PersistableQueue& q,
+ const std::string& k, const framing::FieldTable& a)
+{
+ store->unbind(e, q, k, a);
+}
+
void MessageStoreModule::recover(RecoveryManager& registry)
{
store->recover(registry);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h Thu May 17 04:03:55 2007
@@ -50,6 +50,10 @@
void destroy(const PersistableQueue& queue);
void create(const PersistableExchange& exchange);
void destroy(const PersistableExchange& exchange);
+ void bind(const PersistableExchange& exchange, const PersistableQueue& queue,
+ const std::string& key, const framing::FieldTable& args);
+ void unbind(const PersistableExchange& exchange, const PersistableQueue& queue,
+ const std::string& key, const framing::FieldTable& args);
void recover(RecoveryManager& queues);
void stage(PersistableMessage& msg);
void destroy(PersistableMessage& msg);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp Thu May 17 04:03:55 2007
@@ -39,13 +39,18 @@
if (warn) std::cout << "WARNING: Can't destroy durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl;
}
-void NullMessageStore::create(const PersistableExchange&)
+void NullMessageStore::create(const PersistableExchange& exchange)
{
+ if (warn) std::cout << "WARNING: Can't create durable exchange '" << exchange.getName() << "'. Persistence not enabled." << std::endl;
}
-void NullMessageStore::destroy(const PersistableExchange&)
+void NullMessageStore::destroy(const PersistableExchange& exchange)
{
+ if (warn) std::cout << "WARNING: Can't destroy durable exchange '" << exchange.getName() << "'. Persistence not enabled." << std::endl;
}
+void NullMessageStore::bind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&){}
+
+void NullMessageStore::unbind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&){}
void NullMessageStore::recover(RecoveryManager&)
{
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h Thu May 17 04:03:55 2007
@@ -48,6 +48,11 @@
virtual void destroy(const PersistableQueue& queue);
virtual void create(const PersistableExchange& exchange);
virtual void destroy(const PersistableExchange& exchange);
+
+ virtual void bind(const PersistableExchange& exchange, const PersistableQueue& queue,
+ const std::string& key, const framing::FieldTable& args);
+ virtual void unbind(const PersistableExchange& exchange, const PersistableQueue& queue,
+ const std::string& key, const framing::FieldTable& args);
virtual void recover(RecoveryManager& queues);
virtual void stage(PersistableMessage& msg);
virtual void destroy(PersistableMessage& msg);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableExchange.h?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableExchange.h Thu May 17 04:03:55 2007
@@ -35,6 +35,7 @@
class PersistableExchange : public Persistable
{
public:
+ virtual std::string getName() const = 0;
virtual ~PersistableExchange() {};
};
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableExchange.h?view=auto&rev=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableExchange.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableExchange.h Thu May 17 04:03:55 2007
@@ -0,0 +1,69 @@
+#ifndef _broker_RecoverableExchange_h
+#define _broker_RecoverableExchange_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <stdint.h>
+#include <string>
+
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace framing {
+// Only used by reference below, so a forward declaration is enough.
+class FieldTable;
+}
+
+namespace broker {
+
+/**
+ * The interface through which bindings are recovered.
+ *
+ * RecoveryManager::recoverExchange() returns one of these for the
+ * exchange it recovers from a buffer.
+ */
+class RecoverableExchange
+{
+public:
+    typedef boost::shared_ptr<RecoverableExchange> shared_ptr;
+
+    /**
+     * Set the persistence id of the recovered exchange.
+     */
+    virtual void setPersistenceId(uint64_t id) = 0;
+
+    /**
+     * Recover binding. Nb: queue must have been recovered earlier.
+     *
+     * @param queue name of the queue to bind
+     * @param routingKey key the queue is bound with
+     * @param args arguments of the binding
+     */
+    virtual void bind(std::string& queue, std::string& routingKey, qpid::framing::FieldTable& args) = 0;
+
+    virtual ~RecoverableExchange() {}
+};
+
+}}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableExchange.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableExchange.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManager.h?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManager.h Thu May 17 04:03:55 2007
@@ -21,6 +21,7 @@
#ifndef _RecoveryManager_
#define _RecoveryManager_
+#include "RecoverableExchange.h"
#include "RecoverableQueue.h"
#include "RecoverableMessage.h"
#include "qpid/framing/Buffer.h"
@@ -31,7 +32,7 @@
class RecoveryManager{
public:
virtual ~RecoveryManager(){}
- virtual void recoverExchange(framing::Buffer& buffer) = 0;
+ virtual RecoverableExchange::shared_ptr recoverExchange(framing::Buffer& buffer) = 0;
virtual RecoverableQueue::shared_ptr recoverQueue(framing::Buffer& buffer) = 0;
virtual RecoverableMessage::shared_ptr recoverMessage(framing::Buffer& buffer) = 0;
virtual void recoveryComplete() = 0;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp Thu May 17 04:03:55 2007
@@ -61,9 +61,19 @@
void recover(RecoverableMessage::shared_ptr msg);
};
-void RecoveryManagerImpl::recoverExchange(framing::Buffer&)
+/**
+ * RecoverableExchange implementation that applies recovered state to a
+ * broker Exchange, looking queues up by name in a QueueRegistry.
+ */
+class RecoverableExchangeImpl : public RecoverableExchange
{
- //TODO
+ Exchange::shared_ptr exchange; // exchange that the recovered id and bindings are applied to
+ QueueRegistry& queues; // registry searched by bind(); held by reference, not owned
+public:
+ RecoverableExchangeImpl(Exchange::shared_ptr _exchange, QueueRegistry& _queues) : exchange(_exchange), queues(_queues) {}
+ void setPersistenceId(uint64_t id);
+ void bind(std::string& queue, std::string& routingKey, qpid::framing::FieldTable& args);
+};
+
+/**
+ * Decodes an exchange from buffer via Exchange::decode(), passing it the
+ * exchange registry, and wraps the result together with the queue
+ * registry in a RecoverableExchangeImpl.
+ */
+RecoverableExchange::shared_ptr RecoveryManagerImpl::recoverExchange(framing::Buffer& buffer)
+{
+ return RecoverableExchange::shared_ptr(new RecoverableExchangeImpl(Exchange::decode(exchanges, buffer), queues));
}
RecoverableQueue::shared_ptr RecoveryManagerImpl::recoverQueue(framing::Buffer& buffer)
@@ -140,4 +150,15 @@
void RecoverableQueueImpl::setPersistenceId(uint64_t id)
{
queue->setPersistenceId(id);
+}
+
+// Forwards the persistence id to the wrapped exchange.
+void RecoverableExchangeImpl::setPersistenceId(uint64_t id)
+{
+ exchange->setPersistenceId(id);
+}
+
+/**
+ * Looks queueName up in the queue registry and binds the result to the
+ * wrapped exchange with the given key and arguments.
+ */
+void RecoverableExchangeImpl::bind(string& queueName, string& key, framing::FieldTable& args)
+{
+ Queue::shared_ptr queue = queues.find(queueName);
+ // NOTE(review): the lookup result is handed to bind() unchecked. Confirm
+ // what find() returns for a name that was never recovered, and whether
+ // the exchange tolerates it.
+ exchange->bind(queue, key, &args);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h Thu May 17 04:03:55 2007
@@ -37,7 +37,7 @@
RecoveryManagerImpl(QueueRegistry& queues, ExchangeRegistry& exchanges, uint64_t stagingThreshold);
~RecoveryManagerImpl();
- void recoverExchange(framing::Buffer& buffer);
+ RecoverableExchange::shared_ptr recoverExchange(framing::Buffer& buffer);
RecoverableQueue::shared_ptr recoverQueue(framing::Buffer& buffer);
RecoverableMessage::shared_ptr recoverMessage(framing::Buffer& buffer);
void recoveryComplete();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp Thu May 17 04:03:55 2007
@@ -116,24 +116,39 @@
}
TopicExchange::TopicExchange(const string& _name) : Exchange(_name) { }
+TopicExchange::TopicExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {}
-void TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
+
+/**
+ * Binds queue under the pattern built from routingKey, holding the
+ * exchange lock for the duration of the call. The args parameter is
+ * not used.
+ *
+ * @return false if the queue was already bound under that pattern (the
+ *         bindings are left untouched), true if a binding was added.
+ */
+bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
Monitor::ScopedLock l(lock);
TopicPattern routingPattern(routingKey);
- bindings[routingPattern].push_back(queue);
+ // Duplicate request: report it instead of adding a second entry.
+ if (isBound(queue, routingPattern)) return false;
+ bindings[routingPattern].push_back(queue);
+ return true;
}
-void TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
+/**
+ * Removes the binding of queue under the pattern built from routingKey,
+ * holding the exchange lock for the duration of the call. The args
+ * parameter is not used.
+ *
+ * @return false if no bindings exist for the pattern or the queue is not
+ *         among them, true if a binding was removed.
+ */
+bool TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
Monitor::ScopedLock l(lock);
BindingMap::iterator bi = bindings.find(TopicPattern(routingKey));
- Queue::vector& qv(bi->second);
- if (bi == bindings.end()) return;
+ if (bi == bindings.end()) return false;
+ // Take the reference only after the check above: bi may be
+ // bindings.end(), which must not be dereferenced.
+ Queue::vector& qv(bi->second);
Queue::vector::iterator q = find(qv.begin(), qv.end(), queue);
- if(q == qv.end()) return;
+ if(q == qv.end()) return false;
qv.erase(q);
if(qv.empty()) bindings.erase(bi);
+ return true;
}
+/**
+ * Reports whether queue already appears in the list of queues stored
+ * under pattern. Takes no lock itself; bind() calls it while holding
+ * the exchange lock.
+ */
+bool TopicExchange::isBound(Queue::shared_ptr queue, TopicPattern& pattern)
+{
+ BindingMap::iterator entry = bindings.find(pattern);
+ if (entry != bindings.end()) {
+     Queue::vector& bound(entry->second);
+     return find(bound.begin(), bound.end(), queue) != bound.end();
+ }
+ // Nothing is bound under this pattern at all.
+ return false;
+}
void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
Monitor::ScopedLock l(lock);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h?view=diff&rev=538872&r1=538871&r2=538872
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h Thu May 17 04:03:55 2007
@@ -76,16 +76,19 @@
BindingMap bindings;
qpid::sys::Mutex lock;
+ bool isBound(Queue::shared_ptr queue, TopicPattern& pattern);
public:
static const std::string typeName;
TopicExchange(const string& name);
+ TopicExchange(const string& _name, bool _durable,
+ const qpid::framing::FieldTable& _args);
- virtual std::string getType(){ return typeName; }
-
- virtual void bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
+ virtual std::string getType() const { return typeName; }
- virtual void unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
+ virtual bool bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
+
+ virtual bool unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args);