You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2013/06/26 20:32:48 UTC
svn commit: r1497036 - in /qpid/trunk/qpid: cpp/src/ cpp/src/qpid/amqp_0_10/
cpp/src/qpid/broker/ cpp/src/qpid/broker/amqp/ specs/
Author: gsim
Date: Wed Jun 26 18:32:47 2013
New Revision: 1497036
URL: http://svn.apache.org/r1497036
Log:
QPID-4919: Allow definition of topics in AMQP 1.0, composed of an exchange and subscription queue configuration
Added:
qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/CodecsInternal.h
qpid/trunk/qpid/cpp/src/qpid/broker/PersistableObject.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/PersistableObject.h
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/BrokerContext.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/BrokerContext.h
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Topic.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Topic.h
Modified:
qpid/trunk/qpid/cpp/src/CMakeLists.txt
qpid/trunk/qpid/cpp/src/Makefile.am
qpid/trunk/qpid/cpp/src/amqp.cmake
qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Codecs.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/ObjectFactory.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/ObjectFactory.h
qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Domain.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Domain.h
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.h
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnects.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnects.h
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Sasl.h
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.h
qpid/trunk/qpid/specs/management-schema.xml
Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Wed Jun 26 18:32:47 2013
@@ -1262,6 +1262,7 @@ set (qpidbroker_SOURCES
qpid/broker/FifoDistributor.cpp
qpid/broker/MessageGroupManager.cpp
qpid/broker/PersistableMessage.cpp
+ qpid/broker/PersistableObject.cpp
qpid/broker/Bridge.cpp
qpid/broker/amqp_0_10/Connection.cpp
qpid/broker/ConnectionHandler.cpp
Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Wed Jun 26 18:32:47 2013
@@ -517,6 +517,7 @@ libqpidcommon_la_SOURCES += \
qpid/sys/Waitable.h \
qpid/sys/uuid.h \
qpid/sys/unordered_map.h \
+ qpid/amqp_0_10/CodecsInternal.h \
qpid/amqp_0_10/Codecs.cpp \
qpid/amqp/CharSequence.h \
qpid/amqp/CharSequence.cpp \
@@ -691,6 +692,8 @@ libqpidbroker_la_SOURCES = \
qpid/broker/PersistableExchange.h \
qpid/broker/PersistableMessage.cpp \
qpid/broker/PersistableMessage.h \
+ qpid/broker/PersistableObject.h \
+ qpid/broker/PersistableObject.cpp \
qpid/broker/PersistableQueue.h \
qpid/broker/Queue.cpp \
qpid/broker/Queue.h \
@@ -805,6 +808,8 @@ amqp_la_LIBADD = libqpidcommon.la
amqp_la_SOURCES = \
qpid/broker/amqp/Authorise.h \
qpid/broker/amqp/Authorise.cpp \
+ qpid/broker/amqp/BrokerContext.h \
+ qpid/broker/amqp/BrokerContext.cpp \
qpid/broker/amqp/Connection.h \
qpid/broker/amqp/Connection.cpp \
qpid/broker/amqp/DataReader.h \
@@ -846,6 +851,8 @@ amqp_la_SOURCES = \
qpid/broker/amqp/SaslClient.cpp \
qpid/broker/amqp/Session.h \
qpid/broker/amqp/Session.cpp \
+ qpid/broker/amqp/Topic.h \
+ qpid/broker/amqp/Topic.cpp \
qpid/broker/amqp/Translation.h \
qpid/broker/amqp/Translation.cpp
Modified: qpid/trunk/qpid/cpp/src/amqp.cmake
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/amqp.cmake?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/amqp.cmake (original)
+++ qpid/trunk/qpid/cpp/src/amqp.cmake Wed Jun 26 18:32:47 2013
@@ -55,6 +55,8 @@ if (BUILD_AMQP)
set (amqp_SOURCES
qpid/broker/amqp/Authorise.h
qpid/broker/amqp/Authorise.cpp
+ qpid/broker/amqp/BrokerContext.h
+ qpid/broker/amqp/BrokerContext.cpp
qpid/broker/amqp/Connection.h
qpid/broker/amqp/Connection.cpp
qpid/broker/amqp/DataReader.h
@@ -96,6 +98,8 @@ if (BUILD_AMQP)
qpid/broker/amqp/SaslClient.cpp
qpid/broker/amqp/Session.h
qpid/broker/amqp/Session.cpp
+ qpid/broker/amqp/Topic.h
+ qpid/broker/amqp/Topic.cpp
qpid/broker/amqp/Translation.h
qpid/broker/amqp/Translation.cpp
)
Modified: qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Codecs.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Codecs.cpp?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Codecs.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Codecs.cpp Wed Jun 26 18:32:47 2013
@@ -19,6 +19,7 @@
*
*/
#include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/amqp_0_10/CodecsInternal.h"
#include "qpid/framing/Array.h"
#include "qpid/framing/Buffer.h"
#include "qpid/framing/FieldTable.h"
@@ -188,10 +189,6 @@ template <class T, class U, class F> voi
convert(t, value, f);
}
-uint32_t encodedSize(const Variant::Map& values);
-uint32_t encodedSize(const Variant::List& values);
-uint32_t encodedSize(const std::string& value);
-
uint32_t encodedSize(const Variant& value)
{
switch (value.getType()) {
@@ -290,9 +287,6 @@ void encode(const std::string& value, co
}
}
-void encode(const Variant::Map& map, uint32_t len, qpid::framing::Buffer& buffer);
-void encode(const Variant::List& list, uint32_t len, qpid::framing::Buffer& buffer);
-
void encode(const Variant& value, qpid::framing::Buffer& buffer)
{
switch (value.getType()) {
Added: qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/CodecsInternal.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/CodecsInternal.h?rev=1497036&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/CodecsInternal.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/CodecsInternal.h Wed Jun 26 18:32:47 2013
@@ -0,0 +1,41 @@
+#ifndef QPID_AMQP_0_10_CODECSINTERNAL_H
+#define QPID_AMQP_0_10_CODECSINTERNAL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/types/Variant.h"
+
+namespace qpid {
+namespace framing {
+class Buffer;
+}
+namespace amqp_0_10 {
+void encode(const qpid::types::Variant::Map& map, uint32_t len, qpid::framing::Buffer& buffer);
+void encode(const qpid::types::Variant::List& list, uint32_t len, qpid::framing::Buffer& buffer);
+void encode(const qpid::types::Variant& value, qpid::framing::Buffer& buffer);
+void encode(const std::string& value, const std::string& encoding, qpid::framing::Buffer& buffer);
+uint32_t encodedSize(const qpid::types::Variant::Map& values);
+uint32_t encodedSize(const qpid::types::Variant::List& values);
+uint32_t encodedSize(const std::string& value);
+
+}} // namespace qpid::amqp_0_10
+
+#endif /*!QPID_AMQP_0_10_CODECSINTERNAL_H*/
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Wed Jun 26 18:32:47 2013
@@ -35,6 +35,7 @@
#include "qpid/broker/TopicExchange.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/ExpiryPolicy.h"
+#include "qpid/broker/PersistableObject.h"
#include "qpid/broker/QueueFlowLimit.h"
#include "qpid/broker/QueueSettings.h"
#include "qpid/broker/MessageGroupManager.h"
@@ -295,9 +296,10 @@ Broker::Broker(const Broker::Options& co
// Default exchnge is not replicated.
exchanges.declare(empty, DirectExchange::typeName, false, noReplicateArgs());
+ RecoveredObjects objects;
if (store.get() != 0) {
RecoveryManagerImpl recoverer(
- queues, exchanges, links, dtxManager, protocolRegistry);
+ queues, exchanges, links, dtxManager, protocolRegistry, objects);
recoveryInProgress = true;
store->recover(recoverer);
recoveryInProgress = false;
@@ -349,6 +351,8 @@ Broker::Broker(const Broker::Options& co
// Initialize plugins
Plugin::initializeAll(*this);
+ //recover any objects via object factories
+ objects.restore(*this);
if(conf.enableMgmt) {
if (getAcl()) {
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ObjectFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ObjectFactory.cpp?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ObjectFactory.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ObjectFactory.cpp Wed Jun 26 18:32:47 2013
@@ -44,6 +44,16 @@ bool ObjectFactoryRegistry::deleteObject
return false;
}
+bool ObjectFactoryRegistry::recoverObject(Broker& broker, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties,
+ uint64_t persistenceId)
+{
+ for (Factories::iterator i = factories.begin(); i != factories.end(); ++i)
+ {
+ if ((*i)->recoverObject(broker, type, name, properties, persistenceId)) return true;
+ }
+ return false;
+}
+
ObjectFactoryRegistry::~ObjectFactoryRegistry()
{
for (Factories::iterator i = factories.begin(); i != factories.end(); ++i)
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ObjectFactory.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ObjectFactory.h?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ObjectFactory.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ObjectFactory.h Wed Jun 26 18:32:47 2013
@@ -41,6 +41,7 @@ class ObjectFactory
const std::string& userId, const std::string& connectionId) = 0;
virtual bool deleteObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties,
const std::string& userId, const std::string& connectionId) = 0;
+ virtual bool recoverObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties, uint64_t persistenceId) = 0;
virtual ~ObjectFactory() {}
private:
};
@@ -52,6 +53,7 @@ class ObjectFactoryRegistry : public Obj
const std::string& userId, const std::string& connectionId);
bool deleteObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties,
const std::string& userId, const std::string& connectionId);
+ bool recoverObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties, uint64_t persistenceId);
~ObjectFactoryRegistry();
void add(ObjectFactory*);
Added: qpid/trunk/qpid/cpp/src/qpid/broker/PersistableObject.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableObject.cpp?rev=1497036&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/PersistableObject.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/PersistableObject.cpp Wed Jun 26 18:32:47 2013
@@ -0,0 +1,88 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "PersistableObject.h"
+#include "Broker.h"
+#include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/amqp_0_10/CodecsInternal.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace broker {
+namespace {
+const std::string UTF8("utf8");
+}
+PersistableObject::PersistableObject(const std::string& n, const std::string& t, const qpid::types::Variant::Map p) : name(n), type(t), properties(p), id(0) {}
+PersistableObject::PersistableObject() : id(0) {}
+PersistableObject::~PersistableObject() {}
+const std::string& PersistableObject::getName() const { return name; }
+void PersistableObject::setPersistenceId(uint64_t i) const { id = i; }
+uint64_t PersistableObject::getPersistenceId() const { return id; }
+void PersistableObject::encode(framing::Buffer& buffer) const
+{
+ buffer.putShortString(type);
+ buffer.putMediumString(name);
+ qpid::amqp_0_10::encode(properties, qpid::amqp_0_10::encodedSize(properties), buffer);
+}
+uint32_t PersistableObject::encodedSize() const
+{
+ return type.size()+1 + name.size()+2 + qpid::amqp_0_10::encodedSize(properties);
+}
+void PersistableObject::decode(framing::Buffer& buffer)
+{
+ buffer.getShortString(type);
+ buffer.getMediumString(name);
+ qpid::framing::FieldTable ft;
+ buffer.get(ft);
+ qpid::amqp_0_10::translate(ft, properties);
+}
+bool PersistableObject::recover(Broker& broker)
+{
+ return broker.getObjectFactoryRegistry().recoverObject(broker, type, name, properties, id);
+}
+
+namespace {
+class RecoverableObject : public RecoverableConfig
+{
+ public:
+ RecoverableObject(boost::shared_ptr<PersistableObject> o) : object(o) {}
+ void setPersistenceId(uint64_t id) { object->setPersistenceId(id); }
+ private:
+ boost::shared_ptr<PersistableObject> object;
+};
+}
+boost::shared_ptr<RecoverableConfig> RecoveredObjects::recover(framing::Buffer& buffer)
+{
+ boost::shared_ptr<PersistableObject> object(new PersistableObject());
+ object->decode(buffer);
+ objects.push_back(object);
+ return boost::shared_ptr<RecoverableConfig>(new RecoverableObject(object));
+}
+void RecoveredObjects::restore(Broker& broker)
+{
+ //recover objects created through ObjectFactory
+ for (Objects::iterator i = objects.begin(); i != objects.end(); ++i) {
+ if (!(*i)->recover(broker)) {
+ QPID_LOG(warning, "Failed to recover object " << (*i)->name << " of type " << (*i)->type);
+ }
+ }
+}
+
+}} // namespace qpid::broker
Added: qpid/trunk/qpid/cpp/src/qpid/broker/PersistableObject.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableObject.h?rev=1497036&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/PersistableObject.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/PersistableObject.h Wed Jun 26 18:32:47 2013
@@ -0,0 +1,71 @@
+#ifndef QPID_BROKER_PERSISTABLEOBJECT_H
+#define QPID_BROKER_PERSISTABLEOBJECT_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "PersistableConfig.h"
+#include "qpid/types/Variant.h"
+#include <vector>
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace broker {
+class Broker;
+class RecoverableConfig;
+/**
+ * Generic persistence support for objects created through the brokers
+ * create method.
+ */
+class PersistableObject : public PersistableConfig
+{
+ public:
+ PersistableObject(const std::string& name, const std::string& type, const qpid::types::Variant::Map properties);
+ virtual ~PersistableObject();
+ const std::string& getName() const;
+ void setPersistenceId(uint64_t id) const;
+ uint64_t getPersistenceId() const;
+ void encode(framing::Buffer& buffer) const;
+ uint32_t encodedSize() const;
+ friend class RecoveredObjects;
+ private:
+ std::string name;
+ std::string type;
+ qpid::types::Variant::Map properties;
+ mutable uint64_t id;
+
+ PersistableObject();
+ void decode(framing::Buffer& buffer);
+ bool recover(Broker&);
+};
+
+class RecoveredObjects
+{
+ public:
+ boost::shared_ptr<RecoverableConfig> recover(framing::Buffer&);
+ void restore(Broker&);
+ private:
+ typedef std::vector<boost::shared_ptr<PersistableObject> > Objects;
+ Objects objects;
+};
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_PERSISTABLEOBJECT_H*/
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp Wed Jun 26 18:32:47 2013
@@ -22,6 +22,7 @@
#include "qpid/broker/Message.h"
#include "qpid/broker/PersistableMessage.h"
+#include "qpid/broker/PersistableObject.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/Bridge.h"
@@ -40,8 +41,8 @@ namespace qpid {
namespace broker {
RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, LinkRegistry& _links,
- DtxManager& _dtxMgr, ProtocolRegistry& p)
- : queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr), protocols(p) {}
+ DtxManager& _dtxMgr, ProtocolRegistry& p, RecoveredObjects& o)
+ : queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr), protocols(p), objects(o) {}
RecoveryManagerImpl::~RecoveryManagerImpl() {}
@@ -145,7 +146,7 @@ RecoverableConfig::shared_ptr RecoveryMa
else if (Bridge::isEncodedBridge(kind))
return RecoverableConfig::shared_ptr(new RecoverableConfigImpl(Bridge::decode (links, buffer)));
- return RecoverableConfig::shared_ptr(); // TODO: raise an exception instead
+ return objects.recover(buffer);
}
void RecoveryManagerImpl::recoveryComplete()
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h Wed Jun 26 18:32:47 2013
@@ -22,6 +22,7 @@
#define _RecoveryManagerImpl_
#include <list>
+#include <vector>
#include "qpid/broker/DtxManager.h"
#include "qpid/broker/ExchangeRegistry.h"
#include "qpid/broker/QueueRegistry.h"
@@ -30,17 +31,21 @@
namespace qpid {
namespace broker {
+class Broker;
+class PersistableObject;
class ProtocolRegistry;
+class RecoveredObjects;
- class RecoveryManagerImpl : public RecoveryManager{
+ class RecoveryManagerImpl : public RecoveryManager {
QueueRegistry& queues;
ExchangeRegistry& exchanges;
LinkRegistry& links;
DtxManager& dtxMgr;
ProtocolRegistry& protocols;
+ RecoveredObjects& objects;
public:
RecoveryManagerImpl(QueueRegistry& queues, ExchangeRegistry& exchanges, LinkRegistry& links,
- DtxManager& dtxMgr, ProtocolRegistry&);
+ DtxManager& dtxMgr, ProtocolRegistry&, RecoveredObjects&);
~RecoveryManagerImpl();
RecoverableExchange::shared_ptr recoverExchange(framing::Buffer& buffer);
Added: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/BrokerContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/BrokerContext.cpp?rev=1497036&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/BrokerContext.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/BrokerContext.cpp Wed Jun 26 18:32:47 2013
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "BrokerContext.h"
+
+namespace qpid {
+namespace broker {
+namespace amqp {
+BrokerContext::BrokerContext(Broker& b, Interconnects& i, TopicRegistry& t, const std::string& d) : broker(b), interconnects(i), topics(t), domain(d) {}
+BrokerContext::BrokerContext(BrokerContext& c) : broker(c.broker), interconnects(c.interconnects), topics(c.topics), domain(c.domain) {}
+Broker& BrokerContext::getBroker() { return broker; }
+Interconnects& BrokerContext::getInterconnects() { return interconnects; }
+TopicRegistry& BrokerContext::getTopics() { return topics; }
+std::string BrokerContext::getDomain() { return domain; }
+}}} // namespace qpid::broker::amqp
Added: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/BrokerContext.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/BrokerContext.h?rev=1497036&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/BrokerContext.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/BrokerContext.h Wed Jun 26 18:32:47 2013
@@ -0,0 +1,52 @@
+#ifndef QPID_BROKER_AMQP_BROKERCONTEXT_H
+#define QPID_BROKER_AMQP_BROKERCONTEXT_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+
+namespace qpid {
+namespace broker {
+class Broker;
+namespace amqp {
+class Interconnects;
+class TopicRegistry;
+/**
+ * Context providing access to broker scoped entities.
+ */
+class BrokerContext
+{
+ public:
+ BrokerContext(Broker&, Interconnects&, TopicRegistry&, const std::string&);
+ BrokerContext(BrokerContext&);
+ Broker& getBroker();
+ Interconnects& getInterconnects();
+ TopicRegistry& getTopics();
+ std::string getDomain();
+ private:
+ Broker& broker;
+ Interconnects& interconnects;
+ TopicRegistry& topics;
+ std::string domain;
+};
+}}} // namespace qpid::broker::amqp
+
+#endif /*!QPID_BROKER_AMQP_BROKERCONTEXT_H*/
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp Wed Jun 26 18:32:47 2013
@@ -39,11 +39,12 @@ extern "C" {
namespace qpid {
namespace broker {
namespace amqp {
-Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, qpid::broker::Broker& b, Interconnects& interconnects_, bool saslInUse, const std::string& d)
- : ManagedConnection(b, i),
+
+Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, BrokerContext& b, bool saslInUse)
+ : BrokerContext(b), ManagedConnection(getBroker(), i),
connection(pn_connection()),
transport(pn_transport()),
- out(o), id(i), broker(b), haveOutput(true), interconnects(interconnects_), domain(d)
+ out(o), id(i), haveOutput(true)
{
if (pn_transport_bind(transport, connection)) {
//error
@@ -54,7 +55,7 @@ Connection::Connection(qpid::sys::Output
QPID_LOG_TEST_CAT(trace, protocol, enableTrace);
if (enableTrace) pn_transport_trace(transport, PN_TRACE_FRM);
- broker.getConnectionObservers().connection(*this);
+ getBroker().getConnectionObservers().connection(*this);
if (!saslInUse) {
//feed in a dummy AMQP 1.0 header as engine expects one, but
//we already read it (if sasl is in use we read the sasl
@@ -72,15 +73,11 @@ Connection::Connection(qpid::sys::Output
Connection::~Connection()
{
- broker.getConnectionObservers().closed(*this);
+ getBroker().getConnectionObservers().closed(*this);
pn_transport_free(transport);
pn_connection_free(connection);
}
-Interconnects& Connection::getInterconnects()
-{
- return interconnects;
-}
pn_transport_t* Connection::getTransport()
{
return transport;
@@ -173,11 +170,11 @@ void Connection::open()
{
readPeerProperties();
- pn_connection_set_container(connection, broker.getFederationTag().c_str());
+ pn_connection_set_container(connection, getBroker().getFederationTag().c_str());
pn_connection_open(connection);
out.connectionEstablished();
opened();
- broker.getConnectionObservers().opened(*this);
+ getBroker().getConnectionObservers().opened(*this);
}
void Connection::readPeerProperties()
@@ -227,7 +224,7 @@ void Connection::process()
for (pn_session_t* s = pn_session_head(connection, REQUIRES_OPEN); s; s = pn_session_next(s, REQUIRES_OPEN)) {
QPID_LOG_CAT(debug, model, id << " session begun");
pn_session_open(s);
- boost::shared_ptr<Session> ssn(new Session(s, broker, *this, out));
+ boost::shared_ptr<Session> ssn(new Session(s, *this, out));
sessions[s] = ssn;
}
for (pn_link_t* l = pn_link_head(connection, REQUIRES_OPEN); l; l = pn_link_next(l, REQUIRES_OPEN)) {
@@ -323,11 +320,6 @@ std::string Connection::getError()
return text.str();
}
-std::string Connection::getDomain() const
-{
- return domain;
-}
-
void Connection::abort()
{
out.abort();
@@ -336,7 +328,7 @@ void Connection::abort()
void Connection::setUserId(const std::string& user)
{
ManagedConnection::setUserId(user);
- AclModule* acl = broker.getAcl();
+ AclModule* acl = getBroker().getAcl();
if (acl && !acl->approveConnection(*this))
{
throw Exception(qpid::amqp::error_conditions::RESOURCE_LIMIT_EXCEEDED, "User connection denied by configured limit");
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h Wed Jun 26 18:32:47 2013
@@ -22,6 +22,7 @@
*
*/
#include "qpid/sys/ConnectionCodec.h"
+#include "qpid/broker/amqp/BrokerContext.h"
#include "qpid/broker/amqp/ManagedConnection.h"
#include <map>
#include <boost/shared_ptr.hpp>
@@ -42,10 +43,10 @@ class Session;
/**
* AMQP 1.0 protocol support for broker
*/
-class Connection : public sys::ConnectionCodec, public ManagedConnection
+class Connection : public BrokerContext, public sys::ConnectionCodec, public ManagedConnection
{
public:
- Connection(qpid::sys::OutputControl& out, const std::string& id, qpid::broker::Broker& broker, Interconnects&, bool saslInUse, const std::string& domain);
+ Connection(qpid::sys::OutputControl& out, const std::string& id, BrokerContext& context, bool saslInUse);
virtual ~Connection();
size_t decode(const char* buffer, size_t size);
virtual size_t encode(char* buffer, size_t size);
@@ -56,22 +57,16 @@ class Connection : public sys::Connectio
framing::ProtocolVersion getVersion() const;
pn_transport_t* getTransport();
- Interconnects& getInterconnects();
- std::string getDomain() const;
void setUserId(const std::string&);
void abort();
-
protected:
typedef std::map<pn_session_t*, boost::shared_ptr<Session> > Sessions;
pn_connection_t* connection;
pn_transport_t* transport;
qpid::sys::OutputControl& out;
const std::string id;
- qpid::broker::Broker& broker;
bool haveOutput;
Sessions sessions;
- Interconnects& interconnects;
- std::string domain;
virtual void process();
std::string getError();
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Domain.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Domain.cpp?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Domain.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Domain.cpp Wed Jun 26 18:32:47 2013
@@ -50,6 +50,7 @@ const std::string SASL_MECHANISMS("sasl_
const std::string SASL_SERVICE("sasl_service");
const std::string MIN_SSF("min_ssf");
const std::string MAX_SSF("max_ssf");
+const std::string DURABLE("durable");
class Wrapper : public qpid::sys::ConnectionCodec
{
public:
@@ -119,14 +120,23 @@ bool get(qpid::Url& url, const std::stri
return false;
}
}
+bool get(const std::string& key, const qpid::types::Variant::Map& map)
+{
+ qpid::types::Variant::Map::const_iterator i = map.find(key);
+ if (i != map.end()) {
+ return i->second.asBool();
+ } else {
+ return false;
+ }
+}
}
-class InterconnectFactory : public qpid::sys::ConnectionCodec::Factory, public boost::enable_shared_from_this<InterconnectFactory>
+class InterconnectFactory : public BrokerContext, public qpid::sys::ConnectionCodec::Factory, public boost::enable_shared_from_this<InterconnectFactory>
{
public:
- InterconnectFactory(bool incoming, const std::string& name, const qpid::types::Variant::Map& properties, Domain&, Broker&, Interconnects&);
+ InterconnectFactory(bool incoming, const std::string& name, const qpid::types::Variant::Map& properties, Domain&, BrokerContext&);
InterconnectFactory(bool incoming, const std::string& name, const std::string& source, const std::string& target,
- Domain&, Broker&, Interconnects&, boost::shared_ptr<Relay>);
+ Domain&, BrokerContext&, boost::shared_ptr<Relay>);
qpid::sys::ConnectionCodec* create(framing::ProtocolVersion, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&);
qpid::sys::ConnectionCodec* create(qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&);
bool connect();
@@ -140,14 +150,12 @@ class InterconnectFactory : public qpid:
qpid::Url::iterator next;
std::string hostname;
Domain& domain;
- Broker& broker;
- Interconnects& registry;
qpid::Address address;
boost::shared_ptr<Relay> relay;
};
-InterconnectFactory::InterconnectFactory(bool i, const std::string& n, const qpid::types::Variant::Map& properties, Domain& d, Broker& b, Interconnects& r)
- : incoming(i), name(n), url(d.getUrl()), domain(d), broker(b), registry(r)
+InterconnectFactory::InterconnectFactory(bool i, const std::string& n, const qpid::types::Variant::Map& properties, Domain& d, BrokerContext& c)
+ : BrokerContext(c), incoming(i), name(n), url(d.getUrl()), domain(d)
{
get(source, SOURCE, properties);
get(target, TARGET, properties);
@@ -155,8 +163,8 @@ InterconnectFactory::InterconnectFactory
}
InterconnectFactory::InterconnectFactory(bool i, const std::string& n, const std::string& source_, const std::string& target_,
- Domain& d, Broker& b, Interconnects& r, boost::shared_ptr<Relay> relay_)
- : incoming(i), name(n), source(source_), target(target_), url(d.getUrl()), domain(d), broker(b), registry(r), relay(relay_)
+ Domain& d, BrokerContext& c, boost::shared_ptr<Relay> relay_)
+ : BrokerContext(c), incoming(i), name(n), source(source_), target(target_), url(d.getUrl()), domain(d), relay(relay_)
{
next = url.begin();
}
@@ -168,8 +176,8 @@ qpid::sys::ConnectionCodec* Interconnect
qpid::sys::ConnectionCodec* InterconnectFactory::create(qpid::sys::OutputControl& out, const std::string& id, const qpid::sys::SecuritySettings& t)
{
bool useSasl = domain.getMechanisms() != NONE;
- boost::shared_ptr<Interconnect> connection(new Interconnect(out, id, broker, useSasl, incoming, name, source, target, domain, registry));
- if (!relay) registry.add(name, connection);
+ boost::shared_ptr<Interconnect> connection(new Interconnect(out, id, *this, useSasl, incoming, name, source, target, domain));
+ if (!relay) getInterconnects().add(name, connection);
else connection->setRelay(relay);
std::auto_ptr<qpid::sys::ConnectionCodec> codec;
@@ -191,7 +199,7 @@ bool InterconnectFactory::connect()
next++;
hostname = address.host;
QPID_LOG (info, "Inter-broker connection initiated (" << address << ")");
- broker.connect(name, address.host, boost::lexical_cast<std::string>(address.port), address.protocol, this, boost::bind(&InterconnectFactory::failed, this, _1, _2));
+ getBroker().connect(name, address.host, boost::lexical_cast<std::string>(address.port), address.protocol, this, boost::bind(&InterconnectFactory::failed, this, _1, _2));
return true;
}
@@ -204,7 +212,7 @@ void InterconnectFactory::failed(int, st
}
Domain::Domain(const std::string& n, const qpid::types::Variant::Map& properties, Broker& b)
- : name(n), durable(false), broker(b), mechanisms("ANONYMOUS"), service(qpid::saslName), minSsf(0), maxSsf(0), agent(b.getManagementAgent())
+ : PersistableObject(n, "domain", properties), name(n), durable(get(DURABLE, properties)), broker(b), mechanisms("ANONYMOUS"), service(qpid::saslName), minSsf(0), maxSsf(0), agent(b.getManagementAgent())
{
if (!get(url, URL, properties)) {
QPID_LOG(error, "No URL specified for domain " << name << "!");
@@ -212,7 +220,6 @@ Domain::Domain(const std::string& n, con
} else {
QPID_LOG(notice, "Created domain " << name << " with url " << url << " from " << properties);
}
- //TODO: durable
get(username, USERNAME, properties);
get(password, PASSWORD, properties);
get(mechanisms, SASL_MECHANISMS, properties);
@@ -249,21 +256,26 @@ qpid::Url Domain::getUrl() const
return url;
}
+bool Domain::isDurable() const
+{
+ return durable;
+}
+
std::auto_ptr<qpid::Sasl> Domain::sasl(const std::string& hostname)
{
return qpid::SaslFactory::getInstance().create(username, password, service, hostname, minSsf, maxSsf, false);
}
-void Domain::connect(bool incoming, const std::string& name, const qpid::types::Variant::Map& properties, Interconnects& registry)
+void Domain::connect(bool incoming, const std::string& name, const qpid::types::Variant::Map& properties, BrokerContext& context)
{
- boost::shared_ptr<InterconnectFactory> factory(new InterconnectFactory(incoming, name, properties, *this, broker, registry));
+ boost::shared_ptr<InterconnectFactory> factory(new InterconnectFactory(incoming, name, properties, *this, context));
factory->connect();
addPending(factory);
}
-void Domain::connect(bool incoming, const std::string& name, const std::string& source, const std::string& target, Interconnects& registry, boost::shared_ptr<Relay> relay)
+void Domain::connect(bool incoming, const std::string& name, const std::string& source, const std::string& target, BrokerContext& context, boost::shared_ptr<Relay> relay)
{
- boost::shared_ptr<InterconnectFactory> factory(new InterconnectFactory(incoming, name, source, target, *this, broker, registry, relay));
+ boost::shared_ptr<InterconnectFactory> factory(new InterconnectFactory(incoming, name, source, target, *this, context, relay));
factory->connect();
addPending(factory);
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Domain.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Domain.h?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Domain.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Domain.h Wed Jun 26 18:32:47 2013
@@ -25,6 +25,7 @@
#include "qpid/types/Variant.h"
#include "qpid/Url.h"
#include "qpid/Version.h"
+#include "qpid/broker/PersistableObject.h"
#include "qpid/management/Manageable.h"
#include "qpid/sys/Mutex.h"
#include "qmf/org/apache/qpid/broker/Domain.h"
@@ -42,19 +43,20 @@ namespace broker {
class Broker;
namespace amqp {
class InterconnectFactory;
-class Interconnects;
+class BrokerContext;
class Relay;
-class Domain : public qpid::management::Manageable
+class Domain : public PersistableObject, public qpid::management::Manageable
{
public:
Domain(const std::string& name, const qpid::types::Variant::Map& properties, Broker&);
~Domain();
- void connect(bool incoming, const std::string& name, const qpid::types::Variant::Map& properties, Interconnects&);
- void connect(bool incoming, const std::string& name, const std::string& source, const std::string& target, Interconnects&, boost::shared_ptr<Relay>);
+ void connect(bool incoming, const std::string& name, const qpid::types::Variant::Map& properties, BrokerContext&);
+ void connect(bool incoming, const std::string& name, const std::string& source, const std::string& target, BrokerContext&, boost::shared_ptr<Relay>);
std::auto_ptr<qpid::Sasl> sasl(const std::string& hostname);
const std::string& getMechanisms() const;
qpid::Url getUrl() const;
+ bool isDurable() const;
void addPending(boost::shared_ptr<InterconnectFactory>);
void removePending(boost::shared_ptr<InterconnectFactory>);
boost::shared_ptr<qpid::management::ManagementObject> GetManagementObject() const;
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp Wed Jun 26 18:32:47 2013
@@ -39,9 +39,9 @@ namespace qpid {
namespace broker {
namespace amqp {
-Interconnect::Interconnect(qpid::sys::OutputControl& out, const std::string& id, qpid::broker::Broker& broker, bool saslInUse,
- bool i, const std::string& n, const std::string& s, const std::string& t, Domain& d, Interconnects& r)
- : Connection(out, id, broker, r, true, std::string()), incoming(i), name(n), source(s), target(t), domain(d), registry(r), headerDiscarded(saslInUse),
+Interconnect::Interconnect(qpid::sys::OutputControl& out, const std::string& id, BrokerContext& broker, bool saslInUse,
+ bool i, const std::string& n, const std::string& s, const std::string& t, Domain& d)
+ : Connection(out, id, broker, true), incoming(i), name(n), source(s), target(t), domain(d), headerDiscarded(saslInUse),
closeRequested(false), isTransportDeleted(false)
{}
@@ -83,10 +83,9 @@ void Interconnect::process()
if ((pn_connection_state(connection) & UNINIT) == UNINIT) {
QPID_LOG_CAT(debug, model, id << " interconnect opened");
open();
-
pn_session_t* s = pn_session(connection);
pn_session_open(s);
- boost::shared_ptr<Session> ssn(new Session(s, broker, *this, out));
+ boost::shared_ptr<Session> ssn(new Session(s, *this, out));
sessions[s] = ssn;
pn_link_t* l = incoming ? pn_receiver(s, name.c_str()) : pn_sender(s, name.c_str());
@@ -111,7 +110,7 @@ void Interconnect::deletedFromRegistry()
void Interconnect::transportDeleted()
{
isTransportDeleted = true;
- registry.remove(name);
+ getInterconnects().remove(name);
}
bool Interconnect::isLink() const
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.h?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.h Wed Jun 26 18:32:47 2013
@@ -37,8 +37,8 @@ class Relay;
class Interconnect : public Connection
{
public:
- Interconnect(qpid::sys::OutputControl& out, const std::string& id, qpid::broker::Broker& broker, bool saslInUse,
- bool incoming, const std::string& name, const std::string& source, const std::string& target, Domain&, Interconnects&);
+ Interconnect(qpid::sys::OutputControl& out, const std::string& id, BrokerContext& broker, bool saslInUse,
+ bool incoming, const std::string& name, const std::string& source, const std::string& target, Domain&);
void setRelay(boost::shared_ptr<Relay>);
~Interconnect();
size_t encode(char* buffer, size_t size);
@@ -51,7 +51,6 @@ class Interconnect : public Connection
std::string source;
std::string target;
Domain& domain;
- Interconnects& registry;
bool headerDiscarded;
boost::shared_ptr<Relay> relay;
bool closeRequested;
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnects.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnects.cpp?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnects.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnects.cpp Wed Jun 26 18:32:47 2013
@@ -30,6 +30,7 @@
#include "qpid/sys/OutputControl.h"
#include "qpid/log/Statement.h"
#include <boost/shared_ptr.hpp>
+#include <assert.h>
namespace qpid {
namespace broker {
@@ -50,6 +51,7 @@ bool Interconnects::createObject(Broker&
if (i == domains.end()) {
boost::shared_ptr<Domain> domain(new Domain(name, properties, broker));
domains[name] = domain;
+ if (domain->isDurable()) broker.getStore().create(*domain);
return true;
} else {
return false;
@@ -72,20 +74,23 @@ bool Interconnects::createObject(Broker&
throw qpid::Exception(QPID_MSG("Domain must be specified"));
}
}
- domain->connect(type == INCOMING_TYPE, name, properties, *this);
+ domain->connect(type == INCOMING_TYPE, name, properties, *context);
return true;
} else {
return false;
}
}
-bool Interconnects::deleteObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& /*properties*/,
+bool Interconnects::deleteObject(Broker& broker, const std::string& type, const std::string& name, const qpid::types::Variant::Map& /*properties*/,
const std::string& /*userId*/, const std::string& /*connectionId*/)
{
if (type == DOMAIN_TYPE) {
+ boost::shared_ptr<Domain> domain;
qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock);
DomainMap::iterator i = domains.find(name);
if (i != domains.end()) {
+ domain = i->second;
domains.erase(i);
+ if (domain->isDurable()) broker.getStore().destroy(*domain);
return true;
} else {
throw qpid::Exception(QPID_MSG("No such domain: " << name));
@@ -109,6 +114,20 @@ bool Interconnects::deleteObject(Broker&
}
}
+bool Interconnects::recoverObject(Broker& broker, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties,
+ uint64_t persistenceId)
+{
+ if (type == DOMAIN_TYPE) {
+ boost::shared_ptr<Domain> domain(new Domain(name, properties, broker));
+ domain->setPersistenceId(persistenceId);
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock);
+ domains[name] = domain;
+ return true;
+ } else {
+ return false;
+ }
+}
+
bool Interconnects::add(const std::string& name, boost::shared_ptr<Interconnect> connection)
{
@@ -145,7 +164,13 @@ boost::shared_ptr<Domain> Interconnects:
} else {
return i->second;
}
-
}
+void Interconnects::setContext(BrokerContext& c)
+{
+ context = &c;
+ assert(&(context->getInterconnects()) == this);
+}
+
+Interconnects::Interconnects() : context(0) {}
}}} // namespace qpid::broker::amqp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnects.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnects.h?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnects.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnects.h Wed Jun 26 18:32:47 2013
@@ -30,7 +30,7 @@
namespace qpid {
namespace broker {
namespace amqp {
-
+class BrokerContext;
class Domain;
class Interconnect;
/**
@@ -43,19 +43,23 @@ class Interconnects : public ObjectFacto
const std::string& userId, const std::string& connectionId);
bool deleteObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties,
const std::string& userId, const std::string& connectionId);
-
+ bool recoverObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties,
+ uint64_t persistenceId);
bool add(const std::string&, boost::shared_ptr<Interconnect>);
boost::shared_ptr<Interconnect> get(const std::string&);
bool remove(const std::string&);
boost::shared_ptr<Domain> findDomain(const std::string&);
+ void setContext(BrokerContext&);
+ Interconnects();
private:
typedef std::map<std::string, boost::shared_ptr<Interconnect> > InterconnectMap;
typedef std::map<std::string, boost::shared_ptr<Domain> > DomainMap;
InterconnectMap interconnects;
DomainMap domains;
qpid::sys::Mutex lock;
+ BrokerContext* context;
};
}}} // namespace qpid::broker::amqp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp Wed Jun 26 18:32:47 2013
@@ -30,6 +30,7 @@
#include "qpid/broker/amqp/Interconnects.h"
#include "qpid/broker/amqp/Message.h"
#include "qpid/broker/amqp/Sasl.h"
+#include "qpid/broker/amqp/Topic.h"
#include "qpid/broker/amqp/Translation.h"
#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/framing/Buffer.h"
@@ -50,20 +51,20 @@ struct Options : public qpid::Options {
}
};
-class ProtocolImpl : public Protocol
+class ProtocolImpl : public BrokerContext, public Protocol
{
public:
- ProtocolImpl(Interconnects* i, Broker& b, const std::string& d) : interconnects(i), broker(b), domain(d)
+ ProtocolImpl(Interconnects* interconnects, TopicRegistry* topics, Broker& broker, const std::string& domain)
+ : BrokerContext(broker, *interconnects, *topics, domain)
{
+ interconnects->setContext(*this);
broker.getObjectFactoryRegistry().add(interconnects);//registry deletes on shutdown
+ broker.getObjectFactoryRegistry().add(topics);//registry deletes on shutdown
}
qpid::sys::ConnectionCodec* create(const qpid::framing::ProtocolVersion&, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&);
boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> translate(const qpid::broker::Message&);
boost::shared_ptr<RecoverableMessage> recover(qpid::framing::Buffer&);
private:
- Interconnects* interconnects;
- Broker& broker;
- std::string domain;
};
struct ProtocolPlugin : public Plugin
@@ -76,7 +77,7 @@ struct ProtocolPlugin : public Plugin
//need to register protocol before recovery from store
broker::Broker* broker = dynamic_cast<qpid::broker::Broker*>(&target);
if (broker) {
- ProtocolImpl* impl = new ProtocolImpl(new Interconnects(), *broker, options.domain);
+ ProtocolImpl* impl = new ProtocolImpl(new Interconnects(), new TopicRegistry(), *broker, options.domain);
broker->getProtocolRegistry().add("AMQP 1.0", impl);//registry deletes on shutdown
}
}
@@ -90,22 +91,21 @@ qpid::sys::ConnectionCodec* ProtocolImpl
{
if (v == qpid::framing::ProtocolVersion(1, 0)) {
if (v.getProtocol() == qpid::framing::ProtocolVersion::SASL) {
- if (broker.getOptions().auth) {
+ if (getBroker().getOptions().auth) {
QPID_LOG(info, "Using AMQP 1.0 (with SASL layer)");
- return new qpid::broker::amqp::Sasl(out, id, broker, *interconnects,
- qpid::SaslFactory::getInstance().createServer(broker.getOptions().realm,broker.getOptions().requireEncrypted, external),
- domain);
+ return new qpid::broker::amqp::Sasl(out, id, *this,
+ qpid::SaslFactory::getInstance().createServer(getBroker().getOptions().realm,getBroker().getOptions().requireEncrypted, external));
} else {
- std::auto_ptr<SaslServer> authenticator(new qpid::NullSaslServer(broker.getOptions().realm));
+ std::auto_ptr<SaslServer> authenticator(new qpid::NullSaslServer(getBroker().getOptions().realm));
QPID_LOG(info, "Using AMQP 1.0 (with dummy SASL layer)");
- return new qpid::broker::amqp::Sasl(out, id, broker, *interconnects, authenticator, domain);
+ return new qpid::broker::amqp::Sasl(out, id, *this, authenticator);
}
} else {
- if (broker.getOptions().auth) {
+ if (getBroker().getOptions().auth) {
throw qpid::Exception("SASL layer required!");
} else {
QPID_LOG(info, "Using AMQP 1.0 (no SASL layer)");
- return new qpid::broker::amqp::Connection(out, id, broker, *interconnects, false, domain);
+ return new qpid::broker::amqp::Connection(out, id, *this, false);
}
}
}
@@ -114,7 +114,7 @@ qpid::sys::ConnectionCodec* ProtocolImpl
boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> ProtocolImpl::translate(const qpid::broker::Message& m)
{
- qpid::broker::amqp::Translation t(m, &broker);
+ qpid::broker::amqp::Translation t(m, &getBroker());
return t.getTransfer();
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp Wed Jun 26 18:32:47 2013
@@ -31,8 +31,8 @@ namespace qpid {
namespace broker {
namespace amqp {
-Sasl::Sasl(qpid::sys::OutputControl& o, const std::string& id, qpid::broker::Broker& broker, Interconnects& i, std::auto_ptr<qpid::SaslServer> auth, const std::string& domain)
- : qpid::amqp::SaslServer(id), out(o), connection(out, id, broker, i, true, domain),
+Sasl::Sasl(qpid::sys::OutputControl& o, const std::string& id, BrokerContext& context, std::auto_ptr<qpid::SaslServer> auth)
+ : qpid::amqp::SaslServer(id), out(o), connection(out, id, context, true),
authenticator(auth),
state(INCOMPLETE), writeHeader(true), haveOutput(true)
{
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Sasl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Sasl.h?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Sasl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Sasl.h Wed Jun 26 18:32:47 2013
@@ -39,7 +39,7 @@ namespace amqp {
class Sasl : public sys::ConnectionCodec, qpid::amqp::SaslServer
{
public:
- Sasl(qpid::sys::OutputControl& out, const std::string& id, qpid::broker::Broker& broker, Interconnects&, std::auto_ptr<qpid::SaslServer> authenticator, const std::string& domain);
+ Sasl(qpid::sys::OutputControl& out, const std::string& id, BrokerContext& context, std::auto_ptr<qpid::SaslServer> authenticator);
~Sasl();
size_t decode(const char* buffer, size_t size);
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp Wed Jun 26 18:32:47 2013
@@ -26,6 +26,7 @@
#include "Domain.h"
#include "Interconnects.h"
#include "Relay.h"
+#include "Topic.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/DeliverableMessage.h"
#include "qpid/broker/Exchange.h"
@@ -128,24 +129,27 @@ class IncomingToExchange : public Decodi
Authorise& authorise;
};
-Session::Session(pn_session_t* s, qpid::broker::Broker& b, Connection& c, qpid::sys::OutputControl& o)
- : ManagedSession(b, c, (boost::format("%1%") % s).str()), session(s), broker(b), connection(c), out(o), deleted(false), authorise(connection.getUserId(), broker.getAcl()) {}
+Session::Session(pn_session_t* s, Connection& c, qpid::sys::OutputControl& o)
+ : ManagedSession(c.getBroker(), c, (boost::format("%1%") % s).str()), session(s), connection(c), out(o), deleted(false),
+ authorise(connection.getUserId(), connection.getBroker().getAcl()) {}
Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* terminus, bool incoming)
{
ResolvedNode node;
- node.exchange = broker.getExchanges().find(name);
- node.queue = broker.getQueues().find(name);
+ node.exchange = connection.getBroker().getExchanges().find(name);
+ node.queue = connection.getBroker().getQueues().find(name);
+ node.topic = connection.getTopics().get(name);
+ if (node.topic) node.exchange = node.topic->getExchange();
if (!node.queue && !node.exchange) {
if (pn_terminus_is_dynamic(terminus) || is_capability_requested(CREATE_ON_DEMAND, pn_terminus_capabilities(terminus))) {
//is it a queue or an exchange?
node.properties.read(pn_terminus_properties(terminus));
if (node.properties.isQueue()) {
- node.queue = broker.createQueue(name, node.properties.getQueueSettings(), this, node.properties.getAlternateExchange(), connection.getUserId(), connection.getId()).first;
+ node.queue = connection.getBroker().createQueue(name, node.properties.getQueueSettings(), this, node.properties.getAlternateExchange(), connection.getUserId(), connection.getId()).first;
} else {
qpid::framing::FieldTable args;
- node.exchange = broker.createExchange(name, node.properties.getExchangeType(), node.properties.isDurable(), node.properties.getAlternateExchange(),
+ node.exchange = connection.getBroker().createExchange(name, node.properties.getExchangeType(), node.properties.isDurable(), node.properties.getAlternateExchange(),
args, connection.getUserId(), connection.getId()).first;
}
} else {
@@ -159,13 +163,16 @@ Session::ResolvedNode Session::resolve(c
if (d) {
node.relay = boost::shared_ptr<Relay>(new Relay(1000));
if (incoming) {
- d->connect(false, id, name, local, connection.getInterconnects(), node.relay);
+ d->connect(false, id, name, local, connection, node.relay);
} else {
- d->connect(true, id, local, name, connection.getInterconnects(), node.relay);
+ d->connect(true, id, local, name, connection, node.relay);
}
}
}
}
+ } else if (node.queue && node.topic) {
+ QPID_LOG_CAT(warning, protocol, "Ambiguous node name; " << name << " could be queue or topic, assuming topic");
+ node.queue.reset();
} else if (node.queue && node.exchange) {
QPID_LOG_CAT(warning, protocol, "Ambiguous node name; " << name << " could be queue or exchange, assuming queue");
node.exchange.reset();
@@ -255,19 +262,20 @@ void Session::setupIncoming(pn_link_t* l
source = sourceAddress;
}
if (node.queue) {
- boost::shared_ptr<Incoming> q(new IncomingToQueue(broker, *this, node.queue, link, source));
+ boost::shared_ptr<Incoming> q(new IncomingToQueue(connection.getBroker(), *this, node.queue, link, source));
incoming[link] = q;
} else if (node.exchange) {
- boost::shared_ptr<Incoming> e(new IncomingToExchange(broker, *this, node.exchange, link, source));
+ boost::shared_ptr<Incoming> e(new IncomingToExchange(connection.getBroker(), *this, node.exchange, link, source));
incoming[link] = e;
} else if (node.relay) {
- boost::shared_ptr<Incoming> in(new IncomingToRelay(link, broker, *this, source, name, pn_link_name(link), node.relay));
+ boost::shared_ptr<Incoming> in(new IncomingToRelay(link, connection.getBroker(), *this, source, name, pn_link_name(link), node.relay));
incoming[link] = in;
} else {
pn_terminus_set_type(pn_link_target(link), PN_UNSPECIFIED);
throw qpid::Exception("Node not found: " + name);/*not-found*/
}
- if (broker.getOptions().auth && !connection.isLink()) incoming[link]->verify(connection.getUserId(), broker.getOptions().realm);
+ if (connection.getBroker().getOptions().auth && !connection.isLink())
+ incoming[link]->verify(connection.getUserId(), connection.getBroker().getOptions().realm);
QPID_LOG(debug, "Incoming link attached");
}
@@ -291,7 +299,7 @@ void Session::setupOutgoing(pn_link_t* l
if (node.queue) {
authorise.outgoing(node.queue);
- boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, target, node.queue, link, *this, out, false));
+ boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(connection.getBroker(), name, target, node.queue, link, *this, out, false));
q->init();
filter.apply(q);
outgoing[link] = q;
@@ -300,6 +308,11 @@ void Session::setupOutgoing(pn_link_t* l
bool shared = is_capability_requested(SHARED, pn_terminus_capabilities(source));
bool durable = pn_terminus_get_durability(source);
QueueSettings settings(durable, !durable);
+ if (node.topic) {
+ settings = node.topic->getPolicy();
+ settings.durable = durable;
+ settings.autodelete = !durable;
+ }
filter.configure(settings);
//TODO: populate settings from source details when available from engine
std::stringstream queueName;
@@ -313,15 +326,15 @@ void Session::setupOutgoing(pn_link_t* l
queueName << connection.getContainerId() << "_" << pn_link_name(link);
}
boost::shared_ptr<qpid::broker::Queue> queue
- = broker.createQueue(queueName.str(), settings, this, "", connection.getUserId(), connection.getId()).first;
+ = connection.getBroker().createQueue(queueName.str(), settings, this, "", connection.getUserId(), connection.getId()).first;
if (!shared) queue->setExclusiveOwner(this);
authorise.outgoing(node.exchange, queue, filter);
filter.bind(node.exchange, queue);
- boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, target, queue, link, *this, out, !shared));
+ boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(connection.getBroker(), name, target, queue, link, *this, out, !shared));
outgoing[link] = q;
q->init();
} else if (node.relay) {
- boost::shared_ptr<Outgoing> out(new OutgoingFromRelay(link, broker, *this, name, target, pn_link_name(link), node.relay));
+ boost::shared_ptr<Outgoing> out(new OutgoingFromRelay(link, connection.getBroker(), *this, name, target, pn_link_name(link), node.relay));
outgoing[link] = out;
out->init();
} else {
@@ -344,11 +357,11 @@ void Session::attach(pn_link_t* link, co
if (relay) {
if (pn_link_is_sender(link)) {
- boost::shared_ptr<Outgoing> out(new OutgoingFromRelay(link, broker, *this, src, tgt, pn_link_name(link), relay));
+ boost::shared_ptr<Outgoing> out(new OutgoingFromRelay(link, connection.getBroker(), *this, src, tgt, pn_link_name(link), relay));
outgoing[link] = out;
out->init();
} else {
- boost::shared_ptr<Incoming> in(new IncomingToRelay(link, broker, *this, src, tgt, pn_link_name(link), relay));
+ boost::shared_ptr<Incoming> in(new IncomingToRelay(link, connection.getBroker(), *this, src, tgt, pn_link_name(link), relay));
incoming[link] = in;
}
} else {
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.h?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.h Wed Jun 26 18:32:47 2013
@@ -50,13 +50,14 @@ class Connection;
class Incoming;
class Outgoing;
class Relay;
+class Topic;
/**
*
*/
class Session : public ManagedSession, public boost::enable_shared_from_this<Session>
{
public:
- Session(pn_session_t*, qpid::broker::Broker&, Connection&, qpid::sys::OutputControl&);
+ Session(pn_session_t*, Connection&, qpid::sys::OutputControl&);
/**
* called for links initiated by the peer
*/
@@ -82,7 +83,6 @@ class Session : public ManagedSession, p
typedef std::map<pn_link_t*, boost::shared_ptr<Outgoing> > OutgoingLinks;
typedef std::map<pn_link_t*, boost::shared_ptr<Incoming> > IncomingLinks;
pn_session_t* session;
- qpid::broker::Broker& broker;
Connection& connection;
qpid::sys::OutputControl& out;
IncomingLinks incoming;
@@ -97,6 +97,7 @@ class Session : public ManagedSession, p
{
boost::shared_ptr<qpid::broker::Exchange> exchange;
boost::shared_ptr<qpid::broker::Queue> queue;
+ boost::shared_ptr<qpid::broker::amqp::Topic> topic;
boost::shared_ptr<Relay> relay;
NodeProperties properties;
};
Added: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Topic.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Topic.cpp?rev=1497036&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Topic.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Topic.cpp Wed Jun 26 18:32:47 2013
@@ -0,0 +1,174 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/amqp/Topic.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/management/ManagementAgent.h"
+
+namespace _qmf = qmf::org::apache::qpid::broker;
+
+namespace qpid {
+namespace broker {
+namespace amqp {
+namespace {
+const std::string TOPIC("topic");
+const std::string EXCHANGE("exchange");
+const std::string DURABLE("durable");
+const std::string EMPTY;
+
+std::string getProperty(const std::string& k, const qpid::types::Variant::Map& m)
+{
+ qpid::types::Variant::Map::const_iterator i = m.find(k);
+ if (i == m.end()) return EMPTY;
+ else return i->second;
+}
+
+bool testProperty(const std::string& k, const qpid::types::Variant::Map& m)
+{
+ qpid::types::Variant::Map::const_iterator i = m.find(k);
+ if (i == m.end()) return false;
+ else return i->second;
+}
+
+}
+
+Topic::Topic(Broker& broker, const std::string& n, const qpid::types::Variant::Map& properties)
+ : PersistableObject(n, TOPIC, properties), name(n), durable(testProperty(DURABLE, properties)), exchange(broker.getExchanges().get(getProperty(EXCHANGE, properties)))
+{
+ if (exchange->getName().empty()) throw qpid::Exception("Exchange must be specified.");
+
+ qpid::types::Variant::Map unused;
+ policy.populate(properties, unused);
+
+ qpid::management::ManagementAgent* agent = broker.getManagementAgent();
+ if (agent != 0) {
+ topic = _qmf::Topic::shared_ptr(new _qmf::Topic(agent, this, name, exchange->GetManagementObject()->getObjectId(), durable));
+ topic->set_properties(policy.asMap());
+ agent->addObject(topic);
+ }
+}
+
+bool Topic::isDurable() const
+{
+ return durable;
+}
+
+Topic::~Topic()
+{
+ if (topic != 0) topic->resourceDestroy();
+}
+
+boost::shared_ptr<qpid::management::ManagementObject> Topic::GetManagementObject() const
+{
+ return topic;
+}
+
+const QueueSettings& Topic::getPolicy() const
+{
+ return policy;
+}
+boost::shared_ptr<Exchange> Topic::getExchange()
+{
+ return exchange;
+}
+const std::string& Topic::getName() const
+{
+ return name;
+}
+
+boost::shared_ptr<Topic> TopicRegistry::createTopic(Broker& broker, const std::string& name, const qpid::types::Variant::Map& properties)
+{
+ boost::shared_ptr<Topic> topic(new Topic(broker, name, properties));
+ add(topic);
+ return topic;
+}
+
+bool TopicRegistry::createObject(Broker& broker, const std::string& type, const std::string& name, const qpid::types::Variant::Map& props,
+ const std::string& /*userId*/, const std::string& /*connectionId*/)
+{
+ if (type == TOPIC) {
+ boost::shared_ptr<Topic> topic = createTopic(broker, name, props);
+ if (topic->isDurable()) broker.getStore().create(*topic);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+bool TopicRegistry::deleteObject(Broker& broker, const std::string& type, const std::string& name, const qpid::types::Variant::Map& /*properties*/,
+ const std::string& /*userId*/, const std::string& /*connectionId*/)
+{
+ if (type == TOPIC) {
+ boost::shared_ptr<Topic> topic = remove(name);
+ if (topic->isDurable()) broker.getStore().destroy(*topic);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+bool TopicRegistry::recoverObject(Broker& broker, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties,
+ uint64_t persistenceId)
+{
+ if (type == TOPIC) {
+ boost::shared_ptr<Topic> topic = createTopic(broker, name, properties);
+ topic->setPersistenceId(persistenceId);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+bool TopicRegistry::add(boost::shared_ptr<Topic> topic)
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ Topics::const_iterator i = topics.find(topic->getName());
+ if (i == topics.end()) {
+ topics.insert(Topics::value_type(topic->getName(), topic));
+ return true;
+ } else {
+ return false;
+ }
+
+}
+boost::shared_ptr<Topic> TopicRegistry::remove(const std::string& name)
+{
+ boost::shared_ptr<Topic> result;
+ qpid::sys::Mutex::ScopedLock l(lock);
+ Topics::iterator i = topics.find(name);
+ if (i != topics.end()) {
+ result = i->second;
+ topics.erase(i);
+ }
+ return result;
+}
+
+boost::shared_ptr<Topic> TopicRegistry::get(const std::string& name)
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ Topics::const_iterator i = topics.find(name);
+ if (i == topics.end()) {
+ return boost::shared_ptr<Topic>();
+ } else {
+ return i->second;
+ }
+}
+
+}}} // namespace qpid::broker::amqp
Added: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Topic.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Topic.h?rev=1497036&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Topic.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Topic.h Wed Jun 26 18:32:47 2013
@@ -0,0 +1,88 @@
+#ifndef QPID_BROKER_AMQP_TOPIC_H
+#define QPID_BROKER_AMQP_TOPIC_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/ObjectFactory.h"
+#include "qpid/broker/PersistableObject.h"
+#include "qpid/broker/QueueSettings.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/types/Variant.h"
+#include "qpid/management/Manageable.h"
+#include "qmf/org/apache/qpid/broker/Exchange.h"
+#include "qmf/org/apache/qpid/broker/Topic.h"
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace broker {
+class Broker;
+class Exchange;
+class QueueDepth;
+
+namespace amqp {
+
+/**
+ * A topic is a node supporting a pub-sub style. It is at present
+ * implemented by an exchange with an additional policy for handling
+ * subscription queues.
+ */
+class Topic : public PersistableObject, public management::Manageable
+{
+ public:
+ Topic(Broker&, const std::string& name, const qpid::types::Variant::Map& properties);
+ ~Topic();
+ const std::string& getName() const;
+ const QueueSettings& getPolicy() const;
+ boost::shared_ptr<Exchange> getExchange();
+ bool isDurable() const;
+ boost::shared_ptr<qpid::management::ManagementObject> GetManagementObject() const;
+ private:
+ std::string name;
+ bool durable;
+ boost::shared_ptr<Exchange> exchange;
+ QueueSettings policy;
+ qmf::org::apache::qpid::broker::Topic::shared_ptr topic;
+};
+
+class TopicRegistry : public ObjectFactory
+{
+ public:
+ bool createObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties,
+ const std::string& userId, const std::string& connectionId);
+ bool deleteObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties,
+ const std::string& userId, const std::string& connectionId);
+ bool recoverObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties,
+ uint64_t persistenceId);
+
+ bool add(boost::shared_ptr<Topic> topic);
+ boost::shared_ptr<Topic> remove(const std::string& name);
+ boost::shared_ptr<Topic> get(const std::string& name);
+ private:
+ typedef std::map<std::string, boost::shared_ptr<Topic> > Topics;
+ qpid::sys::Mutex lock;
+ Topics topics;
+
+ boost::shared_ptr<Topic> createTopic(Broker&, const std::string& name, const qpid::types::Variant::Map& properties);
+};
+
+}}} // namespace qpid::broker::amqp
+
+#endif /*!QPID_BROKER_AMQP_TOPIC_H*/
Modified: qpid/trunk/qpid/specs/management-schema.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/specs/management-schema.xml?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/specs/management-schema.xml (original)
+++ qpid/trunk/qpid/specs/management-schema.xml Wed Jun 26 18:32:47 2013
@@ -415,7 +415,7 @@
</class>
<!--
===============================================================
- Domain
+ AMQP 1.0 Domain
===============================================================
-->
<class name="Domain">
@@ -426,6 +426,18 @@
<property name="username" type="sstr" access="RO"/>
<property name="password" type="sstr" access="RO"/>
</class>
+ <!--
+ ===============================================================
+ AMQP 1.0 Topic
+ ===============================================================
+ -->
+ <class name="Topic">
+ <property name="name" type="sstr" access="RC" index="y"/>
+ <property name="exchangeRef" type="objId" references="Exchange" access="RC"/>
+ <property name="durable" type="bool" access="RC"/>
+ <property name="properties" type="map" access="RO"/>
+ </class>
+
<!--
===============================================================
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org