You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2013/06/26 20:32:48 UTC

svn commit: r1497036 - in /qpid/trunk/qpid: cpp/src/ cpp/src/qpid/amqp_0_10/ cpp/src/qpid/broker/ cpp/src/qpid/broker/amqp/ specs/

Author: gsim
Date: Wed Jun 26 18:32:47 2013
New Revision: 1497036

URL: http://svn.apache.org/r1497036
Log:
QPID-4919: Allow definition of topics in AMQP 1.0, composed of an exchange and subscription queue configuration

Added:
    qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/CodecsInternal.h
    qpid/trunk/qpid/cpp/src/qpid/broker/PersistableObject.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/PersistableObject.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/BrokerContext.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/BrokerContext.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Topic.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Topic.h
Modified:
    qpid/trunk/qpid/cpp/src/CMakeLists.txt
    qpid/trunk/qpid/cpp/src/Makefile.am
    qpid/trunk/qpid/cpp/src/amqp.cmake
    qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Codecs.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/ObjectFactory.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/ObjectFactory.h
    qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Domain.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Domain.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnects.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnects.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Sasl.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.h
    qpid/trunk/qpid/specs/management-schema.xml

Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Wed Jun 26 18:32:47 2013
@@ -1262,6 +1262,7 @@ set (qpidbroker_SOURCES
      qpid/broker/FifoDistributor.cpp
      qpid/broker/MessageGroupManager.cpp
      qpid/broker/PersistableMessage.cpp
+     qpid/broker/PersistableObject.cpp
      qpid/broker/Bridge.cpp
      qpid/broker/amqp_0_10/Connection.cpp
      qpid/broker/ConnectionHandler.cpp

Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Wed Jun 26 18:32:47 2013
@@ -517,6 +517,7 @@ libqpidcommon_la_SOURCES +=			\
   qpid/sys/Waitable.h				\
   qpid/sys/uuid.h				\
   qpid/sys/unordered_map.h			\
+  qpid/amqp_0_10/CodecsInternal.h		\
   qpid/amqp_0_10/Codecs.cpp			\
   qpid/amqp/CharSequence.h                      \
   qpid/amqp/CharSequence.cpp                    \
@@ -691,6 +692,8 @@ libqpidbroker_la_SOURCES = \
   qpid/broker/PersistableExchange.h \
   qpid/broker/PersistableMessage.cpp \
   qpid/broker/PersistableMessage.h \
+  qpid/broker/PersistableObject.h \
+  qpid/broker/PersistableObject.cpp \
   qpid/broker/PersistableQueue.h \
   qpid/broker/Queue.cpp \
   qpid/broker/Queue.h \
@@ -805,6 +808,8 @@ amqp_la_LIBADD = libqpidcommon.la
 amqp_la_SOURCES = \
   qpid/broker/amqp/Authorise.h \
   qpid/broker/amqp/Authorise.cpp \
+  qpid/broker/amqp/BrokerContext.h \
+  qpid/broker/amqp/BrokerContext.cpp \
   qpid/broker/amqp/Connection.h \
   qpid/broker/amqp/Connection.cpp \
   qpid/broker/amqp/DataReader.h \
@@ -846,6 +851,8 @@ amqp_la_SOURCES = \
   qpid/broker/amqp/SaslClient.cpp \
   qpid/broker/amqp/Session.h \
   qpid/broker/amqp/Session.cpp \
+  qpid/broker/amqp/Topic.h \
+  qpid/broker/amqp/Topic.cpp \
   qpid/broker/amqp/Translation.h \
   qpid/broker/amqp/Translation.cpp
 

Modified: qpid/trunk/qpid/cpp/src/amqp.cmake
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/amqp.cmake?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/amqp.cmake (original)
+++ qpid/trunk/qpid/cpp/src/amqp.cmake Wed Jun 26 18:32:47 2013
@@ -55,6 +55,8 @@ if (BUILD_AMQP)
     set (amqp_SOURCES
          qpid/broker/amqp/Authorise.h
          qpid/broker/amqp/Authorise.cpp
+         qpid/broker/amqp/BrokerContext.h
+         qpid/broker/amqp/BrokerContext.cpp
          qpid/broker/amqp/Connection.h
          qpid/broker/amqp/Connection.cpp
          qpid/broker/amqp/DataReader.h
@@ -96,6 +98,8 @@ if (BUILD_AMQP)
          qpid/broker/amqp/SaslClient.cpp
          qpid/broker/amqp/Session.h
          qpid/broker/amqp/Session.cpp
+         qpid/broker/amqp/Topic.h
+         qpid/broker/amqp/Topic.cpp
          qpid/broker/amqp/Translation.h
          qpid/broker/amqp/Translation.cpp
         )

Modified: qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Codecs.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Codecs.cpp?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Codecs.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Codecs.cpp Wed Jun 26 18:32:47 2013
@@ -19,6 +19,7 @@
  *
  */
 #include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/amqp_0_10/CodecsInternal.h"
 #include "qpid/framing/Array.h"
 #include "qpid/framing/Buffer.h"
 #include "qpid/framing/FieldTable.h"
@@ -188,10 +189,6 @@ template <class T, class U, class F> voi
     convert(t, value, f);
 }
 
-uint32_t encodedSize(const Variant::Map& values);
-uint32_t encodedSize(const Variant::List& values);
-uint32_t encodedSize(const std::string& value);
-
 uint32_t encodedSize(const Variant& value)
 {
     switch (value.getType()) {
@@ -290,9 +287,6 @@ void encode(const std::string& value, co
     }
 }
 
-void encode(const Variant::Map& map, uint32_t len, qpid::framing::Buffer& buffer);
-void encode(const Variant::List& list, uint32_t len, qpid::framing::Buffer& buffer);
-
 void encode(const Variant& value, qpid::framing::Buffer& buffer)
 {
     switch (value.getType()) {

Added: qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/CodecsInternal.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/CodecsInternal.h?rev=1497036&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/CodecsInternal.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/CodecsInternal.h Wed Jun 26 18:32:47 2013
@@ -0,0 +1,41 @@
+#ifndef QPID_AMQP_0_10_CODECSINTERNAL_H
+#define QPID_AMQP_0_10_CODECSINTERNAL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/types/Variant.h"
+
+namespace qpid {
+namespace framing {
+class Buffer;
+}
+namespace amqp_0_10 {
+void encode(const qpid::types::Variant::Map& map, uint32_t len, qpid::framing::Buffer& buffer);
+void encode(const qpid::types::Variant::List& list, uint32_t len, qpid::framing::Buffer& buffer);
+void encode(const qpid::types::Variant& value, qpid::framing::Buffer& buffer);
+void encode(const std::string& value, const std::string& encoding, qpid::framing::Buffer& buffer);
+uint32_t encodedSize(const qpid::types::Variant::Map& values);
+uint32_t encodedSize(const qpid::types::Variant::List& values);
+uint32_t encodedSize(const std::string& value);
+
+}} // namespace qpid::amqp_0_10
+
+#endif  /*!QPID_AMQP_0_10_CODECSINTERNAL_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Wed Jun 26 18:32:47 2013
@@ -35,6 +35,7 @@
 #include "qpid/broker/TopicExchange.h"
 #include "qpid/broker/Link.h"
 #include "qpid/broker/ExpiryPolicy.h"
+#include "qpid/broker/PersistableObject.h"
 #include "qpid/broker/QueueFlowLimit.h"
 #include "qpid/broker/QueueSettings.h"
 #include "qpid/broker/MessageGroupManager.h"
@@ -295,9 +296,10 @@ Broker::Broker(const Broker::Options& co
     // Default exchnge is not replicated.
     exchanges.declare(empty, DirectExchange::typeName, false, noReplicateArgs());
 
+    RecoveredObjects objects;
     if (store.get() != 0) {
         RecoveryManagerImpl recoverer(
-            queues, exchanges, links, dtxManager, protocolRegistry);
+            queues, exchanges, links, dtxManager, protocolRegistry, objects);
         recoveryInProgress = true;
         store->recover(recoverer);
         recoveryInProgress = false;
@@ -349,6 +351,8 @@ Broker::Broker(const Broker::Options& co
 
     // Initialize plugins
     Plugin::initializeAll(*this);
+    //recover any objects via object factories
+    objects.restore(*this);
 
     if(conf.enableMgmt) {
         if (getAcl()) {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ObjectFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ObjectFactory.cpp?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ObjectFactory.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ObjectFactory.cpp Wed Jun 26 18:32:47 2013
@@ -44,6 +44,16 @@ bool ObjectFactoryRegistry::deleteObject
     return false;
 }
 
+bool ObjectFactoryRegistry::recoverObject(Broker& broker, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties,
+                                          uint64_t persistenceId)
+{
+    for (Factories::iterator i = factories.begin(); i != factories.end(); ++i)
+    {
+        if ((*i)->recoverObject(broker, type, name, properties, persistenceId)) return true;
+    }
+    return false;
+}
+
 ObjectFactoryRegistry::~ObjectFactoryRegistry()
 {
     for (Factories::iterator i = factories.begin(); i != factories.end(); ++i)

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ObjectFactory.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ObjectFactory.h?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ObjectFactory.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ObjectFactory.h Wed Jun 26 18:32:47 2013
@@ -41,6 +41,7 @@ class ObjectFactory
                               const std::string& userId, const std::string& connectionId) = 0;
     virtual bool deleteObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties,
                               const std::string& userId, const std::string& connectionId) = 0;
+    virtual bool recoverObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties,  uint64_t persistenceId) = 0;
     virtual ~ObjectFactory() {}
   private:
 };
@@ -52,6 +53,7 @@ class ObjectFactoryRegistry : public Obj
                       const std::string& userId, const std::string& connectionId);
     bool deleteObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties,
                       const std::string& userId, const std::string& connectionId);
+    bool recoverObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties,  uint64_t persistenceId);
 
     ~ObjectFactoryRegistry();
     void add(ObjectFactory*);

Added: qpid/trunk/qpid/cpp/src/qpid/broker/PersistableObject.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableObject.cpp?rev=1497036&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/PersistableObject.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/PersistableObject.cpp Wed Jun 26 18:32:47 2013
@@ -0,0 +1,88 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "PersistableObject.h"
+#include "Broker.h"
+#include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/amqp_0_10/CodecsInternal.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace broker {
+namespace {
+const std::string UTF8("utf8");
+}
+PersistableObject::PersistableObject(const std::string& n, const std::string& t, const qpid::types::Variant::Map p) : name(n), type(t), properties(p), id(0) {}
+PersistableObject::PersistableObject() : id(0) {}
+PersistableObject::~PersistableObject() {}
+const std::string& PersistableObject::getName() const { return name; }
+void PersistableObject::setPersistenceId(uint64_t i) const { id = i; }
+uint64_t PersistableObject::getPersistenceId() const { return id; }
+void PersistableObject::encode(framing::Buffer& buffer) const
+{
+    buffer.putShortString(type);
+    buffer.putMediumString(name);
+    qpid::amqp_0_10::encode(properties, qpid::amqp_0_10::encodedSize(properties), buffer);
+}
+uint32_t PersistableObject::encodedSize() const
+{
+    return type.size()+1 + name.size()+2 + qpid::amqp_0_10::encodedSize(properties);
+}
+void PersistableObject::decode(framing::Buffer& buffer)
+{
+    buffer.getShortString(type);
+    buffer.getMediumString(name);
+    qpid::framing::FieldTable ft;
+    buffer.get(ft);
+    qpid::amqp_0_10::translate(ft, properties);
+}
+bool PersistableObject::recover(Broker& broker)
+{
+    return broker.getObjectFactoryRegistry().recoverObject(broker, type, name, properties, id);
+}
+
+namespace {
+class RecoverableObject : public RecoverableConfig
+{
+  public:
+    RecoverableObject(boost::shared_ptr<PersistableObject> o) : object(o) {}
+    void setPersistenceId(uint64_t id) { object->setPersistenceId(id); }
+  private:
+    boost::shared_ptr<PersistableObject> object;
+};
+}
+boost::shared_ptr<RecoverableConfig> RecoveredObjects::recover(framing::Buffer& buffer)
+{
+    boost::shared_ptr<PersistableObject> object(new PersistableObject());
+    object->decode(buffer);
+    objects.push_back(object);
+    return boost::shared_ptr<RecoverableConfig>(new RecoverableObject(object));
+}
+void RecoveredObjects::restore(Broker& broker)
+{
+    //recover objects created through ObjectFactory
+    for (Objects::iterator i = objects.begin(); i != objects.end(); ++i) {
+        if (!(*i)->recover(broker)) {
+            QPID_LOG(warning, "Failed to recover object " << (*i)->name << " of type " << (*i)->type);
+        }
+    }
+}
+
+}} // namespace qpid::broker

Added: qpid/trunk/qpid/cpp/src/qpid/broker/PersistableObject.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableObject.h?rev=1497036&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/PersistableObject.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/PersistableObject.h Wed Jun 26 18:32:47 2013
@@ -0,0 +1,71 @@
+#ifndef QPID_BROKER_PERSISTABLEOBJECT_H
+#define QPID_BROKER_PERSISTABLEOBJECT_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "PersistableConfig.h"
+#include "qpid/types/Variant.h"
+#include <vector>
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace broker {
+class Broker;
+class RecoverableConfig;
+/**
+ * Generic persistence support for objects created through the brokers
+ * create method.
+ */
+class PersistableObject : public PersistableConfig
+{
+  public:
+    PersistableObject(const std::string& name, const std::string& type, const qpid::types::Variant::Map properties);
+    virtual ~PersistableObject();
+    const std::string& getName() const;
+    void setPersistenceId(uint64_t id) const;
+    uint64_t getPersistenceId() const;
+    void encode(framing::Buffer& buffer) const;
+    uint32_t encodedSize() const;
+  friend class RecoveredObjects;
+  private:
+    std::string name;
+    std::string type;
+    qpid::types::Variant::Map properties;
+    mutable uint64_t id;
+
+    PersistableObject();
+    void decode(framing::Buffer& buffer);
+    bool recover(Broker&);
+};
+
+class RecoveredObjects
+{
+  public:
+    boost::shared_ptr<RecoverableConfig> recover(framing::Buffer&);
+    void restore(Broker&);
+  private:
+    typedef std::vector<boost::shared_ptr<PersistableObject> > Objects;
+    Objects objects;
+};
+
+}} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_PERSISTABLEOBJECT_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp Wed Jun 26 18:32:47 2013
@@ -22,6 +22,7 @@
 
 #include "qpid/broker/Message.h"
 #include "qpid/broker/PersistableMessage.h"
+#include "qpid/broker/PersistableObject.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/Link.h"
 #include "qpid/broker/Bridge.h"
@@ -40,8 +41,8 @@ namespace qpid {
 namespace broker {
 
 RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, LinkRegistry& _links,
-                                         DtxManager& _dtxMgr, ProtocolRegistry& p)
-    : queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr), protocols(p) {}
+                                         DtxManager& _dtxMgr, ProtocolRegistry& p, RecoveredObjects& o)
+    : queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr), protocols(p), objects(o) {}
 
 RecoveryManagerImpl::~RecoveryManagerImpl() {}
 
@@ -145,7 +146,7 @@ RecoverableConfig::shared_ptr RecoveryMa
     else if (Bridge::isEncodedBridge(kind))
         return RecoverableConfig::shared_ptr(new RecoverableConfigImpl(Bridge::decode (links, buffer)));
 
-    return RecoverableConfig::shared_ptr(); // TODO: raise an exception instead
+    return objects.recover(buffer);
 }
 
 void RecoveryManagerImpl::recoveryComplete()

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h Wed Jun 26 18:32:47 2013
@@ -22,6 +22,7 @@
 #define _RecoveryManagerImpl_
 
 #include <list>
+#include <vector>
 #include "qpid/broker/DtxManager.h"
 #include "qpid/broker/ExchangeRegistry.h"
 #include "qpid/broker/QueueRegistry.h"
@@ -30,17 +31,21 @@
 
 namespace qpid {
 namespace broker {
+class Broker;
+class PersistableObject;
 class ProtocolRegistry;
+class RecoveredObjects;
 
-    class RecoveryManagerImpl : public RecoveryManager{
+    class RecoveryManagerImpl : public RecoveryManager {
         QueueRegistry& queues;
         ExchangeRegistry& exchanges;
         LinkRegistry& links;
         DtxManager& dtxMgr;
         ProtocolRegistry& protocols;
+        RecoveredObjects& objects;
     public:
         RecoveryManagerImpl(QueueRegistry& queues, ExchangeRegistry& exchanges, LinkRegistry& links,
-                            DtxManager& dtxMgr, ProtocolRegistry&);
+                            DtxManager& dtxMgr, ProtocolRegistry&, RecoveredObjects&);
         ~RecoveryManagerImpl();
 
         RecoverableExchange::shared_ptr recoverExchange(framing::Buffer& buffer);

Added: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/BrokerContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/BrokerContext.cpp?rev=1497036&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/BrokerContext.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/BrokerContext.cpp Wed Jun 26 18:32:47 2013
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "BrokerContext.h"
+
+namespace qpid {
+namespace broker {
+namespace amqp {
+BrokerContext::BrokerContext(Broker& b, Interconnects& i, TopicRegistry& t, const std::string& d) : broker(b), interconnects(i), topics(t), domain(d) {}
+BrokerContext::BrokerContext(BrokerContext& c) : broker(c.broker), interconnects(c.interconnects), topics(c.topics), domain(c.domain) {}
+Broker& BrokerContext::getBroker() { return broker; }
+Interconnects& BrokerContext::getInterconnects() { return interconnects; }
+TopicRegistry& BrokerContext::getTopics() { return topics; }
+std::string BrokerContext::getDomain() { return domain; }
+}}} // namespace qpid::broker::amqp

Added: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/BrokerContext.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/BrokerContext.h?rev=1497036&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/BrokerContext.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/BrokerContext.h Wed Jun 26 18:32:47 2013
@@ -0,0 +1,52 @@
+#ifndef QPID_BROKER_AMQP_BROKERCONTEXT_H
+#define QPID_BROKER_AMQP_BROKERCONTEXT_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+
+namespace qpid {
+namespace broker {
+class Broker;
+namespace amqp {
+class Interconnects;
+class TopicRegistry;
+/**
+ * Context providing access to broker scoped entities.
+ */
+class BrokerContext
+{
+  public:
+    BrokerContext(Broker&, Interconnects&, TopicRegistry&, const std::string&);
+    BrokerContext(BrokerContext&);
+    Broker& getBroker();
+    Interconnects& getInterconnects();
+    TopicRegistry& getTopics();
+    std::string getDomain();
+  private:
+    Broker& broker;
+    Interconnects& interconnects;
+    TopicRegistry& topics;
+    std::string domain;
+};
+}}} // namespace qpid::broker::amqp
+
+#endif  /*!QPID_BROKER_AMQP_BROKERCONTEXT_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp Wed Jun 26 18:32:47 2013
@@ -39,11 +39,12 @@ extern "C" {
 namespace qpid {
 namespace broker {
 namespace amqp {
-Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, qpid::broker::Broker& b, Interconnects& interconnects_, bool saslInUse, const std::string& d)
-    : ManagedConnection(b, i),
+
+Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, BrokerContext& b, bool saslInUse)
+    : BrokerContext(b), ManagedConnection(getBroker(), i),
       connection(pn_connection()),
       transport(pn_transport()),
-      out(o), id(i), broker(b), haveOutput(true), interconnects(interconnects_), domain(d)
+      out(o), id(i), haveOutput(true)
 {
     if (pn_transport_bind(transport, connection)) {
         //error
@@ -54,7 +55,7 @@ Connection::Connection(qpid::sys::Output
     QPID_LOG_TEST_CAT(trace, protocol, enableTrace);
     if (enableTrace) pn_transport_trace(transport, PN_TRACE_FRM);
 
-    broker.getConnectionObservers().connection(*this);
+    getBroker().getConnectionObservers().connection(*this);
     if (!saslInUse) {
         //feed in a dummy AMQP 1.0 header as engine expects one, but
         //we already read it (if sasl is in use we read the sasl
@@ -72,15 +73,11 @@ Connection::Connection(qpid::sys::Output
 
 Connection::~Connection()
 {
-    broker.getConnectionObservers().closed(*this);
+    getBroker().getConnectionObservers().closed(*this);
     pn_transport_free(transport);
     pn_connection_free(connection);
 }
 
-Interconnects& Connection::getInterconnects()
-{
-    return interconnects;
-}
 pn_transport_t* Connection::getTransport()
 {
     return transport;
@@ -173,11 +170,11 @@ void Connection::open()
 {
     readPeerProperties();
 
-    pn_connection_set_container(connection, broker.getFederationTag().c_str());
+    pn_connection_set_container(connection, getBroker().getFederationTag().c_str());
     pn_connection_open(connection);
     out.connectionEstablished();
     opened();
-    broker.getConnectionObservers().opened(*this);
+    getBroker().getConnectionObservers().opened(*this);
 }
 
 void Connection::readPeerProperties()
@@ -227,7 +224,7 @@ void Connection::process()
     for (pn_session_t* s = pn_session_head(connection, REQUIRES_OPEN); s; s = pn_session_next(s, REQUIRES_OPEN)) {
         QPID_LOG_CAT(debug, model, id << " session begun");
         pn_session_open(s);
-        boost::shared_ptr<Session> ssn(new Session(s, broker, *this, out));
+        boost::shared_ptr<Session> ssn(new Session(s, *this, out));
         sessions[s] = ssn;
     }
     for (pn_link_t* l = pn_link_head(connection, REQUIRES_OPEN); l; l = pn_link_next(l, REQUIRES_OPEN)) {
@@ -323,11 +320,6 @@ std::string Connection::getError()
     return text.str();
 }
 
-std::string Connection::getDomain() const
-{
-    return domain;
-}
-
 void Connection::abort()
 {
     out.abort();
@@ -336,7 +328,7 @@ void Connection::abort()
 void Connection::setUserId(const std::string& user)
 {
     ManagedConnection::setUserId(user);
-    AclModule* acl = broker.getAcl();
+    AclModule* acl = getBroker().getAcl();
     if (acl && !acl->approveConnection(*this))
     {
         throw Exception(qpid::amqp::error_conditions::RESOURCE_LIMIT_EXCEEDED, "User connection denied by configured limit");

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h Wed Jun 26 18:32:47 2013
@@ -22,6 +22,7 @@
  *
  */
 #include "qpid/sys/ConnectionCodec.h"
+#include "qpid/broker/amqp/BrokerContext.h"
 #include "qpid/broker/amqp/ManagedConnection.h"
 #include <map>
 #include <boost/shared_ptr.hpp>
@@ -42,10 +43,10 @@ class Session;
 /**
  * AMQP 1.0 protocol support for broker
  */
-class Connection : public sys::ConnectionCodec, public ManagedConnection
+class Connection : public BrokerContext, public sys::ConnectionCodec, public ManagedConnection
 {
   public:
-    Connection(qpid::sys::OutputControl& out, const std::string& id, qpid::broker::Broker& broker, Interconnects&, bool saslInUse, const std::string& domain);
+    Connection(qpid::sys::OutputControl& out, const std::string& id, BrokerContext& context, bool saslInUse);
     virtual ~Connection();
     size_t decode(const char* buffer, size_t size);
     virtual size_t encode(char* buffer, size_t size);
@@ -56,22 +57,16 @@ class Connection : public sys::Connectio
 
     framing::ProtocolVersion getVersion() const;
     pn_transport_t* getTransport();
-    Interconnects& getInterconnects();
-    std::string getDomain() const;
     void setUserId(const std::string&);
     void abort();
-
   protected:
     typedef std::map<pn_session_t*, boost::shared_ptr<Session> > Sessions;
     pn_connection_t* connection;
     pn_transport_t* transport;
     qpid::sys::OutputControl& out;
     const std::string id;
-    qpid::broker::Broker& broker;
     bool haveOutput;
     Sessions sessions;
-    Interconnects& interconnects;
-    std::string domain;
 
     virtual void process();
     std::string getError();

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Domain.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Domain.cpp?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Domain.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Domain.cpp Wed Jun 26 18:32:47 2013
@@ -50,6 +50,7 @@ const std::string SASL_MECHANISMS("sasl_
 const std::string SASL_SERVICE("sasl_service");
 const std::string MIN_SSF("min_ssf");
 const std::string MAX_SSF("max_ssf");
+const std::string DURABLE("durable");
 class Wrapper : public qpid::sys::ConnectionCodec
 {
   public:
@@ -119,14 +120,23 @@ bool get(qpid::Url& url, const std::stri
         return false;
     }
 }
+bool get(const std::string& key, const qpid::types::Variant::Map& map)
+{
+    qpid::types::Variant::Map::const_iterator i = map.find(key);
+    if (i != map.end()) {
+        return i->second.asBool();
+    } else {
+        return false;
+    }
+}
 }
 
-class InterconnectFactory : public qpid::sys::ConnectionCodec::Factory, public boost::enable_shared_from_this<InterconnectFactory>
+class InterconnectFactory : public BrokerContext, public qpid::sys::ConnectionCodec::Factory, public boost::enable_shared_from_this<InterconnectFactory>
 {
   public:
-    InterconnectFactory(bool incoming, const std::string& name, const qpid::types::Variant::Map& properties, Domain&, Broker&, Interconnects&);
+    InterconnectFactory(bool incoming, const std::string& name, const qpid::types::Variant::Map& properties, Domain&, BrokerContext&);
     InterconnectFactory(bool incoming, const std::string& name, const std::string& source, const std::string& target,
-                        Domain&, Broker&, Interconnects&, boost::shared_ptr<Relay>);
+                        Domain&, BrokerContext&, boost::shared_ptr<Relay>);
     qpid::sys::ConnectionCodec* create(framing::ProtocolVersion, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&);
     qpid::sys::ConnectionCodec* create(qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&);
     bool connect();
@@ -140,14 +150,12 @@ class InterconnectFactory : public qpid:
     qpid::Url::iterator next;
     std::string hostname;
     Domain& domain;
-    Broker& broker;
-    Interconnects& registry;
     qpid::Address address;
     boost::shared_ptr<Relay> relay;
 };
 
-InterconnectFactory::InterconnectFactory(bool i, const std::string& n, const qpid::types::Variant::Map& properties, Domain& d, Broker& b, Interconnects& r)
-    : incoming(i), name(n), url(d.getUrl()), domain(d), broker(b), registry(r)
+InterconnectFactory::InterconnectFactory(bool i, const std::string& n, const qpid::types::Variant::Map& properties, Domain& d, BrokerContext& c)
+    : BrokerContext(c), incoming(i), name(n), url(d.getUrl()), domain(d)
 {
     get(source, SOURCE, properties);
     get(target, TARGET, properties);
@@ -155,8 +163,8 @@ InterconnectFactory::InterconnectFactory
 }
 
 InterconnectFactory::InterconnectFactory(bool i, const std::string& n, const std::string& source_, const std::string& target_,
-                                         Domain& d, Broker& b, Interconnects& r, boost::shared_ptr<Relay> relay_)
-    : incoming(i), name(n), source(source_), target(target_), url(d.getUrl()), domain(d), broker(b), registry(r), relay(relay_)
+                                         Domain& d, BrokerContext& c, boost::shared_ptr<Relay> relay_)
+    : BrokerContext(c), incoming(i), name(n), source(source_), target(target_), url(d.getUrl()), domain(d), relay(relay_)
 {
     next = url.begin();
 }
@@ -168,8 +176,8 @@ qpid::sys::ConnectionCodec* Interconnect
 qpid::sys::ConnectionCodec* InterconnectFactory::create(qpid::sys::OutputControl& out, const std::string& id, const qpid::sys::SecuritySettings& t)
 {
     bool useSasl = domain.getMechanisms() != NONE;
-    boost::shared_ptr<Interconnect> connection(new Interconnect(out, id, broker, useSasl, incoming, name, source, target, domain, registry));
-    if (!relay) registry.add(name, connection);
+    boost::shared_ptr<Interconnect> connection(new Interconnect(out, id, *this, useSasl, incoming, name, source, target, domain));
+    if (!relay) getInterconnects().add(name, connection);
     else connection->setRelay(relay);
 
     std::auto_ptr<qpid::sys::ConnectionCodec> codec;
@@ -191,7 +199,7 @@ bool InterconnectFactory::connect()
     next++;
     hostname = address.host;
     QPID_LOG (info, "Inter-broker connection initiated (" << address << ")");
-    broker.connect(name, address.host, boost::lexical_cast<std::string>(address.port), address.protocol, this, boost::bind(&InterconnectFactory::failed, this, _1, _2));
+    getBroker().connect(name, address.host, boost::lexical_cast<std::string>(address.port), address.protocol, this, boost::bind(&InterconnectFactory::failed, this, _1, _2));
     return true;
 }
 
@@ -204,7 +212,7 @@ void InterconnectFactory::failed(int, st
 }
 
 Domain::Domain(const std::string& n, const qpid::types::Variant::Map& properties, Broker& b)
-    : name(n), durable(false), broker(b), mechanisms("ANONYMOUS"), service(qpid::saslName), minSsf(0), maxSsf(0), agent(b.getManagementAgent())
+    : PersistableObject(n, "domain", properties), name(n), durable(get(DURABLE, properties)), broker(b), mechanisms("ANONYMOUS"), service(qpid::saslName), minSsf(0), maxSsf(0), agent(b.getManagementAgent())
 {
     if (!get(url, URL, properties)) {
         QPID_LOG(error, "No URL specified for domain " << name << "!");
@@ -212,7 +220,6 @@ Domain::Domain(const std::string& n, con
     } else {
         QPID_LOG(notice, "Created domain " << name << " with url " << url << " from " << properties);
     }
-    //TODO: durable
     get(username, USERNAME, properties);
     get(password, PASSWORD, properties);
     get(mechanisms, SASL_MECHANISMS, properties);
@@ -249,21 +256,26 @@ qpid::Url Domain::getUrl() const
     return url;
 }
 
+bool Domain::isDurable() const
+{
+    return durable;
+}
+
 std::auto_ptr<qpid::Sasl> Domain::sasl(const std::string& hostname)
 {
     return qpid::SaslFactory::getInstance().create(username, password, service, hostname, minSsf, maxSsf, false);
 }
 
-void Domain::connect(bool incoming, const std::string& name, const qpid::types::Variant::Map& properties, Interconnects& registry)
+void Domain::connect(bool incoming, const std::string& name, const qpid::types::Variant::Map& properties, BrokerContext& context)
 {
-    boost::shared_ptr<InterconnectFactory> factory(new InterconnectFactory(incoming, name, properties, *this, broker, registry));
+    boost::shared_ptr<InterconnectFactory> factory(new InterconnectFactory(incoming, name, properties, *this, context));
     factory->connect();
     addPending(factory);
 }
 
-void Domain::connect(bool incoming, const std::string& name, const std::string& source, const std::string& target, Interconnects& registry, boost::shared_ptr<Relay> relay)
+void Domain::connect(bool incoming, const std::string& name, const std::string& source, const std::string& target, BrokerContext& context, boost::shared_ptr<Relay> relay)
 {
-    boost::shared_ptr<InterconnectFactory> factory(new InterconnectFactory(incoming, name, source, target, *this, broker, registry, relay));
+    boost::shared_ptr<InterconnectFactory> factory(new InterconnectFactory(incoming, name, source, target, *this, context, relay));
     factory->connect();
     addPending(factory);
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Domain.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Domain.h?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Domain.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Domain.h Wed Jun 26 18:32:47 2013
@@ -25,6 +25,7 @@
 #include "qpid/types/Variant.h"
 #include "qpid/Url.h"
 #include "qpid/Version.h"
+#include "qpid/broker/PersistableObject.h"
 #include "qpid/management/Manageable.h"
 #include "qpid/sys/Mutex.h"
 #include "qmf/org/apache/qpid/broker/Domain.h"
@@ -42,19 +43,20 @@ namespace broker {
 class Broker;
 namespace amqp {
 class InterconnectFactory;
-class Interconnects;
+class BrokerContext;
 class Relay;
 
-class Domain : public qpid::management::Manageable
+class Domain : public PersistableObject, public qpid::management::Manageable
 {
   public:
     Domain(const std::string& name, const qpid::types::Variant::Map& properties, Broker&);
     ~Domain();
-    void connect(bool incoming, const std::string& name, const qpid::types::Variant::Map& properties, Interconnects&);
-    void connect(bool incoming, const std::string& name, const std::string& source, const std::string& target, Interconnects&, boost::shared_ptr<Relay>);
+    void connect(bool incoming, const std::string& name, const qpid::types::Variant::Map& properties, BrokerContext&);
+    void connect(bool incoming, const std::string& name, const std::string& source, const std::string& target, BrokerContext&, boost::shared_ptr<Relay>);
     std::auto_ptr<qpid::Sasl> sasl(const std::string& hostname);
     const std::string& getMechanisms() const;
     qpid::Url getUrl() const;
+    bool isDurable() const;
     void addPending(boost::shared_ptr<InterconnectFactory>);
     void removePending(boost::shared_ptr<InterconnectFactory>);
     boost::shared_ptr<qpid::management::ManagementObject> GetManagementObject() const;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp Wed Jun 26 18:32:47 2013
@@ -39,9 +39,9 @@ namespace qpid {
 namespace broker {
 namespace amqp {
 
-Interconnect::Interconnect(qpid::sys::OutputControl& out, const std::string& id, qpid::broker::Broker& broker, bool saslInUse,
-                           bool i, const std::string& n, const std::string& s, const std::string& t, Domain& d, Interconnects& r)
-    : Connection(out, id, broker, r, true, std::string()), incoming(i), name(n), source(s), target(t), domain(d), registry(r), headerDiscarded(saslInUse),
+Interconnect::Interconnect(qpid::sys::OutputControl& out, const std::string& id, BrokerContext& broker, bool saslInUse,
+                           bool i, const std::string& n, const std::string& s, const std::string& t, Domain& d)
+    : Connection(out, id, broker, true), incoming(i), name(n), source(s), target(t), domain(d), headerDiscarded(saslInUse),
       closeRequested(false), isTransportDeleted(false)
 {}
 
@@ -83,10 +83,9 @@ void Interconnect::process()
         if ((pn_connection_state(connection) & UNINIT) == UNINIT) {
             QPID_LOG_CAT(debug, model, id << " interconnect opened");
             open();
-
             pn_session_t* s = pn_session(connection);
             pn_session_open(s);
-            boost::shared_ptr<Session> ssn(new Session(s, broker, *this, out));
+            boost::shared_ptr<Session> ssn(new Session(s, *this, out));
             sessions[s] = ssn;
 
             pn_link_t* l = incoming ? pn_receiver(s, name.c_str()) : pn_sender(s, name.c_str());
@@ -111,7 +110,7 @@ void Interconnect::deletedFromRegistry()
 void Interconnect::transportDeleted()
 {
     isTransportDeleted = true;
-    registry.remove(name);
+    getInterconnects().remove(name);
 }
 
 bool Interconnect::isLink() const

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.h?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.h Wed Jun 26 18:32:47 2013
@@ -37,8 +37,8 @@ class Relay;
 class Interconnect : public Connection
 {
   public:
-    Interconnect(qpid::sys::OutputControl& out, const std::string& id, qpid::broker::Broker& broker, bool saslInUse,
-                 bool incoming, const std::string& name, const std::string& source, const std::string& target, Domain&, Interconnects&);
+    Interconnect(qpid::sys::OutputControl& out, const std::string& id, BrokerContext& broker, bool saslInUse,
+                 bool incoming, const std::string& name, const std::string& source, const std::string& target, Domain&);
     void setRelay(boost::shared_ptr<Relay>);
     ~Interconnect();
     size_t encode(char* buffer, size_t size);
@@ -51,7 +51,6 @@ class Interconnect : public Connection
     std::string source;
     std::string target;
     Domain& domain;
-    Interconnects& registry;
     bool headerDiscarded;
     boost::shared_ptr<Relay> relay;
     bool closeRequested;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnects.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnects.cpp?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnects.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnects.cpp Wed Jun 26 18:32:47 2013
@@ -30,6 +30,7 @@
 #include "qpid/sys/OutputControl.h"
 #include "qpid/log/Statement.h"
 #include <boost/shared_ptr.hpp>
+#include <assert.h>
 
 namespace qpid {
 namespace broker {
@@ -50,6 +51,7 @@ bool Interconnects::createObject(Broker&
         if (i == domains.end()) {
             boost::shared_ptr<Domain> domain(new Domain(name, properties, broker));
             domains[name] = domain;
+            if (domain->isDurable()) broker.getStore().create(*domain);
             return true;
         } else {
             return false;
@@ -72,20 +74,23 @@ bool Interconnects::createObject(Broker&
                 throw qpid::Exception(QPID_MSG("Domain must be specified"));
             }
         }
-        domain->connect(type == INCOMING_TYPE, name, properties, *this);
+        domain->connect(type == INCOMING_TYPE, name, properties, *context);
         return true;
     } else {
         return false;
     }
 }
-bool Interconnects::deleteObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& /*properties*/,
+bool Interconnects::deleteObject(Broker& broker, const std::string& type, const std::string& name, const qpid::types::Variant::Map& /*properties*/,
                                  const std::string& /*userId*/, const std::string& /*connectionId*/)
 {
     if (type == DOMAIN_TYPE) {
+        boost::shared_ptr<Domain> domain;
         qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock);
         DomainMap::iterator i = domains.find(name);
         if (i != domains.end()) {
+            domain = i->second;
             domains.erase(i);
+            if (domain->isDurable()) broker.getStore().destroy(*domain);
             return true;
         } else {
             throw qpid::Exception(QPID_MSG("No such domain: " << name));
@@ -109,6 +114,20 @@ bool Interconnects::deleteObject(Broker&
     }
 }
 
+bool Interconnects::recoverObject(Broker& broker, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties,
+                   uint64_t persistenceId)
+{
+    if (type == DOMAIN_TYPE) {
+        boost::shared_ptr<Domain> domain(new Domain(name, properties, broker));
+        domain->setPersistenceId(persistenceId);
+        qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock);
+        domains[name] = domain;
+        return true;
+    } else {
+        return false;
+    }
+}
+
 
 bool Interconnects::add(const std::string& name, boost::shared_ptr<Interconnect> connection)
 {
@@ -145,7 +164,13 @@ boost::shared_ptr<Domain> Interconnects:
     } else {
         return i->second;
     }
-
 }
+void Interconnects::setContext(BrokerContext& c)
+{
+    context = &c;
+    assert(&(context->getInterconnects()) == this);
+}
+
+Interconnects::Interconnects() : context(0) {}
 
 }}} // namespace qpid::broker::amqp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnects.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnects.h?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnects.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnects.h Wed Jun 26 18:32:47 2013
@@ -30,7 +30,7 @@
 namespace qpid {
 namespace broker {
 namespace amqp {
-
+class BrokerContext;
 class Domain;
 class Interconnect;
 /**
@@ -43,19 +43,23 @@ class Interconnects : public ObjectFacto
                               const std::string& userId, const std::string& connectionId);
     bool deleteObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties,
                               const std::string& userId, const std::string& connectionId);
-
+    bool recoverObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties,
+                       uint64_t persistenceId);
 
     bool add(const std::string&, boost::shared_ptr<Interconnect>);
     boost::shared_ptr<Interconnect> get(const std::string&);
     bool remove(const std::string&);
 
     boost::shared_ptr<Domain> findDomain(const std::string&);
+    void setContext(BrokerContext&);
+    Interconnects();
   private:
     typedef std::map<std::string, boost::shared_ptr<Interconnect> > InterconnectMap;
     typedef std::map<std::string, boost::shared_ptr<Domain> > DomainMap;
     InterconnectMap interconnects;
     DomainMap domains;
     qpid::sys::Mutex lock;
+    BrokerContext* context;
 };
 }}} // namespace qpid::broker::amqp
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp Wed Jun 26 18:32:47 2013
@@ -30,6 +30,7 @@
 #include "qpid/broker/amqp/Interconnects.h"
 #include "qpid/broker/amqp/Message.h"
 #include "qpid/broker/amqp/Sasl.h"
+#include "qpid/broker/amqp/Topic.h"
 #include "qpid/broker/amqp/Translation.h"
 #include "qpid/broker/amqp_0_10/MessageTransfer.h"
 #include "qpid/framing/Buffer.h"
@@ -50,20 +51,20 @@ struct Options : public qpid::Options {
     }
 };
 
-class ProtocolImpl : public Protocol
+class ProtocolImpl : public BrokerContext, public Protocol
 {
   public:
-    ProtocolImpl(Interconnects* i, Broker& b, const std::string& d) : interconnects(i), broker(b), domain(d)
+    ProtocolImpl(Interconnects* interconnects, TopicRegistry* topics, Broker& broker, const std::string& domain)
+        : BrokerContext(broker, *interconnects, *topics, domain)
     {
+        interconnects->setContext(*this);
         broker.getObjectFactoryRegistry().add(interconnects);//registry deletes on shutdown
+        broker.getObjectFactoryRegistry().add(topics);//registry deletes on shutdown
     }
     qpid::sys::ConnectionCodec* create(const qpid::framing::ProtocolVersion&, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&);
     boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> translate(const qpid::broker::Message&);
     boost::shared_ptr<RecoverableMessage> recover(qpid::framing::Buffer&);
   private:
-    Interconnects* interconnects;
-    Broker& broker;
-    std::string domain;
 };
 
 struct ProtocolPlugin : public Plugin
@@ -76,7 +77,7 @@ struct ProtocolPlugin : public Plugin
         //need to register protocol before recovery from store
         broker::Broker* broker = dynamic_cast<qpid::broker::Broker*>(&target);
         if (broker) {
-            ProtocolImpl* impl = new ProtocolImpl(new Interconnects(), *broker, options.domain);
+            ProtocolImpl* impl = new ProtocolImpl(new Interconnects(), new TopicRegistry(), *broker, options.domain);
             broker->getProtocolRegistry().add("AMQP 1.0", impl);//registry deletes on shutdown
         }
     }
@@ -90,22 +91,21 @@ qpid::sys::ConnectionCodec* ProtocolImpl
 {
     if (v == qpid::framing::ProtocolVersion(1, 0)) {
         if (v.getProtocol() == qpid::framing::ProtocolVersion::SASL) {
-            if (broker.getOptions().auth) {
+            if (getBroker().getOptions().auth) {
                 QPID_LOG(info, "Using AMQP 1.0 (with SASL layer)");
-                return new qpid::broker::amqp::Sasl(out, id, broker, *interconnects,
-                                                    qpid::SaslFactory::getInstance().createServer(broker.getOptions().realm,broker.getOptions().requireEncrypted, external),
-                                                    domain);
+                return new qpid::broker::amqp::Sasl(out, id, *this,
+                                                    qpid::SaslFactory::getInstance().createServer(getBroker().getOptions().realm,getBroker().getOptions().requireEncrypted, external));
             } else {
-                std::auto_ptr<SaslServer> authenticator(new qpid::NullSaslServer(broker.getOptions().realm));
+                std::auto_ptr<SaslServer> authenticator(new qpid::NullSaslServer(getBroker().getOptions().realm));
                 QPID_LOG(info, "Using AMQP 1.0 (with dummy SASL layer)");
-                return new qpid::broker::amqp::Sasl(out, id, broker, *interconnects, authenticator, domain);
+                return new qpid::broker::amqp::Sasl(out, id, *this, authenticator);
             }
         } else {
-            if (broker.getOptions().auth) {
+            if (getBroker().getOptions().auth) {
                 throw qpid::Exception("SASL layer required!");
             } else {
                 QPID_LOG(info, "Using AMQP 1.0 (no SASL layer)");
-                return new qpid::broker::amqp::Connection(out, id, broker, *interconnects, false, domain);
+                return new qpid::broker::amqp::Connection(out, id, *this, false);
             }
         }
     }
@@ -114,7 +114,7 @@ qpid::sys::ConnectionCodec* ProtocolImpl
 
 boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> ProtocolImpl::translate(const qpid::broker::Message& m)
 {
-    qpid::broker::amqp::Translation t(m, &broker);
+    qpid::broker::amqp::Translation t(m, &getBroker());
     return t.getTransfer();
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp Wed Jun 26 18:32:47 2013
@@ -31,8 +31,8 @@ namespace qpid {
 namespace broker {
 namespace amqp {
 
-Sasl::Sasl(qpid::sys::OutputControl& o, const std::string& id, qpid::broker::Broker& broker, Interconnects& i, std::auto_ptr<qpid::SaslServer> auth, const std::string& domain)
-    : qpid::amqp::SaslServer(id), out(o), connection(out, id, broker, i, true, domain),
+Sasl::Sasl(qpid::sys::OutputControl& o, const std::string& id, BrokerContext& context, std::auto_ptr<qpid::SaslServer> auth)
+    : qpid::amqp::SaslServer(id), out(o), connection(out, id, context, true),
       authenticator(auth),
       state(INCOMPLETE), writeHeader(true), haveOutput(true)
 {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Sasl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Sasl.h?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Sasl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Sasl.h Wed Jun 26 18:32:47 2013
@@ -39,7 +39,7 @@ namespace amqp {
 class Sasl : public sys::ConnectionCodec, qpid::amqp::SaslServer
 {
   public:
-    Sasl(qpid::sys::OutputControl& out, const std::string& id, qpid::broker::Broker& broker, Interconnects&, std::auto_ptr<qpid::SaslServer> authenticator, const std::string& domain);
+    Sasl(qpid::sys::OutputControl& out, const std::string& id, BrokerContext& context, std::auto_ptr<qpid::SaslServer> authenticator);
     ~Sasl();
 
     size_t decode(const char* buffer, size_t size);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp Wed Jun 26 18:32:47 2013
@@ -26,6 +26,7 @@
 #include "Domain.h"
 #include "Interconnects.h"
 #include "Relay.h"
+#include "Topic.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/DeliverableMessage.h"
 #include "qpid/broker/Exchange.h"
@@ -128,24 +129,27 @@ class IncomingToExchange : public Decodi
     Authorise& authorise;
 };
 
-Session::Session(pn_session_t* s, qpid::broker::Broker& b, Connection& c, qpid::sys::OutputControl& o)
-    : ManagedSession(b, c, (boost::format("%1%") % s).str()), session(s), broker(b), connection(c), out(o), deleted(false), authorise(connection.getUserId(), broker.getAcl()) {}
+Session::Session(pn_session_t* s, Connection& c, qpid::sys::OutputControl& o)
+    : ManagedSession(c.getBroker(), c, (boost::format("%1%") % s).str()), session(s), connection(c), out(o), deleted(false),
+      authorise(connection.getUserId(), connection.getBroker().getAcl()) {}
 
 
 Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* terminus, bool incoming)
 {
     ResolvedNode node;
-    node.exchange = broker.getExchanges().find(name);
-    node.queue = broker.getQueues().find(name);
+    node.exchange = connection.getBroker().getExchanges().find(name);
+    node.queue = connection.getBroker().getQueues().find(name);
+    node.topic = connection.getTopics().get(name);
+    if (node.topic) node.exchange = node.topic->getExchange();
     if (!node.queue && !node.exchange) {
         if (pn_terminus_is_dynamic(terminus)  || is_capability_requested(CREATE_ON_DEMAND, pn_terminus_capabilities(terminus))) {
             //is it a queue or an exchange?
             node.properties.read(pn_terminus_properties(terminus));
             if (node.properties.isQueue()) {
-                node.queue = broker.createQueue(name, node.properties.getQueueSettings(), this, node.properties.getAlternateExchange(), connection.getUserId(), connection.getId()).first;
+                node.queue = connection.getBroker().createQueue(name, node.properties.getQueueSettings(), this, node.properties.getAlternateExchange(), connection.getUserId(), connection.getId()).first;
             } else {
                 qpid::framing::FieldTable args;
-                node.exchange = broker.createExchange(name, node.properties.getExchangeType(), node.properties.isDurable(), node.properties.getAlternateExchange(),
+                node.exchange = connection.getBroker().createExchange(name, node.properties.getExchangeType(), node.properties.isDurable(), node.properties.getAlternateExchange(),
                                                       args, connection.getUserId(), connection.getId()).first;
             }
         } else {
@@ -159,13 +163,16 @@ Session::ResolvedNode Session::resolve(c
                 if (d) {
                     node.relay = boost::shared_ptr<Relay>(new Relay(1000));
                     if (incoming) {
-                        d->connect(false, id, name, local, connection.getInterconnects(), node.relay);
+                        d->connect(false, id, name, local, connection, node.relay);
                     } else {
-                        d->connect(true, id, local, name, connection.getInterconnects(), node.relay);
+                        d->connect(true, id, local, name, connection, node.relay);
                     }
                 }
             }
         }
+    } else if (node.queue && node.topic) {
+        QPID_LOG_CAT(warning, protocol, "Ambiguous node name; " << name << " could be queue or topic, assuming topic");
+        node.queue.reset();
     } else if (node.queue && node.exchange) {
         QPID_LOG_CAT(warning, protocol, "Ambiguous node name; " << name << " could be queue or exchange, assuming queue");
         node.exchange.reset();
@@ -255,19 +262,20 @@ void Session::setupIncoming(pn_link_t* l
         source = sourceAddress;
     }
     if (node.queue) {
-        boost::shared_ptr<Incoming> q(new IncomingToQueue(broker, *this, node.queue, link, source));
+        boost::shared_ptr<Incoming> q(new IncomingToQueue(connection.getBroker(), *this, node.queue, link, source));
         incoming[link] = q;
     } else if (node.exchange) {
-        boost::shared_ptr<Incoming> e(new IncomingToExchange(broker, *this, node.exchange, link, source));
+        boost::shared_ptr<Incoming> e(new IncomingToExchange(connection.getBroker(), *this, node.exchange, link, source));
         incoming[link] = e;
     } else if (node.relay) {
-        boost::shared_ptr<Incoming> in(new IncomingToRelay(link, broker, *this, source, name, pn_link_name(link), node.relay));
+        boost::shared_ptr<Incoming> in(new IncomingToRelay(link, connection.getBroker(), *this, source, name, pn_link_name(link), node.relay));
         incoming[link] = in;
     } else {
         pn_terminus_set_type(pn_link_target(link), PN_UNSPECIFIED);
         throw qpid::Exception("Node not found: " + name);/*not-found*/
     }
-    if (broker.getOptions().auth && !connection.isLink()) incoming[link]->verify(connection.getUserId(), broker.getOptions().realm);
+    if (connection.getBroker().getOptions().auth && !connection.isLink())
+        incoming[link]->verify(connection.getUserId(), connection.getBroker().getOptions().realm);
     QPID_LOG(debug, "Incoming link attached");
 }
 
@@ -291,7 +299,7 @@ void Session::setupOutgoing(pn_link_t* l
 
     if (node.queue) {
         authorise.outgoing(node.queue);
-        boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, target, node.queue, link, *this, out, false));
+        boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(connection.getBroker(), name, target, node.queue, link, *this, out, false));
         q->init();
         filter.apply(q);
         outgoing[link] = q;
@@ -300,6 +308,11 @@ void Session::setupOutgoing(pn_link_t* l
         bool shared = is_capability_requested(SHARED, pn_terminus_capabilities(source));
         bool durable = pn_terminus_get_durability(source);
         QueueSettings settings(durable, !durable);
+        if (node.topic) {
+            settings = node.topic->getPolicy();
+            settings.durable = durable;
+            settings.autodelete = !durable;
+        }
         filter.configure(settings);
         //TODO: populate settings from source details when available from engine
         std::stringstream queueName;
@@ -313,15 +326,15 @@ void Session::setupOutgoing(pn_link_t* l
             queueName << connection.getContainerId() << "_" << pn_link_name(link);
         }
         boost::shared_ptr<qpid::broker::Queue> queue
-            = broker.createQueue(queueName.str(), settings, this, "", connection.getUserId(), connection.getId()).first;
+            = connection.getBroker().createQueue(queueName.str(), settings, this, "", connection.getUserId(), connection.getId()).first;
         if (!shared) queue->setExclusiveOwner(this);
         authorise.outgoing(node.exchange, queue, filter);
         filter.bind(node.exchange, queue);
-        boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, target, queue, link, *this, out, !shared));
+        boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(connection.getBroker(), name, target, queue, link, *this, out, !shared));
         outgoing[link] = q;
         q->init();
     } else if (node.relay) {
-        boost::shared_ptr<Outgoing> out(new OutgoingFromRelay(link, broker, *this, name, target, pn_link_name(link), node.relay));
+        boost::shared_ptr<Outgoing> out(new OutgoingFromRelay(link, connection.getBroker(), *this, name, target, pn_link_name(link), node.relay));
         outgoing[link] = out;
         out->init();
     } else {
@@ -344,11 +357,11 @@ void Session::attach(pn_link_t* link, co
 
     if (relay) {
         if (pn_link_is_sender(link)) {
-            boost::shared_ptr<Outgoing> out(new OutgoingFromRelay(link, broker, *this, src, tgt, pn_link_name(link), relay));
+            boost::shared_ptr<Outgoing> out(new OutgoingFromRelay(link, connection.getBroker(), *this, src, tgt, pn_link_name(link), relay));
             outgoing[link] = out;
             out->init();
         } else {
-            boost::shared_ptr<Incoming> in(new IncomingToRelay(link, broker, *this, src, tgt, pn_link_name(link), relay));
+            boost::shared_ptr<Incoming> in(new IncomingToRelay(link, connection.getBroker(), *this, src, tgt, pn_link_name(link), relay));
             incoming[link] = in;
         }
     } else {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.h?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.h Wed Jun 26 18:32:47 2013
@@ -50,13 +50,14 @@ class Connection;
 class Incoming;
 class Outgoing;
 class Relay;
+class Topic;
 /**
  *
  */
 class Session : public ManagedSession, public boost::enable_shared_from_this<Session>
 {
   public:
-    Session(pn_session_t*, qpid::broker::Broker&, Connection&, qpid::sys::OutputControl&);
+    Session(pn_session_t*, Connection&, qpid::sys::OutputControl&);
     /**
      * called for links initiated by the peer
      */
@@ -82,7 +83,6 @@ class Session : public ManagedSession, p
     typedef std::map<pn_link_t*, boost::shared_ptr<Outgoing> > OutgoingLinks;
     typedef std::map<pn_link_t*, boost::shared_ptr<Incoming> > IncomingLinks;
     pn_session_t* session;
-    qpid::broker::Broker& broker;
     Connection& connection;
     qpid::sys::OutputControl& out;
     IncomingLinks incoming;
@@ -97,6 +97,7 @@ class Session : public ManagedSession, p
     {
         boost::shared_ptr<qpid::broker::Exchange> exchange;
         boost::shared_ptr<qpid::broker::Queue> queue;
+        boost::shared_ptr<qpid::broker::amqp::Topic> topic;
         boost::shared_ptr<Relay> relay;
         NodeProperties properties;
     };

Added: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Topic.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Topic.cpp?rev=1497036&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Topic.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Topic.cpp Wed Jun 26 18:32:47 2013
@@ -0,0 +1,174 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/amqp/Topic.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/management/ManagementAgent.h"
+
+namespace _qmf = qmf::org::apache::qpid::broker;
+
+namespace qpid {
+namespace broker {
+namespace amqp {
+namespace {
+const std::string TOPIC("topic");
+const std::string EXCHANGE("exchange");
+const std::string DURABLE("durable");
+const std::string EMPTY;
+
+std::string getProperty(const std::string& k, const qpid::types::Variant::Map& m)
+{
+    qpid::types::Variant::Map::const_iterator i = m.find(k);
+    if (i == m.end()) return EMPTY;
+    else return i->second;
+}
+
+bool testProperty(const std::string& k, const qpid::types::Variant::Map& m)
+{
+    qpid::types::Variant::Map::const_iterator i = m.find(k);
+    if (i == m.end()) return false;
+    else return i->second;
+}
+
+}
+
+Topic::Topic(Broker& broker, const std::string& n, const qpid::types::Variant::Map& properties)
+    : PersistableObject(n, TOPIC, properties), name(n), durable(testProperty(DURABLE, properties)), exchange(broker.getExchanges().get(getProperty(EXCHANGE, properties)))
+{
+    if (exchange->getName().empty()) throw qpid::Exception("Exchange must be specified.");
+
+    qpid::types::Variant::Map unused;
+    policy.populate(properties, unused);
+
+    qpid::management::ManagementAgent* agent = broker.getManagementAgent();
+    if (agent != 0) {
+        topic = _qmf::Topic::shared_ptr(new _qmf::Topic(agent, this, name, exchange->GetManagementObject()->getObjectId(), durable));
+        topic->set_properties(policy.asMap());
+        agent->addObject(topic);
+    }
+}
+
+bool Topic::isDurable() const
+{
+    return durable;
+}
+
+Topic::~Topic()
+{
+    if (topic != 0) topic->resourceDestroy();
+}
+
+boost::shared_ptr<qpid::management::ManagementObject> Topic::GetManagementObject() const
+{
+    return topic;
+}
+
+const QueueSettings& Topic::getPolicy() const
+{
+    return policy;
+}
+boost::shared_ptr<Exchange> Topic::getExchange()
+{
+    return exchange;
+}
+const std::string& Topic::getName() const
+{
+    return name;
+}
+
+boost::shared_ptr<Topic> TopicRegistry::createTopic(Broker& broker, const std::string& name, const qpid::types::Variant::Map& properties)
+{
+    boost::shared_ptr<Topic> topic(new Topic(broker, name, properties));
+    add(topic);
+    return topic;
+}
+
+bool TopicRegistry::createObject(Broker& broker, const std::string& type, const std::string& name, const qpid::types::Variant::Map& props,
+                                 const std::string& /*userId*/, const std::string& /*connectionId*/)
+{
+    if (type == TOPIC) {
+        boost::shared_ptr<Topic> topic = createTopic(broker, name, props);
+        if (topic->isDurable()) broker.getStore().create(*topic);
+        return true;
+    } else {
+        return false;
+    }
+}
+
+bool TopicRegistry::deleteObject(Broker& broker, const std::string& type, const std::string& name, const qpid::types::Variant::Map& /*properties*/,
+                                 const std::string& /*userId*/, const std::string& /*connectionId*/)
+{
+    if (type == TOPIC) {
+        boost::shared_ptr<Topic> topic = remove(name);
+        if (topic->isDurable()) broker.getStore().destroy(*topic);
+        return true;
+    } else {
+        return false;
+    }
+}
+
+bool TopicRegistry::recoverObject(Broker& broker, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties,
+                   uint64_t persistenceId)
+{
+    if (type == TOPIC) {
+        boost::shared_ptr<Topic> topic = createTopic(broker, name, properties);
+        topic->setPersistenceId(persistenceId);
+        return true;
+    } else {
+        return false;
+    }
+}
+
+bool TopicRegistry::add(boost::shared_ptr<Topic> topic)
+{
+    qpid::sys::Mutex::ScopedLock l(lock);
+    Topics::const_iterator i = topics.find(topic->getName());
+    if (i == topics.end()) {
+        topics.insert(Topics::value_type(topic->getName(), topic));
+        return true;
+    } else {
+        return false;
+    }
+
+}
+boost::shared_ptr<Topic> TopicRegistry::remove(const std::string& name)
+{
+    boost::shared_ptr<Topic> result;
+    qpid::sys::Mutex::ScopedLock l(lock);
+    Topics::iterator i = topics.find(name);
+    if (i != topics.end()) {
+        result = i->second;
+        topics.erase(i);
+    }
+    return result;
+}
+
+boost::shared_ptr<Topic> TopicRegistry::get(const std::string& name)
+{
+    qpid::sys::Mutex::ScopedLock l(lock);
+    Topics::const_iterator i = topics.find(name);
+    if (i == topics.end()) {
+        return boost::shared_ptr<Topic>();
+    } else {
+        return i->second;
+    }
+}
+
+}}} // namespace qpid::broker::amqp

Added: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Topic.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Topic.h?rev=1497036&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Topic.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Topic.h Wed Jun 26 18:32:47 2013
@@ -0,0 +1,88 @@
+#ifndef QPID_BROKER_AMQP_TOPIC_H
+#define QPID_BROKER_AMQP_TOPIC_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/ObjectFactory.h"
+#include "qpid/broker/PersistableObject.h"
+#include "qpid/broker/QueueSettings.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/types/Variant.h"
+#include "qpid/management/Manageable.h"
+#include "qmf/org/apache/qpid/broker/Exchange.h"
+#include "qmf/org/apache/qpid/broker/Topic.h"
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace broker {
+class Broker;
+class Exchange;
+class QueueDepth;
+
+namespace amqp {
+
+/**
+ * A topic is a node supporting a pub-sub style. It is at present
+ * implemented by an exchange with an additional policy for handling
+ * subscription queues.
+ */
+class Topic : public PersistableObject, public management::Manageable
+{
+  public:
+    Topic(Broker&, const std::string& name, const qpid::types::Variant::Map& properties);
+    ~Topic();
+    const std::string& getName() const;
+    const QueueSettings& getPolicy() const;
+    boost::shared_ptr<Exchange> getExchange();
+    bool isDurable() const;
+    boost::shared_ptr<qpid::management::ManagementObject> GetManagementObject() const;
+  private:
+    std::string name;
+    bool durable;
+    boost::shared_ptr<Exchange> exchange;
+    QueueSettings policy;
+    qmf::org::apache::qpid::broker::Topic::shared_ptr topic;
+};
+
+class TopicRegistry : public ObjectFactory
+{
+  public:
+    bool createObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties,
+                              const std::string& userId, const std::string& connectionId);
+    bool deleteObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties,
+                              const std::string& userId, const std::string& connectionId);
+    bool recoverObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties,
+                       uint64_t persistenceId);
+
+    bool add(boost::shared_ptr<Topic> topic);
+    boost::shared_ptr<Topic> remove(const std::string& name);
+    boost::shared_ptr<Topic> get(const std::string& name);
+  private:
+    typedef std::map<std::string, boost::shared_ptr<Topic> > Topics;
+    qpid::sys::Mutex lock;
+    Topics topics;
+
+    boost::shared_ptr<Topic> createTopic(Broker&, const std::string& name, const qpid::types::Variant::Map& properties);
+};
+
+}}} // namespace qpid::broker::amqp
+
+#endif  /*!QPID_BROKER_AMQP_TOPIC_H*/

Modified: qpid/trunk/qpid/specs/management-schema.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/specs/management-schema.xml?rev=1497036&r1=1497035&r2=1497036&view=diff
==============================================================================
--- qpid/trunk/qpid/specs/management-schema.xml (original)
+++ qpid/trunk/qpid/specs/management-schema.xml Wed Jun 26 18:32:47 2013
@@ -415,7 +415,7 @@
   </class>
   <!--
   ===============================================================
-  Domain
+  AMQP 1.0 Domain
   ===============================================================
   -->
   <class name="Domain">
@@ -426,6 +426,18 @@
     <property name="username"       type="sstr"   access="RO"/>
     <property name="password"       type="sstr"   access="RO"/>
   </class>
+  <!--
+  ===============================================================
+  AMQP 1.0 Topic
+  ===============================================================
+  -->
+  <class name="Topic">
+    <property name="name"           type="sstr"     access="RC" index="y"/>
+    <property name="exchangeRef"    type="objId"    references="Exchange" access="RC"/>
+    <property name="durable"        type="bool"     access="RC"/>
+    <property name="properties"     type="map"      access="RO"/>
+  </class>
+
 
   <!--
   ===============================================================



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org