You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2013/06/25 15:28:16 UTC

svn commit: r1496466 [1/2] - in /qpid/trunk/qpid/cpp/src: ./ qpid/amqp/ qpid/broker/ qpid/broker/amqp/ qpid/broker/amqp_0_10/ qpid/ha/ qpid/management/ qpid/messaging/amqp/

Author: gsim
Date: Tue Jun 25 13:28:15 2013
New Revision: 1496466

URL: http://svn.apache.org/r1496466
Log:
QPID-4712: authorisation for AMQP 1.0 connections

Added:
    qpid/trunk/qpid/cpp/src/qpid/amqp/MapBuilder.cpp
    qpid/trunk/qpid/cpp/src/qpid/amqp/MapBuilder.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Authorise.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Authorise.h
      - copied, changed from r1496401, qpid/trunk/qpid/cpp/src/qpid/broker/amqp/DataReader.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Exception.cpp
      - copied, changed from r1496401, qpid/trunk/qpid/cpp/src/qpid/broker/amqp/DataReader.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Exception.h
      - copied, changed from r1496401, qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionIdentity.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.cpp
      - copied, changed from r1496401, qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h
      - copied, changed from r1496401, qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
Removed:
    qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionIdentity.h
Modified:
    qpid/trunk/qpid/cpp/src/CMakeLists.txt
    qpid/trunk/qpid/cpp/src/Makefile.am
    qpid/trunk/qpid/cpp/src/amqp.cmake
    qpid/trunk/qpid/cpp/src/qpid/amqp/descriptors.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h
    qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Link.h
    qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SaslAuthenticator.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/DataReader.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/DataReader.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.h
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp

Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Tue Jun 25 13:28:15 2013
@@ -1034,6 +1034,8 @@ set (qpidcommon_SOURCES
      qpid/amqp/MapEncoder.cpp
      qpid/amqp/MapSizeCalculator.h
      qpid/amqp/MapSizeCalculator.cpp
+     qpid/amqp/MapBuilder.h
+     qpid/amqp/MapBuilder.cpp
      qpid/amqp/MapReader.h
      qpid/amqp/MapReader.cpp
      qpid/amqp/MessageEncoder.h
@@ -1261,7 +1263,7 @@ set (qpidbroker_SOURCES
      qpid/broker/MessageGroupManager.cpp
      qpid/broker/PersistableMessage.cpp
      qpid/broker/Bridge.cpp
-     qpid/broker/Connection.cpp
+     qpid/broker/amqp_0_10/Connection.cpp
      qpid/broker/ConnectionHandler.cpp
      qpid/broker/DeliverableMessage.cpp
      qpid/broker/DeliveryRecord.cpp

Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Tue Jun 25 13:28:15 2013
@@ -536,6 +536,8 @@ libqpidcommon_la_SOURCES +=			\
   qpid/amqp/MapEncoder.cpp                      \
   qpid/amqp/MapSizeCalculator.h                 \
   qpid/amqp/MapSizeCalculator.cpp               \
+  qpid/amqp/MapBuilder.h                       	\
+  qpid/amqp/MapBuilder.cpp                      \
   qpid/amqp/MapReader.h                       	\
   qpid/amqp/MapReader.cpp                       \
   qpid/amqp/MessageEncoder.h                    \
@@ -598,15 +600,15 @@ libqpidbroker_la_SOURCES = \
   qpid/broker/Broker.cpp \
   qpid/broker/Broker.h \
   qpid/broker/BrokerImportExport.h \
-  qpid/broker/Connection.cpp \
-  qpid/broker/Connection.h \
+  qpid/broker/amqp_0_10/Connection.cpp \
+  qpid/broker/amqp_0_10/Connection.h \
   qpid/broker/ConnectionHandler.cpp \
   qpid/broker/ConnectionHandler.h \
   qpid/broker/Consumer.h \
   qpid/broker/Credit.h \
   qpid/broker/Credit.cpp \
   qpid/broker/ConsumerFactory.h \
-  qpid/broker/ConnectionIdentity.h \
+  qpid/broker/Connection.h \
   qpid/broker/ConnectionObserver.h \
   qpid/broker/ConnectionObservers.h \
   qpid/broker/ConfigurationObserver.h \
@@ -801,12 +803,16 @@ if HAVE_PROTON
 dmoduleexec_LTLIBRARIES += amqp.la
 amqp_la_LIBADD = libqpidcommon.la
 amqp_la_SOURCES = \
+  qpid/broker/amqp/Authorise.h \
+  qpid/broker/amqp/Authorise.cpp \
   qpid/broker/amqp/Connection.h \
   qpid/broker/amqp/Connection.cpp \
   qpid/broker/amqp/DataReader.h \
   qpid/broker/amqp/DataReader.cpp \
   qpid/broker/amqp/Domain.h \
   qpid/broker/amqp/Domain.cpp \
+  qpid/broker/amqp/Exception.h \
+  qpid/broker/amqp/Exception.cpp \
   qpid/broker/amqp/Filter.h \
   qpid/broker/amqp/Filter.cpp \
   qpid/broker/amqp/Header.h \

Modified: qpid/trunk/qpid/cpp/src/amqp.cmake
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/amqp.cmake?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/amqp.cmake (original)
+++ qpid/trunk/qpid/cpp/src/amqp.cmake Tue Jun 25 13:28:15 2013
@@ -53,12 +53,16 @@ if (BUILD_AMQP)
 
 
     set (amqp_SOURCES
+         qpid/broker/amqp/Authorise.h
+         qpid/broker/amqp/Authorise.cpp
          qpid/broker/amqp/Connection.h
          qpid/broker/amqp/Connection.cpp
          qpid/broker/amqp/DataReader.h
          qpid/broker/amqp/DataReader.cpp
          qpid/broker/amqp/Domain.h
          qpid/broker/amqp/Domain.cpp
+         qpid/broker/amqp/Exception.h
+         qpid/broker/amqp/Exception.cpp
          qpid/broker/amqp/Filter.h
          qpid/broker/amqp/Filter.cpp
          qpid/broker/amqp/Header.h

Added: qpid/trunk/qpid/cpp/src/qpid/amqp/MapBuilder.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp/MapBuilder.cpp?rev=1496466&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp/MapBuilder.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp/MapBuilder.cpp Tue Jun 25 13:28:15 2013
@@ -0,0 +1,130 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "MapBuilder.h"
+#include <assert.h>
+
+namespace qpid {
+namespace amqp {
+namespace {
+const std::string BINARY("binary");
+const std::string UTF8("utf8");
+const std::string ASCII("ascii");
+}
+
+qpid::types::Variant::Map MapBuilder::getMap()
+{
+    return map;
+}
+const qpid::types::Variant::Map MapBuilder::getMap() const
+{
+    return map;
+}
+
+void MapBuilder::onNullValue(const CharSequence& key, const Descriptor*)
+{
+    map[std::string(key.data, key.size)] = qpid::types::Variant();
+}
+void MapBuilder::onBooleanValue(const CharSequence& key, bool value, const Descriptor*)
+{
+    map[std::string(key.data, key.size)] = value;
+}
+void MapBuilder::onUByteValue(const CharSequence& key, uint8_t value, const Descriptor*)
+{
+    map[std::string(key.data, key.size)] = value;
+}
+
+void MapBuilder::onUShortValue(const CharSequence& key, uint16_t value, const Descriptor*)
+{
+    map[std::string(key.data, key.size)] = value;
+}
+
+void MapBuilder::onUIntValue(const CharSequence& key, uint32_t value, const Descriptor*)
+{
+    map[std::string(key.data, key.size)] = value;
+}
+
+void MapBuilder::onULongValue(const CharSequence& key, uint64_t value, const Descriptor*)
+{
+    map[std::string(key.data, key.size)] = value;
+}
+
+void MapBuilder::onByteValue(const CharSequence& key, int8_t value, const Descriptor*)
+{
+    map[std::string(key.data, key.size)] = value;
+}
+
+void MapBuilder::onShortValue(const CharSequence& key, int16_t value, const Descriptor*)
+{
+    map[std::string(key.data, key.size)] = value;
+}
+
+void MapBuilder::onIntValue(const CharSequence& key, int32_t value, const Descriptor*)
+{
+    map[std::string(key.data, key.size)] = value;
+}
+
+void MapBuilder::onLongValue(const CharSequence& key, int64_t value, const Descriptor*)
+{
+    map[std::string(key.data, key.size)] = value;
+}
+
+void MapBuilder::onFloatValue(const CharSequence& key, float value, const Descriptor*)
+{
+    map[std::string(key.data, key.size)] = value;
+}
+
+void MapBuilder::onDoubleValue(const CharSequence& key, double value, const Descriptor*)
+{
+    map[std::string(key.data, key.size)] = value;
+}
+
+void MapBuilder::onUuidValue(const CharSequence& key, const CharSequence& value, const Descriptor*)
+{
+    assert(value.size == 16);
+    map[std::string(key.data, key.size)] = qpid::types::Uuid(value.data);
+}
+
+void MapBuilder::onTimestampValue(const CharSequence& key, int64_t value, const Descriptor*)
+{
+    map[std::string(key.data, key.size)] = value;
+}
+
+void MapBuilder::onBinaryValue(const CharSequence& key, const CharSequence& value, const Descriptor*)
+{
+    qpid::types::Variant& v = map[std::string(key.data, key.size)];
+    v = std::string(value.data, value.size);
+    v.setEncoding(BINARY);
+}
+
+void MapBuilder::onStringValue(const CharSequence& key, const CharSequence& value, const Descriptor*)
+{
+    qpid::types::Variant& v = map[std::string(key.data, key.size)];
+    v = std::string(value.data, value.size);
+    v.setEncoding(UTF8);
+}
+
+void MapBuilder::onSymbolValue(const CharSequence& key, const CharSequence& value, const Descriptor*)
+{
+    qpid::types::Variant& v = map[std::string(key.data, key.size)];
+    v = std::string(value.data, value.size);
+    v.setEncoding(ASCII);
+}
+}} // namespace qpid::amqp

Added: qpid/trunk/qpid/cpp/src/qpid/amqp/MapBuilder.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp/MapBuilder.h?rev=1496466&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp/MapBuilder.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp/MapBuilder.h Tue Jun 25 13:28:15 2013
@@ -0,0 +1,63 @@
+#ifndef QPID_AMQP_MAPBUILDER_H
+#define QPID_AMQP_MAPBUILDER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "MapReader.h"
+#include "qpid/types/Variant.h"
+
+namespace qpid {
+namespace amqp {
+
+/**
+ * Utility to build a Variant::Map from a data stream (doesn't handle
+ * nested maps or lists yet)
+ */
+class MapBuilder : public MapReader
+{
+  public:
+    void onNullValue(const CharSequence& /*key*/, const Descriptor*);
+    void onBooleanValue(const CharSequence& /*key*/, bool, const Descriptor*);
+    void onUByteValue(const CharSequence& /*key*/, uint8_t, const Descriptor*);
+    void onUShortValue(const CharSequence& /*key*/, uint16_t, const Descriptor*);
+    void onUIntValue(const CharSequence& /*key*/, uint32_t, const Descriptor*);
+    void onULongValue(const CharSequence& /*key*/, uint64_t, const Descriptor*);
+    void onByteValue(const CharSequence& /*key*/, int8_t, const Descriptor*);
+    void onShortValue(const CharSequence& /*key*/, int16_t, const Descriptor*);
+    void onIntValue(const CharSequence& /*key*/, int32_t, const Descriptor*);
+    void onLongValue(const CharSequence& /*key*/, int64_t, const Descriptor*);
+    void onFloatValue(const CharSequence& /*key*/, float, const Descriptor*);
+    void onDoubleValue(const CharSequence& /*key*/, double, const Descriptor*);
+    void onUuidValue(const CharSequence& /*key*/, const CharSequence&, const Descriptor*);
+    void onTimestampValue(const CharSequence& /*key*/, int64_t, const Descriptor*);
+
+    void onBinaryValue(const CharSequence& /*key*/, const CharSequence&, const Descriptor*);
+    void onStringValue(const CharSequence& /*key*/, const CharSequence&, const Descriptor*);
+    void onSymbolValue(const CharSequence& /*key*/, const CharSequence&, const Descriptor*);
+
+    qpid::types::Variant::Map getMap();
+    const qpid::types::Variant::Map getMap() const;
+  private:
+    qpid::types::Variant::Map map;
+};
+}} // namespace qpid::amqp
+
+#endif  /*!QPID_AMQP_MAPBUILDER_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/amqp/descriptors.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp/descriptors.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp/descriptors.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp/descriptors.h Tue Jun 25 13:28:15 2013
@@ -89,6 +89,15 @@ const uint64_t SELECTOR_FILTER_CODE(0x00
 const uint64_t XQUERY_FILTER_CODE(0x0000468C00000005ULL);
 }
 
+namespace error_conditions {
+//note these are not actually descriptors
+const std::string INTERNAL_ERROR("amqp:internal-error");
+const std::string NOT_FOUND("amqp:not-found");
+const std::string UNAUTHORIZED_ACCESS("amqp:unauthorized-access");
+const std::string DECODE_ERROR("amqp:decode-error");
+const std::string NOT_ALLOWED("amqp:not-allowed");
+const std::string RESOURCE_LIMIT_EXCEEDED("amqp:resource-limit-exceeded");
+}
 }} // namespace qpid::amqp
 
 #endif  /*!QPID_AMQP_DESCRIPTORS_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp Tue Jun 25 13:28:15 2013
@@ -22,7 +22,7 @@
 
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/FedOps.h"
-#include "qpid/broker/Connection.h"
+#include "qpid/broker/amqp_0_10/Connection.h"
 #include "qpid/broker/Link.h"
 #include "qpid/broker/LinkRegistry.h"
 #include "qpid/broker/SessionState.h"
@@ -100,7 +100,7 @@ Bridge::~Bridge()
     mgmtObject->resourceDestroy();
 }
 
-void Bridge::create(Connection& c)
+void Bridge::create(amqp_0_10::Connection& c)
 {
     detached = false;           // Reset detached in case we are recovering.
     conn = &c;
@@ -200,7 +200,7 @@ void Bridge::create(Connection& c)
     if (args.i_srcIsLocal) sessionHandler.getSession()->enableReceiverTracking();
 }
 
-void Bridge::cancel(Connection&)
+void Bridge::cancel(amqp_0_10::Connection&)
 {
     if (resetProxy()) {
         peer->getMessage().cancel(args.i_dest);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h Tue Jun 25 13:28:15 2013
@@ -39,8 +39,9 @@
 
 namespace qpid {
 namespace broker {
-
+namespace amqp_0_10 {
 class Connection;
+}
 class Link;
 class LinkRegistry;
 
@@ -115,9 +116,9 @@ class Bridge : public PersistableConfig,
     void setErrorListener(boost::shared_ptr<ErrorListener> e) { errorListener = e; }
   private:
     struct PushHandler : framing::FrameHandler {
-        PushHandler(Connection* c) { conn = c; }
+        PushHandler(amqp_0_10::Connection* c) { conn = c; }
         void handle(framing::AMQFrame& frame);
-        Connection* conn;
+        amqp_0_10::Connection* conn;
     };
 
     std::auto_ptr<PushHandler>                        pushHandler;
@@ -134,14 +135,14 @@ class Bridge : public PersistableConfig,
     std::string queueName;
     std::string altEx;
     mutable uint64_t  persistenceId;
-    Connection* conn;
+    amqp_0_10::Connection* conn;
     InitializeCallback initialize;
     bool detached;              // Set when session is detached.
     bool resetProxy();
 
     // connection Management (called by owning Link)
-    void create(Connection& c);
-    void cancel(Connection& c);
+    void create(amqp_0_10::Connection& c);
+    void cancel(amqp_0_10::Connection& c);
     void closed();
     friend class Link; // to call create, cancel, closed()
     boost::shared_ptr<ErrorListener> errorListener;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Tue Jun 25 13:28:15 2013
@@ -22,7 +22,7 @@
 #include "qpid/broker/Broker.h"
 
 #include "qpid/broker/AclModule.h"
-#include "qpid/broker/ConnectionIdentity.h"
+#include "qpid/broker/Connection.h"
 #include "qpid/broker/DirectExchange.h"
 #include "qpid/broker/FanOutExchange.h"
 #include "qpid/broker/HeadersExchange.h"
@@ -707,13 +707,13 @@ struct InvalidParameter : public qpid::E
 };
 
 void Broker::createObject(const std::string& type, const std::string& name,
-                          const Variant::Map& properties, bool /*strict*/, const ConnectionIdentity* context)
+                          const Variant::Map& properties, bool /*strict*/, const Connection* context)
 {
     std::string userId;
     std::string connectionId;
     if (context) {
         userId = context->getUserId();
-        connectionId = context->getUrl();
+        connectionId = context->getMgmtId();
     }
     //TODO: implement 'strict' option (check there are no unrecognised properties)
     QPID_LOG (debug, "Broker::create(" << type << ", " << name << "," << properties << ")");
@@ -898,13 +898,13 @@ void Broker::createObject(const std::str
 }
 
 void Broker::deleteObject(const std::string& type, const std::string& name,
-                          const Variant::Map& options, const ConnectionIdentity* context)
+                          const Variant::Map& options, const Connection* context)
 {
     std::string userId;
     std::string connectionId;
     if (context) {
         userId = context->getUserId();
-        connectionId = context->getUrl();
+        connectionId = context->getMgmtId();
     }
     QPID_LOG (debug, "Broker::delete(" << type << ", " << name << "," << options << ")");
     if (objectFactory.deleteObject(*this, type, name, options, userId, connectionId)) {
@@ -952,13 +952,13 @@ void Broker::checkDeleteQueue(Queue::sha
 Manageable::status_t Broker::queryObject(const std::string& type,
                                          const std::string& name,
                                          Variant::Map& results,
-                                         const ConnectionIdentity* context)
+                                         const Connection* context)
 {
     std::string userId;
     std::string connectionId;
     if (context) {
         userId = context->getUserId();
-        connectionId = context->getUrl();
+        connectionId = context->getMgmtId();
     }
     QPID_LOG (debug, "Broker::query(" << type << ", " << name << ")");
 
@@ -994,7 +994,7 @@ Manageable::status_t Broker::queryQueue(
 }
 
 Manageable::status_t Broker::getTimestampConfig(bool& receive,
-                                                const ConnectionIdentity* context)
+                                                const Connection* context)
 {
     std::string name;   // none needed for broker
     std::string userId = context->getUserId();
@@ -1006,7 +1006,7 @@ Manageable::status_t Broker::getTimestam
 }
 
 Manageable::status_t Broker::setTimestampConfig(const bool receive,
-                                                const ConnectionIdentity* context)
+                                                const Connection* context)
 {
     std::string name;   // none needed for broker
     std::string userId = context->getUserId();

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Tue Jun 25 13:28:15 2013
@@ -149,20 +149,20 @@ class Broker : public sys::Runnable, pub
     void setLogHiresTimestamp(bool enabled);
     bool getLogHiresTimestamp();
     void createObject(const std::string& type, const std::string& name,
-                      const qpid::types::Variant::Map& properties, bool strict, const ConnectionIdentity* context);
+                      const qpid::types::Variant::Map& properties, bool strict, const Connection* context);
     void deleteObject(const std::string& type, const std::string& name,
-                      const qpid::types::Variant::Map& options, const ConnectionIdentity* context);
+                      const qpid::types::Variant::Map& options, const Connection* context);
     void checkDeleteQueue(boost::shared_ptr<Queue> queue, bool ifUnused, bool ifEmpty);
     Manageable::status_t queryObject(const std::string& type, const std::string& name,
-                                     qpid::types::Variant::Map& results, const ConnectionIdentity* context);
+                                     qpid::types::Variant::Map& results, const Connection* context);
     Manageable::status_t queryQueue( const std::string& name,
                                      const std::string& userId,
                                      const std::string& connectionId,
                                      qpid::types::Variant::Map& results);
     Manageable::status_t getTimestampConfig(bool& receive,
-                                            const ConnectionIdentity* context);
+                                            const Connection* context);
     Manageable::status_t setTimestampConfig(const bool receive,
-                                            const ConnectionIdentity* context);
+                                            const Connection* context);
     Manageable::status_t queueRedirect(const std::string& srcQueue, const std::string& tgtQueue);
     void queueRedirectDestroy(boost::shared_ptr<Queue> srcQ, boost::shared_ptr<Queue> tgtQ, bool moveMsgs);
     boost::shared_ptr<sys::Poller> poller;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Tue Jun 25 13:28:15 2013
@@ -21,215 +21,35 @@
  * under the License.
  *
  */
-
-#include <memory>
-#include <sstream>
-#include <vector>
-#include <queue>
-
-#include "qpid/broker/BrokerImportExport.h"
-
-#include "qpid/broker/ConnectionHandler.h"
-#include "qpid/broker/ConnectionIdentity.h"
-#include "qpid/broker/OwnershipToken.h"
-#include "qpid/management/Manageable.h"
-#include "qpid/sys/AggregateOutput.h"
-#include "qpid/sys/ConnectionInputHandler.h"
-#include "qpid/sys/SecuritySettings.h"
-#include "qpid/sys/Mutex.h"
-#include "qpid/RefCounted.h"
-#include "qpid/Url.h"
-#include "qpid/ptr_map.h"
-
-#include "qmf/org/apache/qpid/broker/Connection.h"
-
-#include <boost/ptr_container/ptr_map.hpp>
-#include <boost/scoped_ptr.hpp>
-#include <boost/bind.hpp>
-
-#include <algorithm>
+#include <map>
+#include <string>
 
 namespace qpid {
-namespace sys {
-class ConnectionOutputHandler;
-class Timer;
-class TimerTask;
+namespace management {
+class ObjectId;
+}
+namespace types {
+class Variant;
 }
-namespace broker {
-
-class Broker;
-class LinkRegistry;
-class Queue;
-class SecureConnection;
-class SessionHandler;
-struct ConnectionTimeoutTask;
-
-class Connection : public sys::ConnectionInputHandler, public ConnectionIdentity,
-                   public OwnershipToken, public management::Manageable,
-                   public RefCounted
-{
-  public:
-    uint32_t getFrameMax() const { return framemax; }
-    uint16_t getHeartbeat() const { return heartbeat; }
-    uint16_t getHeartbeatMax() const { return heartbeatmax; }
-
-    void setFrameMax(uint32_t fm) { framemax = std::max(fm, (uint32_t) 4096); }
-    void setHeartbeat(uint16_t hb) { heartbeat = hb; }
-    void setHeartbeatMax(uint16_t hbm) { heartbeatmax = hbm; }
-
-    void setUrl(const std::string& _url) { url = _url; }
-
-    const OwnershipToken* getOwnership() const { return this; };
-    const management::ObjectId getObjectId() const { return GetManagementObject()->getObjectId(); };
-    const std::string& getUserId() const { return userId; }
-    const std::string& getUrl() const { return url; }
-
-    void setUserProxyAuth(const bool b);
-    bool isUserProxyAuth() const { return userProxyAuth || federationPeerTag.size() > 0; } // links can proxy msgs with non-matching auth ids
-    bool isFederationLink() const { return federationPeerTag.size() > 0; }
-    void setFederationPeerTag(const std::string& tag) { federationPeerTag = std::string(tag); }
-    const std::string& getFederationPeerTag() const { return federationPeerTag; }
-    std::vector<Url>& getKnownHosts() { return knownHosts; }
-
-    /**@return true if user is the authenticated user on this connection.
-     * If id has the default realm will also compare plain username.
-     */
-    bool isAuthenticatedUser(const std::string& id) const {
-        return (id == userId || (isDefaultRealm && id == userName));
-    }
-
-    Broker& getBroker() { return broker; }
-
-    sys::ConnectionOutputHandler& getOutput() { return *out; }
-    void activateOutput();
-    void addOutputTask(OutputTask*);
-    void removeOutputTask(OutputTask*);
-    framing::ProtocolVersion getVersion() const { return version; }
-
-    Connection(sys::ConnectionOutputHandler* out,
-               Broker& broker,
-               const std::string& mgmtId,
-               const qpid::sys::SecuritySettings&,
-               bool isLink = false,
-               uint64_t objectId = 0);
-
-    ~Connection ();
-
-    /** Get the SessionHandler for channel. Create if it does not already exist */
-    SessionHandler& getChannel(framing::ChannelId channel);
-
-    /** Close the connection. Waits for the client to respond with close-ok
-     * before actually destroying the connection.
-     */
-    QPID_BROKER_EXTERN void close(
-        framing::connection::CloseCode code, const std::string& text);
-
-    /** Abort the connection. Close abruptly and immediately. */
-    QPID_BROKER_EXTERN void abort();
-
-    // ConnectionInputHandler methods
-    void received(framing::AMQFrame& frame);
-    bool doOutput();
-    void closed();
-
-    void closeChannel(framing::ChannelId channel);
-
-    // Manageable entry points
-    management::ManagementObject::shared_ptr GetManagementObject(void) const;
-    management::Manageable::status_t
-        ManagementMethod (uint32_t methodId, management::Args& args, std::string&);
-
-    void requestIOProcessing (boost::function0<void>);
-    void recordFromServer (const framing::AMQFrame& frame);
-    void recordFromClient (const framing::AMQFrame& frame);
-
-    // gets for configured federation links
-    std::string getAuthMechanism();
-    std::string getAuthCredentials();
-    std::string getUsername();
-    std::string getPassword();
-    std::string getHost();
-    uint16_t    getPort();
-
-    void notifyConnectionForced(const std::string& text);
-    void setUserId(const std::string& uid);
-
-    // credentials for connected client
-    const std::string& getMgmtId() const { return mgmtId; }
-    management::ManagementAgent* getAgent() const { return agent; }
-
-    void setHeartbeatInterval(uint16_t heartbeat);
-    void sendHeartbeat();
-    void restartTimeout();
-
-    void setSecureConnection(SecureConnection* secured);
-
-    const qpid::sys::SecuritySettings& getExternalSecuritySettings() const
-    {
-        return securitySettings;
-    }
-
-    /** @return true if the initial connection negotiation is complete. */
-    bool isOpen();
-
-    bool isLink() { return link; }
-    void startLinkHeartbeatTimeoutTask();
-
-    void setClientProperties(const framing::FieldTable& cp) { clientProperties = cp; }
-    const framing::FieldTable& getClientProperties() const { return clientProperties; }
-
-  private:
-    // Management object is used in the constructor so must be early
-    qmf::org::apache::qpid::broker::Connection::shared_ptr mgmtObject;
-
-    //contained output tasks
-    sys::AggregateOutput outputTasks;
-
-    boost::scoped_ptr<framing::FrameHandler> outboundTracker;
-    boost::scoped_ptr<sys::ConnectionOutputHandler> out;
-
-    Broker& broker;
-
-    framing::ProtocolVersion version;
-    uint32_t framemax;
-    uint16_t heartbeat;
-    uint16_t heartbeatmax;
-    std::string userId;
-    std::string url;
-    bool userProxyAuth;
-    std::string federationPeerTag;
-    std::vector<Url> knownHosts;
-    std::string userName;
-    bool isDefaultRealm;
-
-    typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
-
-    ChannelMap channels;
-    qpid::sys::SecuritySettings securitySettings;
-    const bool link;
-    ConnectionHandler adapter;
-    bool mgmtClosing;
-    const std::string mgmtId;
-    sys::Mutex ioCallbackLock;
-    std::queue<boost::function0<void> > ioCallbacks;
-    LinkRegistry& links;
-    management::ManagementAgent* agent;
-    sys::Timer& timer;
-    boost::intrusive_ptr<sys::TimerTask> heartbeatTimer, linkHeartbeatTimer;
-    boost::intrusive_ptr<ConnectionTimeoutTask> timeoutTimer;
-    uint64_t objectId;
-    framing::FieldTable clientProperties;
-
-friend class OutboundFrameTracker;
 
-    void sent(const framing::AMQFrame& f);
-    void doIoCallbacks();
+namespace broker {
 
-  public:
+class OwnershipToken;
 
-    qmf::org::apache::qpid::broker::Connection::shared_ptr getMgmtObject() { return mgmtObject; }
+/**
+ * Protocol independent connection abstraction.
+ */
+class Connection {
+public:
+    virtual ~Connection() {}
+    virtual const OwnershipToken* getOwnership() const = 0;
+    virtual const management::ObjectId getObjectId() const = 0;
+    virtual const std::string& getUserId() const = 0;
+    virtual const std::string& getMgmtId() const = 0;
+    virtual const std::map<std::string, types::Variant>& getClientProperties() const = 0;
+    virtual bool isLink() const = 0;
+    virtual void abort() = 0;
 };
-
-}}
+}} // namespace qpid::broker
 
 #endif  /*!QPID_BROKER_CONNECTION_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp Tue Jun 25 13:28:15 2013
@@ -24,7 +24,7 @@
 
 #include "qpid/SaslFactory.h"
 #include "qpid/broker/Broker.h"
-#include "qpid/broker/Connection.h"
+#include "qpid/broker/amqp_0_10/Connection.h"
 #include "qpid/broker/SecureConnection.h"
 #include "qpid/Url.h"
 #include "qpid/framing/AllInvoker.h"
@@ -109,10 +109,10 @@ void ConnectionHandler::setSecureConnect
     handler->secured = secured;
 }
 
-ConnectionHandler::ConnectionHandler(Connection& connection, bool isClient)  :
+ConnectionHandler::ConnectionHandler(amqp_0_10::Connection& connection, bool isClient)  :
     handler(new Handler(connection, isClient)) {}
 
-ConnectionHandler::Handler::Handler(Connection& c, bool isClient) :
+ConnectionHandler::Handler::Handler(amqp_0_10::Connection& c, bool isClient) :
     proxy(c.getOutput()),
     connection(c), serverMode(!isClient), secured(0),
     isOpen(false)
@@ -153,14 +153,14 @@ void ConnectionHandler::Handler::startOk
 {
     const framing::FieldTable& clientProperties = body.getClientProperties();
     qmf::org::apache::qpid::broker::Connection::shared_ptr mgmtObject = connection.getMgmtObject();
+    types::Variant::Map properties;
+    qpid::amqp_0_10::translate(clientProperties, properties);
 
     if (mgmtObject != 0) {
         string procName = clientProperties.getAsString(CLIENT_PROCESS_NAME);
         uint32_t pid = clientProperties.getAsInt(CLIENT_PID);
         uint32_t ppid = clientProperties.getAsInt(CLIENT_PPID);
 
-        types::Variant::Map properties;
-        qpid::amqp_0_10::translate(clientProperties, properties);
         mgmtObject->set_remoteProperties(properties);
         if (!procName.empty())
             mgmtObject->set_remoteProcessName(procName);
@@ -192,7 +192,7 @@ void ConnectionHandler::Handler::startOk
         throw;
     }
 
-    connection.setClientProperties(clientProperties);
+    connection.setClientProperties(properties);
     if (clientProperties.isSet(QPID_FED_TAG)) {
         connection.setFederationPeerTag(clientProperties.getAsString(QPID_FED_TAG));
     }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h Tue Jun 25 13:28:15 2013
@@ -47,7 +47,9 @@ struct SecuritySettings;
 
 namespace broker {
 
+namespace amqp_0_10 {
 class Connection;
+}
 class SecureConnection;
 
 class ConnectionHandler : public framing::FrameHandler
@@ -55,13 +57,13 @@ class ConnectionHandler : public framing
     struct Handler : public framing::AMQP_AllOperations::ConnectionHandler
     {
         framing::AMQP_AllProxy::Connection proxy;
-        Connection& connection;
+        amqp_0_10::Connection& connection;
         bool serverMode;
         std::auto_ptr<SaslAuthenticator> authenticator;
         SecureConnection* secured;
         bool isOpen;
 
-        Handler(Connection& connection, bool isClient);
+        Handler(amqp_0_10::Connection& connection, bool isClient);
         ~Handler();
         void startOk(const qpid::framing::ConnectionStartOkBody& body);
         void startOk(const qpid::framing::FieldTable& clientProperties,
@@ -99,7 +101,7 @@ class ConnectionHandler : public framing
 
     bool handle(const qpid::framing::AMQMethodBody& method);
   public:
-    ConnectionHandler(Connection& connection, bool isClient );
+    ConnectionHandler(amqp_0_10::Connection& connection, bool isClient );
     void close(framing::connection::CloseCode code, const std::string& text);
     void heartbeat();
     void handle(framing::AMQFrame& frame);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h Tue Jun 25 13:28:15 2013
@@ -21,7 +21,7 @@
 
 #include "qpid/broker/SemanticState.h"
 #include "qpid/broker/SessionContext.h"
-#include "qpid/broker/Connection.h"
+#include "qpid/broker/amqp_0_10/Connection.h"
 
 namespace qpid {
 namespace broker {
@@ -40,7 +40,7 @@ class HandlerImpl {
     HandlerImpl(SemanticState& s) : state(s), session(s.getSession()) {}
 
     framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); }
-    Connection& getConnection() { return session.getConnection(); }
+    amqp_0_10::Connection& getConnection() { return session.getConnection(); }
     Broker& getBroker() { return session.getConnection().getBroker(); }
 };
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp Tue Jun 25 13:28:15 2013
@@ -22,7 +22,7 @@
 #include "qpid/broker/Link.h"
 #include "qpid/broker/LinkRegistry.h"
 #include "qpid/broker/Broker.h"
-#include "qpid/broker/Connection.h"
+#include "qpid/broker/amqp_0_10/Connection.h"
 #include "qpid/sys/Timer.h"
 #include "qmf/org/apache/qpid/broker/EventBrokerLinkUp.h"
 #include "qmf/org/apache/qpid/broker/EventBrokerLinkDown.h"
@@ -233,7 +233,7 @@ void Link::startConnectionLH ()
     }
 }
 
-void Link::established(Connection* c)
+void Link::established(qpid::broker::amqp_0_10::Connection* c)
 {
     stringstream addr;
     addr << host << ":" << port;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Link.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.h Tue Jun 25 13:28:15 2013
@@ -45,8 +45,10 @@ namespace broker {
 
 class LinkRegistry;
 class Broker;
-class Connection;
 class LinkExchange;
+namespace amqp_0_10 {
+class Connection;
+}
 
 class Link : public PersistableConfig, public management::Manageable {
   private:
@@ -83,7 +85,7 @@ class Link : public PersistableConfig, p
     Bridges cancellations;    // Bridges pending cancellation
     framing::ChannelId nextFreeChannel;
     RangeSet<framing::ChannelId> freeChannels;
-    Connection* connection;
+    amqp_0_10::Connection* connection;
     management::ManagementAgent* agent;
     boost::function<void(Link*)> listener;
     boost::intrusive_ptr<sys::TimerTask> timerTask;
@@ -109,7 +111,7 @@ class Link : public PersistableConfig, p
     void reconnectLH(const Address&); //called by LinkRegistry
 
     // connection management (called by LinkRegistry)
-    void established(Connection*); // Called when connection is created
+    void established(amqp_0_10::Connection*); // Called when connection is created
     void opened();      // Called when connection is open (after create)
     void closed(int, std::string);   // Called when connection goes away
     void notifyConnectionForced(const std::string text);
@@ -194,7 +196,7 @@ class Link : public PersistableConfig, p
     /** The current connction for this link. Note returns 0 if the link is not
      * presently connected.
      */
-    Connection* getConnection() { return connection; }
+    amqp_0_10::Connection* getConnection() { return connection; }
 };
 }
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp Tue Jun 25 13:28:15 2013
@@ -21,7 +21,7 @@
 #include "qpid/broker/LinkRegistry.h"
 
 #include "qpid/broker/Broker.h"
-#include "qpid/broker/Connection.h"
+#include "qpid/broker/amqp_0_10/Connection.h"
 #include "qpid/broker/Link.h"
 #include "qpid/log/Statement.h"
 #include <iostream>
@@ -53,10 +53,26 @@ class LinkRegistryConnectionObserver : p
     LinkRegistry& links;
   public:
     LinkRegistryConnectionObserver(LinkRegistry& l) : links(l) {}
-    void connection(Connection& c) { links.notifyConnection(c.getMgmtId(), &c); }
-    void opened(Connection& c) { links.notifyOpened(c.getMgmtId()); }
-    void closed(Connection& c) { links.notifyClosed(c.getMgmtId()); }
-    void forced(Connection& c, const string& text) { links.notifyConnectionForced(c.getMgmtId(), text); }
+    void connection(Connection& in)
+    {
+        amqp_0_10::Connection* c = dynamic_cast<amqp_0_10::Connection*>(&in);
+        if (c) links.notifyConnection(c->getMgmtId(), c);
+    }
+    void opened(Connection& in)
+    {
+        amqp_0_10::Connection* c = dynamic_cast<amqp_0_10::Connection*>(&in);
+        if (c) links.notifyOpened(c->getMgmtId());
+    }
+    void closed(Connection& in)
+    {
+        amqp_0_10::Connection* c = dynamic_cast<amqp_0_10::Connection*>(&in);
+        if (c) links.notifyClosed(c->getMgmtId());
+    }
+    void forced(Connection& in, const string& text)
+    {
+        amqp_0_10::Connection* c = dynamic_cast<amqp_0_10::Connection*>(&in);
+        if (c) links.notifyConnectionForced(c->getMgmtId(), text);
+    }
 };
 
 LinkRegistry::LinkRegistry (Broker* _broker) :
@@ -287,7 +303,7 @@ Link::shared_ptr LinkRegistry::findLink(
     return Link::shared_ptr();
 }
 
-void LinkRegistry::notifyConnection(const std::string& key, Connection* c)
+void LinkRegistry::notifyConnection(const std::string& key, amqp_0_10::Connection* c)
 {
     // find a link that is attempting to connect to the remote, and
     // create a mapping from connection id to link

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h Tue Jun 25 13:28:15 2013
@@ -35,10 +35,11 @@
 
 namespace qpid {
 namespace broker {
-
+namespace amqp_0_10 {
+    class Connection;
+}
     class Link;
     class Broker;
-    class Connection;
     class LinkRegistry {
         typedef std::map<std::string, boost::shared_ptr<Link> > LinkMap;
         typedef std::map<std::string, Bridge::shared_ptr> BridgeMap;
@@ -58,7 +59,7 @@ namespace broker {
         boost::shared_ptr<Link> findLink(const std::string& key);
 
         // Methods called by the connection observer, key is connection identifier
-        void notifyConnection (const std::string& key, Connection* c);
+        void notifyConnection (const std::string& key, amqp_0_10::Connection* c);
         void notifyOpened     (const std::string& key);
         void notifyClosed     (const std::string& key);
         void notifyConnectionForced    (const std::string& key, const std::string& text);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Tue Jun 25 13:28:15 2013
@@ -23,7 +23,8 @@
 
 #include "qpid/amqp/CharSequence.h"
 #include "qpid/amqp/MapHandler.h"
-#include "qpid/broker/ConnectionIdentity.h"
+#include "qpid/broker/Connection.h"
+#include "qpid/broker/OwnershipToken.h"
 #include "qpid/management/ManagementObject.h"
 #include "qpid/management/Manageable.h"
 #include "qpid/StringUtils.h"
@@ -203,10 +204,10 @@ uint8_t Message::getPriority() const
 bool Message::getIsManagementMessage() const { return isManagementMessage; }
 void Message::setIsManagementMessage(bool b) { isManagementMessage = b; }
 
-const OwnershipToken* Message::getPublisherOwnership() const {  return publisher->getOwnership(); }
-const management::ObjectId Message::getPublisherObjectId() const { return publisher->getObjectId(); }
-const std::string& Message::getPublisherUserId() const { return publisher->getUserId(); }
-const std::string& Message::getPublisherUrl() const { return publisher->getUrl(); }
+const Connection* Message::getPublisher() const { return publisher; }
+void Message::setPublisher(const Connection& p) { publisher = &p; }
+bool Message::isLocalTo(const OwnershipToken* token) const { return token && publisher && token->isLocal(publisher->getOwnership()); }
+
 
 qpid::framing::SequenceNumber Message::getSequence() const
 {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Tue Jun 25 13:28:15 2013
@@ -47,7 +47,7 @@ class Manageable;
 
 namespace broker {
 class OwnershipToken;
-class ConnectionIdentity;
+class Connection;
 
 enum MessageState
 {
@@ -85,12 +85,9 @@ public:
     int getDeliveryCount() const { return deliveryCount; }
     void resetDeliveryCount() { deliveryCount = -1; }
 
-    void setPublisher(const ConnectionIdentity& p) { publisher = &p; }
-    const ConnectionIdentity& getPublisher() const { return *publisher; }
-    const OwnershipToken* getPublisherOwnership() const;
-    const management::ObjectId getPublisherObjectId() const;
-    const std::string& getPublisherUserId() const;
-    const std::string& getPublisherUrl() const;
+    void setPublisher(const Connection& p);
+    const Connection* getPublisher() const;
+    bool isLocalTo(const OwnershipToken*) const;
 
     QPID_BROKER_EXTERN std::string getRoutingKey() const;
     QPID_BROKER_EXTERN bool isPersistent() const;
@@ -148,7 +145,7 @@ public:
     boost::intrusive_ptr<Encoding> encoding;
     boost::intrusive_ptr<PersistableMessage> persistentContext;
     int deliveryCount;
-    const ConnectionIdentity* publisher;
+    const Connection* publisher;
     qpid::sys::AbsTime expiration;
     boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
     uint64_t timestamp;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Jun 25 13:28:15 2013
@@ -220,18 +220,13 @@ Queue::~Queue()
 {
 }
 
-bool isLocalTo(const OwnershipToken* token, const Message& msg)
-{
-    return token && token->isLocal(msg.getPublisherOwnership());
-}
-
 bool Queue::isLocal(const Message& msg)
 {
     //message is considered local if it was published on the same
     //connection as that of the session which declared this queue
     //exclusive (owner) or which has an exclusive subscription
     //(exclusive)
-    return settings.noLocal && (isLocalTo(owner, msg) || isLocalTo(exclusive, msg));
+    return settings.noLocal && (msg.isLocalTo(owner) || msg.isLocalTo(exclusive));
 }
 
 bool Queue::isExcluded(const Message& msg)

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp Tue Jun 25 13:28:15 2013
@@ -21,7 +21,7 @@
 
 #include "qpid/broker/AclModule.h"
 #include "qpid/broker/Broker.h"
-#include "qpid/broker/Connection.h"
+#include "qpid/broker/amqp_0_10/Connection.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/framing/FieldValue.h"
 #include "qpid/log/Statement.h"
@@ -54,12 +54,12 @@ namespace broker {
 
 class NullAuthenticator : public SaslAuthenticator
 {
-    Connection& connection;
+    amqp_0_10::Connection& connection;
     framing::AMQP_ClientProxy::Connection client;
     std::string realm;
     const bool encrypt;
 public:
-    NullAuthenticator(Connection& connection, bool encrypt);
+    NullAuthenticator(amqp_0_10::Connection& connection, bool encrypt);
     ~NullAuthenticator();
     void getMechanisms(framing::Array& mechanisms);
     void start(const std::string& mechanism, const std::string* response);
@@ -74,7 +74,7 @@ public:
 class CyrusAuthenticator : public SaslAuthenticator
 {
     sasl_conn_t *sasl_conn;
-    Connection& connection;
+    amqp_0_10::Connection& connection;
     framing::AMQP_ClientProxy::Connection client;
     const bool encrypt;
 
@@ -82,7 +82,7 @@ class CyrusAuthenticator : public SaslAu
     bool getUsername(std::string& uid);
 
 public:
-    CyrusAuthenticator(Connection& connection, bool encrypt);
+    CyrusAuthenticator(amqp_0_10::Connection& connection, bool encrypt);
     ~CyrusAuthenticator();
     void init();
     void getMechanisms(framing::Array& mechanisms);
@@ -167,7 +167,7 @@ void SaslAuthenticator::fini(void)
 
 #endif
 
-std::auto_ptr<SaslAuthenticator> SaslAuthenticator::createAuthenticator(Connection& c )
+std::auto_ptr<SaslAuthenticator> SaslAuthenticator::createAuthenticator(amqp_0_10::Connection& c )
 {
     if (c.getBroker().getOptions().auth) {
         return std::auto_ptr<SaslAuthenticator>(
@@ -179,7 +179,7 @@ std::auto_ptr<SaslAuthenticator> SaslAut
 }
 
 
-NullAuthenticator::NullAuthenticator(Connection& c, bool e) : connection(c), client(c.getOutput()),
+NullAuthenticator::NullAuthenticator(amqp_0_10::Connection& c, bool e) : connection(c), client(c.getOutput()),
                                                               realm(c.getBroker().getOptions().realm), encrypt(e) {}
 NullAuthenticator::~NullAuthenticator() {}
 
@@ -246,7 +246,7 @@ std::auto_ptr<SecurityLayer> NullAuthent
 
 #if HAVE_SASL
 
-CyrusAuthenticator::CyrusAuthenticator(Connection& c, bool _encrypt) :
+CyrusAuthenticator::CyrusAuthenticator(amqp_0_10::Connection& c, bool _encrypt) :
     sasl_conn(0), connection(c), client(c.getOutput()), encrypt(_encrypt)
 {
     init();

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SaslAuthenticator.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SaslAuthenticator.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SaslAuthenticator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SaslAuthenticator.h Tue Jun 25 13:28:15 2013
@@ -34,7 +34,9 @@
 namespace qpid {
 namespace broker {
 
+namespace amqp_0_10 {
 class Connection;
+}
 
 class SaslAuthenticator
 {
@@ -54,7 +56,7 @@ public:
     static void init(const std::string& saslName, std::string const & saslConfigPath );
     static void fini(void);
 
-    static std::auto_ptr<SaslAuthenticator> createAuthenticator(Connection& connection);
+    static std::auto_ptr<SaslAuthenticator> createAuthenticator(amqp_0_10::Connection& connection);
 
     virtual void callUserIdCallbacks() { }
 };

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp Tue Jun 25 13:28:15 2013
@@ -22,7 +22,7 @@
 
 #include "qpid/amqp_0_10/Connection.h"
 #include "qpid/broker/Broker.h"
-#include "qpid/broker/Connection.h"
+#include "qpid/broker/amqp_0_10/Connection.h"
 #include "qpid/broker/SecureConnection.h"
 #include "qpid/framing/ProtocolVersion.h"
 #include "qpid/log/Statement.h"
@@ -35,7 +35,7 @@ using framing::ProtocolVersion;
 using qpid::sys::SecuritySettings;
 typedef std::auto_ptr<qpid::amqp_0_10::Connection> CodecPtr;
 typedef std::auto_ptr<SecureConnection> SecureConnectionPtr;
-typedef std::auto_ptr<Connection> ConnectionPtr;
+typedef std::auto_ptr<qpid::broker::amqp_0_10::Connection> ConnectionPtr;
 typedef std::auto_ptr<sys::ConnectionInputHandler> InputPtr;
 
 SecureConnectionFactory::SecureConnectionFactory(Broker& b) : broker(b) {}
@@ -64,7 +64,7 @@ SecureConnectionFactory::create_0_10(sys
 {
     SecureConnectionPtr sc(new SecureConnection());
     CodecPtr c(new qpid::amqp_0_10::Connection(out, id, brokerActsAsClient));
-    ConnectionPtr i(new broker::Connection(c.get(), broker, id, external, brokerActsAsClient));
+    ConnectionPtr i(new broker::amqp_0_10::Connection(c.get(), broker, id, external, brokerActsAsClient));
     i->setSecureConnection(sc.get());
     c->setInputHandler(InputPtr(i.release()));
     sc->setCodec(std::auto_ptr<sys::ConnectionCodec>(c));

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Tue Jun 25 13:28:15 2013
@@ -22,7 +22,7 @@
 #include "qpid/broker/SessionState.h"
 
 #include "qpid/broker/Broker.h"
-#include "qpid/broker/Connection.h"
+#include "qpid/broker/amqp_0_10/Connection.h"
 #include "qpid/broker/DeliverableMessage.h"
 #include "qpid/broker/DtxAck.h"
 #include "qpid/broker/DtxTimeout.h"
@@ -83,7 +83,7 @@ SemanticState::SemanticState(SessionStat
       authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isUserProxyAuth()),
       userID(getSession().getConnection().getUserId()),
       closeComplete(false),
-      connectionId(getSession().getConnection().getUrl())
+      connectionId(getSession().getConnection().getMgmtId())
 {}
 
 SemanticState::~SemanticState() {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Tue Jun 25 13:28:15 2013
@@ -18,7 +18,7 @@
 #include "qpid/broker/SessionAdapter.h"
 
 #include "qpid/broker/Broker.h"
-#include "qpid/broker/Connection.h"
+#include "qpid/broker/amqp_0_10/Connection.h"
 #include "qpid/broker/DtxTimeout.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/Exception.h"
@@ -96,14 +96,14 @@ void SessionAdapter::ExchangeHandlerImpl
         try{
             std::pair<Exchange::shared_ptr, bool> response =
                 getBroker().createExchange(exchange, type, durable, alternateExchange, args,
-                                           getConnection().getUserId(), getConnection().getUrl());
+                                           getConnection().getUserId(), getConnection().getMgmtId());
             if (!response.second) {
                 //exchange already there, not created
                 checkType(response.first, type);
                 checkAlternate(response.first, alternate);
                 QPID_LOG_CAT(debug, model, "Create exchange. name:" << exchange
                     << " user:" << getConnection().getUserId()
-                    << " rhost:" << getConnection().getUrl()
+                    << " rhost:" << getConnection().getMgmtId()
                     << " type:" << type
                     << " alternateExchange:" << alternateExchange
                     << " durable:" << (durable ? "T" : "F"));
@@ -134,7 +134,7 @@ void SessionAdapter::ExchangeHandlerImpl
 void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool /*ifUnused*/)
 {
     //TODO: implement if-unused
-    getBroker().deleteExchange(name, getConnection().getUserId(), getConnection().getUrl());
+    getBroker().deleteExchange(name, getConnection().getUserId(), getConnection().getMgmtId());
 }
 
 ExchangeQueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& name)
@@ -156,7 +156,7 @@ void SessionAdapter::ExchangeHandlerImpl
                                                const FieldTable& arguments)
 {
     getBroker().bind(queueName, exchangeName, routingKey, arguments,
-                     getConnection().getUserId(), getConnection().getUrl());
+                     getConnection().getUserId(), getConnection().getMgmtId());
     state.addBinding(queueName, exchangeName, routingKey, arguments);
 }
 
@@ -166,7 +166,7 @@ void SessionAdapter::ExchangeHandlerImpl
 {
     state.removeBinding(queueName, exchangeName, routingKey);
     getBroker().unbind(queueName, exchangeName, routingKey,
-                       getConnection().getUserId(), getConnection().getUrl());
+                       getConnection().getUserId(), getConnection().getMgmtId());
 }
 
 ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string& exchangeName,
@@ -209,7 +209,7 @@ ExchangeBoundResult SessionAdapter::Exch
 SessionAdapter::QueueHandlerImpl::QueueHandlerImpl(SemanticState& session)
     : HandlerHelper(session), broker(getBroker()),
       //record connection id and userid for deleting exclsuive queues after session has ended:
-      connectionId(getConnection().getUrl()), userId(getConnection().getUserId())
+      connectionId(getConnection().getMgmtId()), userId(getConnection().getUserId())
 {}
 
 
@@ -302,7 +302,7 @@ void SessionAdapter::QueueHandlerImpl::d
                                     exclusive ? &session : 0,
                                     alternateExchange,
                                     getConnection().getUserId(),
-                                    getConnection().getUrl());
+                                    getConnection().getMgmtId());
         queue = queue_created.first;
         assert(queue);
         if (queue_created.second) { // This is a new queue
@@ -316,7 +316,7 @@ void SessionAdapter::QueueHandlerImpl::d
             }
             QPID_LOG_CAT(debug, model, "Create queue. name:" << name
                 << " user:" << getConnection().getUserId()
-                << " rhost:" << getConnection().getUrl()
+                << " rhost:" << getConnection().getMgmtId()
                 << " durable:" << (durable ? "T" : "F")
                 << " exclusive:" << (exclusive ? "T" : "F")
                 << " autodelete:" << (autoDelete ? "T" : "F")
@@ -363,7 +363,7 @@ void SessionAdapter::QueueHandlerImpl::c
 
 void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnused, bool ifEmpty)
 {
-    getBroker().deleteQueue(queue, getConnection().getUserId(), getConnection().getUrl(),
+    getBroker().deleteQueue(queue, getConnection().getUserId(), getConnection().getMgmtId(),
                             boost::bind(&SessionAdapter::QueueHandlerImpl::checkDelete, this, _1, ifUnused, ifEmpty));
 }
 
@@ -429,12 +429,12 @@ SessionAdapter::MessageHandlerImpl::subs
 
     ManagementAgent* agent = getBroker().getManagementAgent();
     if (agent)
-        agent->raiseEvent(_qmf::EventSubscribe(getConnection().getUrl(), getConnection().getUserId(),
+        agent->raiseEvent(_qmf::EventSubscribe(getConnection().getMgmtId(), getConnection().getUserId(),
                                                queueName, destination, exclusive, ManagementAgent::toMap(arguments)));
     QPID_LOG_CAT(debug, model, "Create subscription. queue:" << queueName
         << " destination:" << destination
         << " user:" << getConnection().getUserId()
-        << " rhost:" << getConnection().getUrl()
+        << " rhost:" << getConnection().getMgmtId()
         << " exclusive:" << (exclusive ? "T" : "F")
     );
 }
@@ -448,10 +448,10 @@ SessionAdapter::MessageHandlerImpl::canc
 
     ManagementAgent* agent = getBroker().getManagementAgent();
     if (agent)
-        agent->raiseEvent(_qmf::EventUnsubscribe(getConnection().getUrl(), getConnection().getUserId(), destination));
+        agent->raiseEvent(_qmf::EventUnsubscribe(getConnection().getMgmtId(), getConnection().getUserId(), destination));
     QPID_LOG_CAT(debug, model, "Delete subscription. destination:" << destination
         << " user:" << getConnection().getUserId()
-        << " rhost:" << getConnection().getUrl() );
+        << " rhost:" << getConnection().getMgmtId() );
 }
 
 void

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h Tue Jun 25 13:28:15 2013
@@ -36,14 +36,16 @@ class AMQP_ClientProxy;
 namespace broker {
 
 class Broker;
+namespace amqp_0_10 {
 class Connection;
+}
 
 class SessionContext : public OwnershipToken
 {
   public:
     virtual ~SessionContext(){}
     virtual bool isAttached() const = 0;
-    virtual Connection& getConnection() = 0;
+    virtual amqp_0_10::Connection& getConnection() = 0;
     virtual framing::AMQP_ClientProxy& getProxy() = 0;
     virtual Broker& getBroker() = 0;
     virtual uint16_t getChannel() const = 0;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Tue Jun 25 13:28:15 2013
@@ -20,7 +20,7 @@
 
 #include "qpid/broker/SessionHandler.h"
 #include "qpid/broker/Broker.h"
-#include "qpid/broker/Connection.h"
+#include "qpid/broker/amqp_0_10/Connection.h"
 #include "qpid/broker/SessionState.h"
 #include "qpid/log/Statement.h"
 #include "qpid/sys/ConnectionOutputHandler.h"
@@ -33,7 +33,7 @@ using namespace framing;
 using namespace std;
 using namespace qpid::sys;
 
-SessionHandler::SessionHandler(Connection& c, ChannelId ch)
+SessionHandler::SessionHandler(amqp_0_10::Connection& c, ChannelId ch)
     : qpid::amqp_0_10::SessionHandler(&c.getOutput(), ch),
       connection(c),
       proxy(out)
@@ -64,9 +64,9 @@ void SessionHandler::executionException(
         errorListener->executionException(code, msg);
 }
 
-Connection& SessionHandler::getConnection() { return connection; }
+amqp_0_10::Connection& SessionHandler::getConnection() { return connection; }
 
-const Connection& SessionHandler::getConnection() const { return connection; }
+const amqp_0_10::Connection& SessionHandler::getConnection() const { return connection; }
 
 void SessionHandler::handleDetach() {
     qpid::amqp_0_10::SessionHandler::handleDetach();

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h Tue Jun 25 13:28:15 2013
@@ -31,8 +31,9 @@ namespace qpid {
 class SessionState;
 
 namespace broker {
-
+namespace amqp_0_10 {
 class Connection;
+}
 class SessionState;
 
 /**
@@ -57,15 +58,15 @@ class SessionHandler : public qpid::amqp
 
     /**
      *@param e must not be deleted until ErrorListener::detach has been called */
-    SessionHandler(Connection&, framing::ChannelId);
+    SessionHandler(amqp_0_10::Connection&, framing::ChannelId);
     ~SessionHandler();
 
     /** Get broker::SessionState */
     SessionState* getSession() { return session.get(); }
     const SessionState* getSession() const { return session.get(); }
 
-    Connection& getConnection();
-    const Connection& getConnection() const;
+    amqp_0_10::Connection& getConnection();
+    const amqp_0_10::Connection& getConnection() const;
 
     framing::AMQP_ClientProxy& getProxy() { return proxy; }
     const framing::AMQP_ClientProxy& getProxy() const { return proxy; }
@@ -93,7 +94,7 @@ class SessionHandler : public qpid::amqp
             : framing::AMQP_ClientProxy(setChannel), setChannel(ch, out) {}
     };
 
-    Connection& connection;
+    amqp_0_10::Connection& connection;
     framing::AMQP_ClientProxy proxy;
     std::auto_ptr<SessionState> session;
     boost::shared_ptr<ErrorListener> errorListener;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Tue Jun 25 13:28:15 2013
@@ -96,7 +96,7 @@ uint16_t SessionState::getChannel() cons
     return handler->getChannel();
 }
 
-Connection& SessionState::getConnection() {
+amqp_0_10::Connection& SessionState::getConnection() {
     assert(isAttached());
     return handler->getConnection();
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Tue Jun 25 13:28:15 2013
@@ -89,7 +89,7 @@ class SessionState : public qpid::Sessio
     uint16_t getChannel() const;
 
     /** @pre isAttached() */
-    Connection& getConnection();
+    amqp_0_10::Connection& getConnection();
     bool isLocal(const OwnershipToken* t) const;
 
     Broker& getBroker();

Added: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Authorise.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Authorise.cpp?rev=1496466&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Authorise.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Authorise.cpp Tue Jun 25 13:28:15 2013
@@ -0,0 +1,131 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Authorise.h"
+#include "Exception.h"
+#include "Filter.h"
+#include "qpid/amqp/descriptors.h"
+#include "qpid/broker/AclModule.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/types/Variant.h"
+#include <map>
+#include <boost/lexical_cast.hpp>
+
+namespace qpid {
+namespace broker {
+namespace amqp {
+namespace {
+const std::string B_TRUE("true");
+const std::string B_FALSE("false");
+const std::string POLICY_TYPE("qpid.policy_type");
+}
+
+Authorise::Authorise(const std::string& u, AclModule* a) : user(u), acl(a) {}
+void Authorise::access(boost::shared_ptr<Exchange> exchange)
+{
+    if (acl) {
+        std::map<acl::Property, std::string> params;
+        params.insert(std::make_pair(acl::PROP_TYPE, exchange->getType()));
+        params.insert(std::make_pair(acl::PROP_DURABLE, exchange->isDurable() ? B_TRUE : B_FALSE));
+        if (!acl->authorise(user, acl::ACT_ACCESS, acl::OBJ_EXCHANGE, exchange->getName(), &params))
+            throw Exception(qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS, QPID_MSG("ACL denied exchange access request from " << user));
+    }
+}
+void Authorise::access(boost::shared_ptr<Queue> queue)
+{
+    if (acl) {
+        const QueueSettings& settings = queue->getSettings();
+        std::map<acl::Property, std::string> params;
+        boost::shared_ptr<Exchange> altex = queue->getAlternateExchange();
+        if (altex)
+            params.insert(std::make_pair(acl::PROP_ALTERNATE, altex->getName()));
+        params.insert(std::make_pair(acl::PROP_DURABLE, settings.durable ? B_TRUE : B_FALSE));
+        params.insert(std::make_pair(acl::PROP_EXCLUSIVE, queue->hasExclusiveOwner() ? B_TRUE : B_FALSE));
+        params.insert(std::make_pair(acl::PROP_AUTODELETE, settings.autodelete ? B_TRUE : B_FALSE));
+        qpid::types::Variant::Map::const_iterator i = settings.original.find(POLICY_TYPE);
+        if (i != settings.original.end())
+            params.insert(std::make_pair(acl::PROP_POLICYTYPE, i->second.asString()));
+        if (settings.maxDepth.hasCount())
+            params.insert(std::make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<std::string>(settings.maxDepth.getCount())));
+        if (settings.maxDepth.hasCount())
+            params.insert(std::make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<std::string>(settings.maxDepth.getSize())));
+        if (!acl->authorise(user, acl::ACT_ACCESS, acl::OBJ_QUEUE, queue->getName(), &params) )
+            throw Exception(qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS, QPID_MSG("ACL denied queue access request from " << user));
+    }
+}
+
+void Authorise::incoming(boost::shared_ptr<Exchange> exchange)
+{
+    access(exchange);
+    //can't check publish permission here as do not yet know routing key
+}
+void Authorise::incoming(boost::shared_ptr<Queue> queue)
+{
+    access(queue);
+    if (acl) {
+        if (!acl->authorise(user, acl::ACT_PUBLISH, acl::OBJ_EXCHANGE, std::string()/*default exchange*/, queue->getName()))
+            throw Exception(qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS, QPID_MSG(user << " cannot publish to queue " << queue->getName()));
+    }
+}
+void Authorise::outgoing(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> queue, const Filter& filter)
+{
+    access(exchange);
+    if (acl) {
+        std::map<qpid::acl::Property, std::string> params;
+        params.insert(std::make_pair(acl::PROP_QUEUENAME, queue->getName()));
+        params.insert(std::make_pair(acl::PROP_ROUTINGKEY, filter.getBindingKey(exchange)));
+
+        if (!acl->authorise(user, acl::ACT_BIND, acl::OBJ_EXCHANGE, exchange->getName(), &params))
+            throw Exception(qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS, QPID_MSG("ACL denied exchange bind request from " << user));
+
+        if (!acl->authorise(user, acl::ACT_CONSUME, acl::OBJ_QUEUE, queue->getName(), NULL))
+            throw Exception(qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS, QPID_MSG("ACL denied queue subscribe request from " << user));
+    }
+}
+
+void Authorise::outgoing(boost::shared_ptr<Queue> queue)
+{
+    access(queue);
+    if (acl) {
+        if (!acl->authorise(user, acl::ACT_CONSUME, acl::OBJ_QUEUE, queue->getName(), NULL))
+            throw Exception(qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS, QPID_MSG("ACL denied queue subscribe request from " << user));
+    }
+}
+
+void Authorise::route(boost::shared_ptr<Exchange> exchange, const Message& msg)
+{
+    if (acl && acl->doTransferAcl()) {
+        if (!acl->authorise(user, acl::ACT_PUBLISH, acl::OBJ_EXCHANGE, exchange->getName(), msg.getRoutingKey()))
+            throw Exception(qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS, QPID_MSG(user << " cannot publish to " << exchange->getName() << " with routing-key " << msg.getRoutingKey()));
+    }
+}
+
+void Authorise::interlink()
+{
+    if (acl) {
+        if (!acl->authorise(user, acl::ACT_CREATE, acl::OBJ_LINK, "")){
+            throw Exception(qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS, QPID_MSG("ACL denied " << user << "  a AMQP 1.0 link"));
+        }
+    }
+}
+
+}}} // namespace qpid::broker::amqp

Copied: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Authorise.h (from r1496401, qpid/trunk/qpid/cpp/src/qpid/broker/amqp/DataReader.h)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Authorise.h?p2=qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Authorise.h&p1=qpid/trunk/qpid/cpp/src/qpid/broker/amqp/DataReader.h&r1=1496401&r2=1496466&rev=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/DataReader.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Authorise.h Tue Jun 25 13:28:15 2013
@@ -1,5 +1,5 @@
-#ifndef QPID_BROKER_AMQP_DATAREADER_H
-#define QPID_BROKER_AMQP_DATAREADER_H
+#ifndef QPID_BROKER_AMQP_AUTHORISE_H
+#define QPID_BROKER_AMQP_AUTHORISE_H
 
 /*
  *
@@ -21,33 +21,37 @@
  * under the License.
  *
  */
-#include "qpid/amqp/Reader.h"
-
-struct pn_data_t;
+#include <boost/shared_ptr.hpp>
 
 namespace qpid {
-namespace amqp {
-struct Descriptor;
-}
 namespace broker {
+class AclModule;
+class Exchange;
+class Message;
+class Queue;
 namespace amqp {
+class Filter;
 
 /**
- * Allows use of Reader interface to read pn_data_t* data.
+ * Class to handle authorisation requests (and hide the ACL mess behind)
  */
-class DataReader
+class Authorise
 {
   public:
-    DataReader(qpid::amqp::Reader& reader);
-    void read(pn_data_t*);
+    Authorise(const std::string& user, AclModule*);
+    void access(boost::shared_ptr<Exchange>);
+    void access(boost::shared_ptr<Queue>);
+    void incoming(boost::shared_ptr<Exchange>);
+    void incoming(boost::shared_ptr<Queue>);
+    void outgoing(boost::shared_ptr<Exchange>, boost::shared_ptr<Queue>, const Filter&);
+    void outgoing(boost::shared_ptr<Queue>);
+    void route(boost::shared_ptr<Exchange>, const Message&);
+    void interlink();
   private:
-    qpid::amqp::Reader& reader;
+    const std::string user;
+    AclModule* const acl;
 
-    void readOne(pn_data_t*);
-    void readMap(pn_data_t*, const qpid::amqp::Descriptor*);
-    void readList(pn_data_t*, const qpid::amqp::Descriptor*);
-    void readArray(pn_data_t*, const qpid::amqp::Descriptor*);
 };
 }}} // namespace qpid::broker::amqp
 
-#endif  /*!QPID_BROKER_AMQP_DATAREADER_H*/
+#endif  /*!QPID_BROKER_AMQP_AUTHORISE_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp Tue Jun 25 13:28:15 2013
@@ -19,9 +19,12 @@
  *
  */
 #include "Connection.h"
+#include "DataReader.h"
 #include "Session.h"
-#include "qpid/Exception.h"
+#include "Exception.h"
+#include "qpid/broker/AclModule.h"
 #include "qpid/broker/Broker.h"
+#include "qpid/amqp/descriptors.h"
 #include "qpid/framing/Buffer.h"
 #include "qpid/framing/ProtocolInitiation.h"
 #include "qpid/framing/ProtocolVersion.h"
@@ -36,7 +39,6 @@ extern "C" {
 namespace qpid {
 namespace broker {
 namespace amqp {
-
 Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, qpid::broker::Broker& b, Interconnects& interconnects_, bool saslInUse, const std::string& d)
     : ManagedConnection(b, i),
       connection(pn_connection()),
@@ -52,6 +54,7 @@ Connection::Connection(qpid::sys::Output
     QPID_LOG_TEST_CAT(trace, protocol, enableTrace);
     if (enableTrace) pn_transport_trace(transport, PN_TRACE_FRM);
 
+    broker.getConnectionObservers().connection(*this);
     if (!saslInUse) {
         //feed in a dummy AMQP 1.0 header as engine expects one, but
         //we already read it (if sasl is in use we read the sasl
@@ -62,15 +65,14 @@ Connection::Connection(qpid::sys::Output
         pi.encode(buffer);
         pn_transport_input(transport, &protocolHeader[0], protocolHeader.size());
 
-        //wont get a userid, so set a dummy one on the ManagedConnection to trigger event
-        setUserid("no authentication used");
+        setUserId("none");
     }
 }
 
 
 Connection::~Connection()
 {
-
+    broker.getConnectionObservers().closed(*this);
     pn_transport_free(transport);
     pn_connection_free(connection);
 }
@@ -97,8 +99,17 @@ size_t Connection::decode(const char* bu
         QPID_LOG_CAT(debug, network, id << " decoded " << n << " bytes from " << size);
         try {
             process();
+        } catch (const Exception& e) {
+            QPID_LOG(error, id << ": " << e.what());
+            pn_condition_t* error = pn_connection_condition(connection);
+            pn_condition_set_name(error, e.symbol());
+            pn_condition_set_description(error, e.what());
+            close();
         } catch (const std::exception& e) {
             QPID_LOG(error, id << ": " << e.what());
+            pn_condition_t* error = pn_connection_condition(connection);
+            pn_condition_set_name(error, qpid::amqp::error_conditions::INTERNAL_ERROR.c_str());
+            pn_condition_set_description(error, e.what());
             close();
         }
         pn_transport_tick(transport, 0);
@@ -108,7 +119,7 @@ size_t Connection::decode(const char* bu
         }
         return n;
     } else if (n == PN_ERR) {
-        throw qpid::Exception(QPID_MSG("Error on input: " << getError()));
+        throw Exception(qpid::amqp::error_conditions::DECODE_ERROR, QPID_MSG("Error on input: " << getError()));
     } else {
         return 0;
     }
@@ -126,7 +137,7 @@ size_t Connection::encode(char* buffer, 
         haveOutput = size;
         return size;//Is this right?
     } else if (n == PN_ERR) {
-        throw qpid::Exception(QPID_MSG("Error on output: " << getError()));
+        throw Exception(qpid::amqp::error_conditions::INTERNAL_ERROR, QPID_MSG("Error on output: " << getError()));
     } else {
         haveOutput = false;
         return 0;
@@ -139,8 +150,17 @@ bool Connection::canEncode()
             if (i->second->dispatch()) haveOutput = true;
         }
         process();
+    } catch (const Exception& e) {
+        QPID_LOG(error, id << ": " << e.what());
+        pn_condition_t* error = pn_connection_condition(connection);
+        pn_condition_set_name(error, e.symbol());
+        pn_condition_set_description(error, e.what());
+        close();
     } catch (const std::exception& e) {
         QPID_LOG(error, id << ": " << e.what());
+        pn_condition_t* error = pn_connection_condition(connection);
+        pn_condition_set_name(error, qpid::amqp::error_conditions::INTERNAL_ERROR.c_str());
+        pn_condition_set_description(error, e.what());
         close();
     }
     //TODO: proper handling of time in and out of tick
@@ -148,6 +168,28 @@ bool Connection::canEncode()
     QPID_LOG_CAT(trace, network, id << " canEncode(): " << haveOutput)
     return haveOutput;
 }
+
+void Connection::open()
+{
+    readPeerProperties();
+
+    pn_connection_set_container(connection, broker.getFederationTag().c_str());
+    pn_connection_open(connection);
+    out.connectionEstablished();
+    opened();
+    broker.getConnectionObservers().opened(*this);
+}
+
+void Connection::readPeerProperties()
+{
+    /**
+     * TODO: enable when proton 0.5 has been released:
+    qpid::types::Variant::Map properties;
+    DataReader::read(pn_connection_remote_properties(connection), properties);
+    setPeerProperties(properties);
+    */
+}
+
 void Connection::closed()
 {
     for (Sessions::iterator i = sessions.begin(); i != sessions.end(); ++i) {
@@ -178,10 +220,8 @@ void Connection::process()
     QPID_LOG(trace, id << " process()");
     if ((pn_connection_state(connection) & REQUIRES_OPEN) == REQUIRES_OPEN) {
         QPID_LOG_CAT(debug, model, id << " connection opened");
-        pn_connection_set_container(connection, broker.getFederationTag().c_str());
+        open();
         setContainerId(pn_connection_remote_container(connection));
-        pn_connection_open(connection);
-        out.connectionEstablished();
     }
 
     for (pn_session_t* s = pn_session_head(connection, REQUIRES_OPEN); s; s = pn_session_next(s, REQUIRES_OPEN)) {
@@ -200,9 +240,17 @@ void Connection::process()
             try {
                 session->second->attach(l);
                 QPID_LOG_CAT(debug, protocol, id << " link " << l << " attached on " << pn_link_session(l));
+            } catch (const Exception& e) {
+                QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what());
+                pn_condition_t* error = pn_link_condition(l);
+                pn_condition_set_name(error, e.symbol());
+                pn_condition_set_description(error, e.what());
+                pn_link_close(l);
             } catch (const std::exception& e) {
                 QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what());
-                //TODO: set error details on detach when that is exposed via engine API
+                pn_condition_t* error = pn_link_condition(l);
+                pn_condition_set_name(error, qpid::amqp::error_conditions::INTERNAL_ERROR.c_str());
+                pn_condition_set_description(error, e.what());
                 pn_link_close(l);
             }
         }
@@ -214,7 +262,15 @@ void Connection::process()
         if (pn_link_is_receiver(link)) {
             Sessions::iterator i = sessions.find(pn_link_session(link));
             if (i != sessions.end()) {
-                i->second->readable(link, delivery);
+                try {
+                    i->second->readable(link, delivery);
+                } catch (const Exception& e) {
+                    QPID_LOG_CAT(error, protocol, "Error on publish: " << e.what());
+                    pn_condition_t* error = pn_link_condition(link);
+                    pn_condition_set_name(error, e.symbol());
+                    pn_condition_set_description(error, e.what());
+                    pn_link_close(link);
+                }
             } else {
                 pn_delivery_update(delivery, PN_REJECTED);
             }
@@ -271,4 +327,19 @@ std::string Connection::getDomain() cons
 {
     return domain;
 }
+
+void Connection::abort()
+{
+    out.abort();
+}
+
+void Connection::setUserId(const std::string& user)
+{
+    ManagedConnection::setUserId(user);
+    AclModule* acl = broker.getAcl();
+    if (acl && !acl->approveConnection(*this))
+    {
+        throw Exception(qpid::amqp::error_conditions::RESOURCE_LIMIT_EXCEEDED, "User connection denied by configured limit");
+    }
+}
 }}} // namespace qpid::broker::amqp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h Tue Jun 25 13:28:15 2013
@@ -58,6 +58,9 @@ class Connection : public sys::Connectio
     pn_transport_t* getTransport();
     Interconnects& getInterconnects();
     std::string getDomain() const;
+    void setUserId(const std::string&);
+    void abort();
+
   protected:
     typedef std::map<pn_session_t*, boost::shared_ptr<Session> > Sessions;
     pn_connection_t* connection;
@@ -73,6 +76,8 @@ class Connection : public sys::Connectio
     virtual void process();
     std::string getError();
     void close();
+    void open();
+    void readPeerProperties();
 };
 }}} // namespace qpid::broker::amqp
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/DataReader.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/DataReader.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/DataReader.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/DataReader.cpp Tue Jun 25 13:28:15 2013
@@ -21,6 +21,7 @@
 #include "DataReader.h"
 #include "qpid/amqp/CharSequence.h"
 #include "qpid/amqp/Descriptor.h"
+#include "qpid/amqp/MapBuilder.h"
 #include "qpid/log/Statement.h"
 #include <string>
 extern "C" {
@@ -52,11 +53,6 @@ DataReader::DataReader(qpid::amqp::Reade
 
 void DataReader::read(pn_data_t* data)
 {
-    /*
-    while (pn_data_next(data)) {
-        readOne(data);
-    }
-    */
     do {
         readOne(data);
     } while (pn_data_next(data));
@@ -184,4 +180,12 @@ void DataReader::readMap(pn_data_t* data
     pn_data_exit(data);
     reader.onEndMap(count, descriptor);
 }
+
+void DataReader::read(pn_data_t* data, std::map<std::string, qpid::types::Variant>& out)
+{
+    qpid::amqp::MapBuilder builder;
+    DataReader reader(builder);
+    reader.read(data);
+    out = builder.getMap();
+}
 }}} // namespace qpid::broker::amqp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/DataReader.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/DataReader.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/DataReader.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/DataReader.h Tue Jun 25 13:28:15 2013
@@ -22,10 +22,15 @@
  *
  */
 #include "qpid/amqp/Reader.h"
+#include <map>
+#include <string>
 
 struct pn_data_t;
 
 namespace qpid {
+namespace types {
+class Variant;
+}
 namespace amqp {
 struct Descriptor;
 }
@@ -40,6 +45,7 @@ class DataReader
   public:
     DataReader(qpid::amqp::Reader& reader);
     void read(pn_data_t*);
+    static void read(pn_data_t*, std::map<std::string, qpid::types::Variant>&);
   private:
     qpid::amqp::Reader& reader;
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org