You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2013/06/25 15:28:16 UTC
svn commit: r1496466 [1/2] - in /qpid/trunk/qpid/cpp/src: ./ qpid/amqp/
qpid/broker/ qpid/broker/amqp/ qpid/broker/amqp_0_10/ qpid/ha/
qpid/management/ qpid/messaging/amqp/
Author: gsim
Date: Tue Jun 25 13:28:15 2013
New Revision: 1496466
URL: http://svn.apache.org/r1496466
Log:
QPID-4712: authorisation for AMQP 1.0 connections
Added:
qpid/trunk/qpid/cpp/src/qpid/amqp/MapBuilder.cpp
qpid/trunk/qpid/cpp/src/qpid/amqp/MapBuilder.h
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Authorise.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Authorise.h
- copied, changed from r1496401, qpid/trunk/qpid/cpp/src/qpid/broker/amqp/DataReader.h
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Exception.cpp
- copied, changed from r1496401, qpid/trunk/qpid/cpp/src/qpid/broker/amqp/DataReader.h
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Exception.h
- copied, changed from r1496401, qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionIdentity.h
qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.cpp
- copied, changed from r1496401, qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h
- copied, changed from r1496401, qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
Removed:
qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionIdentity.h
Modified:
qpid/trunk/qpid/cpp/src/CMakeLists.txt
qpid/trunk/qpid/cpp/src/Makefile.am
qpid/trunk/qpid/cpp/src/amqp.cmake
qpid/trunk/qpid/cpp/src/qpid/amqp/descriptors.h
qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h
qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h
qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h
qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Link.h
qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h
qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SaslAuthenticator.h
qpid/trunk/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h
qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/DataReader.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/DataReader.h
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.h
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.h
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.h
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.h
qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Tue Jun 25 13:28:15 2013
@@ -1034,6 +1034,8 @@ set (qpidcommon_SOURCES
qpid/amqp/MapEncoder.cpp
qpid/amqp/MapSizeCalculator.h
qpid/amqp/MapSizeCalculator.cpp
+ qpid/amqp/MapBuilder.h
+ qpid/amqp/MapBuilder.cpp
qpid/amqp/MapReader.h
qpid/amqp/MapReader.cpp
qpid/amqp/MessageEncoder.h
@@ -1261,7 +1263,7 @@ set (qpidbroker_SOURCES
qpid/broker/MessageGroupManager.cpp
qpid/broker/PersistableMessage.cpp
qpid/broker/Bridge.cpp
- qpid/broker/Connection.cpp
+ qpid/broker/amqp_0_10/Connection.cpp
qpid/broker/ConnectionHandler.cpp
qpid/broker/DeliverableMessage.cpp
qpid/broker/DeliveryRecord.cpp
Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Tue Jun 25 13:28:15 2013
@@ -536,6 +536,8 @@ libqpidcommon_la_SOURCES += \
qpid/amqp/MapEncoder.cpp \
qpid/amqp/MapSizeCalculator.h \
qpid/amqp/MapSizeCalculator.cpp \
+ qpid/amqp/MapBuilder.h \
+ qpid/amqp/MapBuilder.cpp \
qpid/amqp/MapReader.h \
qpid/amqp/MapReader.cpp \
qpid/amqp/MessageEncoder.h \
@@ -598,15 +600,15 @@ libqpidbroker_la_SOURCES = \
qpid/broker/Broker.cpp \
qpid/broker/Broker.h \
qpid/broker/BrokerImportExport.h \
- qpid/broker/Connection.cpp \
- qpid/broker/Connection.h \
+ qpid/broker/amqp_0_10/Connection.cpp \
+ qpid/broker/amqp_0_10/Connection.h \
qpid/broker/ConnectionHandler.cpp \
qpid/broker/ConnectionHandler.h \
qpid/broker/Consumer.h \
qpid/broker/Credit.h \
qpid/broker/Credit.cpp \
qpid/broker/ConsumerFactory.h \
- qpid/broker/ConnectionIdentity.h \
+ qpid/broker/Connection.h \
qpid/broker/ConnectionObserver.h \
qpid/broker/ConnectionObservers.h \
qpid/broker/ConfigurationObserver.h \
@@ -801,12 +803,16 @@ if HAVE_PROTON
dmoduleexec_LTLIBRARIES += amqp.la
amqp_la_LIBADD = libqpidcommon.la
amqp_la_SOURCES = \
+ qpid/broker/amqp/Authorise.h \
+ qpid/broker/amqp/Authorise.cpp \
qpid/broker/amqp/Connection.h \
qpid/broker/amqp/Connection.cpp \
qpid/broker/amqp/DataReader.h \
qpid/broker/amqp/DataReader.cpp \
qpid/broker/amqp/Domain.h \
qpid/broker/amqp/Domain.cpp \
+ qpid/broker/amqp/Exception.h \
+ qpid/broker/amqp/Exception.cpp \
qpid/broker/amqp/Filter.h \
qpid/broker/amqp/Filter.cpp \
qpid/broker/amqp/Header.h \
Modified: qpid/trunk/qpid/cpp/src/amqp.cmake
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/amqp.cmake?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/amqp.cmake (original)
+++ qpid/trunk/qpid/cpp/src/amqp.cmake Tue Jun 25 13:28:15 2013
@@ -53,12 +53,16 @@ if (BUILD_AMQP)
set (amqp_SOURCES
+ qpid/broker/amqp/Authorise.h
+ qpid/broker/amqp/Authorise.cpp
qpid/broker/amqp/Connection.h
qpid/broker/amqp/Connection.cpp
qpid/broker/amqp/DataReader.h
qpid/broker/amqp/DataReader.cpp
qpid/broker/amqp/Domain.h
qpid/broker/amqp/Domain.cpp
+ qpid/broker/amqp/Exception.h
+ qpid/broker/amqp/Exception.cpp
qpid/broker/amqp/Filter.h
qpid/broker/amqp/Filter.cpp
qpid/broker/amqp/Header.h
Added: qpid/trunk/qpid/cpp/src/qpid/amqp/MapBuilder.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp/MapBuilder.cpp?rev=1496466&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp/MapBuilder.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp/MapBuilder.cpp Tue Jun 25 13:28:15 2013
@@ -0,0 +1,130 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "MapBuilder.h"
+#include <assert.h>
+
+namespace qpid {
+namespace amqp {
+namespace {
+const std::string BINARY("binary");
+const std::string UTF8("utf8");
+const std::string ASCII("ascii");
+}
+
+qpid::types::Variant::Map MapBuilder::getMap()
+{
+ return map;
+}
+const qpid::types::Variant::Map MapBuilder::getMap() const
+{
+ return map;
+}
+
+void MapBuilder::onNullValue(const CharSequence& key, const Descriptor*)
+{
+ map[std::string(key.data, key.size)] = qpid::types::Variant();
+}
+void MapBuilder::onBooleanValue(const CharSequence& key, bool value, const Descriptor*)
+{
+ map[std::string(key.data, key.size)] = value;
+}
+void MapBuilder::onUByteValue(const CharSequence& key, uint8_t value, const Descriptor*)
+{
+ map[std::string(key.data, key.size)] = value;
+}
+
+void MapBuilder::onUShortValue(const CharSequence& key, uint16_t value, const Descriptor*)
+{
+ map[std::string(key.data, key.size)] = value;
+}
+
+void MapBuilder::onUIntValue(const CharSequence& key, uint32_t value, const Descriptor*)
+{
+ map[std::string(key.data, key.size)] = value;
+}
+
+void MapBuilder::onULongValue(const CharSequence& key, uint64_t value, const Descriptor*)
+{
+ map[std::string(key.data, key.size)] = value;
+}
+
+void MapBuilder::onByteValue(const CharSequence& key, int8_t value, const Descriptor*)
+{
+ map[std::string(key.data, key.size)] = value;
+}
+
+void MapBuilder::onShortValue(const CharSequence& key, int16_t value, const Descriptor*)
+{
+ map[std::string(key.data, key.size)] = value;
+}
+
+void MapBuilder::onIntValue(const CharSequence& key, int32_t value, const Descriptor*)
+{
+ map[std::string(key.data, key.size)] = value;
+}
+
+void MapBuilder::onLongValue(const CharSequence& key, int64_t value, const Descriptor*)
+{
+ map[std::string(key.data, key.size)] = value;
+}
+
+void MapBuilder::onFloatValue(const CharSequence& key, float value, const Descriptor*)
+{
+ map[std::string(key.data, key.size)] = value;
+}
+
+void MapBuilder::onDoubleValue(const CharSequence& key, double value, const Descriptor*)
+{
+ map[std::string(key.data, key.size)] = value;
+}
+
+void MapBuilder::onUuidValue(const CharSequence& key, const CharSequence& value, const Descriptor*)
+{
+ assert(value.size == 16);
+ map[std::string(key.data, key.size)] = qpid::types::Uuid(value.data);
+}
+
+void MapBuilder::onTimestampValue(const CharSequence& key, int64_t value, const Descriptor*)
+{
+ map[std::string(key.data, key.size)] = value;
+}
+
+void MapBuilder::onBinaryValue(const CharSequence& key, const CharSequence& value, const Descriptor*)
+{
+ qpid::types::Variant& v = map[std::string(key.data, key.size)];
+ v = std::string(value.data, value.size);
+ v.setEncoding(BINARY);
+}
+
+void MapBuilder::onStringValue(const CharSequence& key, const CharSequence& value, const Descriptor*)
+{
+ qpid::types::Variant& v = map[std::string(key.data, key.size)];
+ v = std::string(value.data, value.size);
+ v.setEncoding(UTF8);
+}
+
+void MapBuilder::onSymbolValue(const CharSequence& key, const CharSequence& value, const Descriptor*)
+{
+ qpid::types::Variant& v = map[std::string(key.data, key.size)];
+ v = std::string(value.data, value.size);
+ v.setEncoding(ASCII);
+}
+}} // namespace qpid::amqp
Added: qpid/trunk/qpid/cpp/src/qpid/amqp/MapBuilder.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp/MapBuilder.h?rev=1496466&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp/MapBuilder.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp/MapBuilder.h Tue Jun 25 13:28:15 2013
@@ -0,0 +1,63 @@
+#ifndef QPID_AMQP_MAPBUILDER_H
+#define QPID_AMQP_MAPBUILDER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "MapReader.h"
+#include "qpid/types/Variant.h"
+
+namespace qpid {
+namespace amqp {
+
+/**
+ * Utility to build a Variant::Map from a data stream (doesn't handle
+ * nested maps or lists yet)
+ */
+class MapBuilder : public MapReader
+{
+ public:
+ void onNullValue(const CharSequence& /*key*/, const Descriptor*);
+ void onBooleanValue(const CharSequence& /*key*/, bool, const Descriptor*);
+ void onUByteValue(const CharSequence& /*key*/, uint8_t, const Descriptor*);
+ void onUShortValue(const CharSequence& /*key*/, uint16_t, const Descriptor*);
+ void onUIntValue(const CharSequence& /*key*/, uint32_t, const Descriptor*);
+ void onULongValue(const CharSequence& /*key*/, uint64_t, const Descriptor*);
+ void onByteValue(const CharSequence& /*key*/, int8_t, const Descriptor*);
+ void onShortValue(const CharSequence& /*key*/, int16_t, const Descriptor*);
+ void onIntValue(const CharSequence& /*key*/, int32_t, const Descriptor*);
+ void onLongValue(const CharSequence& /*key*/, int64_t, const Descriptor*);
+ void onFloatValue(const CharSequence& /*key*/, float, const Descriptor*);
+ void onDoubleValue(const CharSequence& /*key*/, double, const Descriptor*);
+ void onUuidValue(const CharSequence& /*key*/, const CharSequence&, const Descriptor*);
+ void onTimestampValue(const CharSequence& /*key*/, int64_t, const Descriptor*);
+
+ void onBinaryValue(const CharSequence& /*key*/, const CharSequence&, const Descriptor*);
+ void onStringValue(const CharSequence& /*key*/, const CharSequence&, const Descriptor*);
+ void onSymbolValue(const CharSequence& /*key*/, const CharSequence&, const Descriptor*);
+
+ qpid::types::Variant::Map getMap();
+ const qpid::types::Variant::Map getMap() const;
+ private:
+ qpid::types::Variant::Map map;
+};
+}} // namespace qpid::amqp
+
+#endif /*!QPID_AMQP_MAPBUILDER_H*/
Modified: qpid/trunk/qpid/cpp/src/qpid/amqp/descriptors.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp/descriptors.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp/descriptors.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp/descriptors.h Tue Jun 25 13:28:15 2013
@@ -89,6 +89,15 @@ const uint64_t SELECTOR_FILTER_CODE(0x00
const uint64_t XQUERY_FILTER_CODE(0x0000468C00000005ULL);
}
+namespace error_conditions {
+//note these are not actually descriptors
+const std::string INTERNAL_ERROR("amqp:internal-error");
+const std::string NOT_FOUND("amqp:not-found");
+const std::string UNAUTHORIZED_ACCESS("amqp:unauthorized-access");
+const std::string DECODE_ERROR("amqp:decode-error");
+const std::string NOT_ALLOWED("amqp:not-allowed");
+const std::string RESOURCE_LIMIT_EXCEEDED("amqp:resource-limit-exceeded");
+}
}} // namespace qpid::amqp
#endif /*!QPID_AMQP_DESCRIPTORS_H*/
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp Tue Jun 25 13:28:15 2013
@@ -22,7 +22,7 @@
#include "qpid/broker/Broker.h"
#include "qpid/broker/FedOps.h"
-#include "qpid/broker/Connection.h"
+#include "qpid/broker/amqp_0_10/Connection.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/LinkRegistry.h"
#include "qpid/broker/SessionState.h"
@@ -100,7 +100,7 @@ Bridge::~Bridge()
mgmtObject->resourceDestroy();
}
-void Bridge::create(Connection& c)
+void Bridge::create(amqp_0_10::Connection& c)
{
detached = false; // Reset detached in case we are recovering.
conn = &c;
@@ -200,7 +200,7 @@ void Bridge::create(Connection& c)
if (args.i_srcIsLocal) sessionHandler.getSession()->enableReceiverTracking();
}
-void Bridge::cancel(Connection&)
+void Bridge::cancel(amqp_0_10::Connection&)
{
if (resetProxy()) {
peer->getMessage().cancel(args.i_dest);
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h Tue Jun 25 13:28:15 2013
@@ -39,8 +39,9 @@
namespace qpid {
namespace broker {
-
+namespace amqp_0_10 {
class Connection;
+}
class Link;
class LinkRegistry;
@@ -115,9 +116,9 @@ class Bridge : public PersistableConfig,
void setErrorListener(boost::shared_ptr<ErrorListener> e) { errorListener = e; }
private:
struct PushHandler : framing::FrameHandler {
- PushHandler(Connection* c) { conn = c; }
+ PushHandler(amqp_0_10::Connection* c) { conn = c; }
void handle(framing::AMQFrame& frame);
- Connection* conn;
+ amqp_0_10::Connection* conn;
};
std::auto_ptr<PushHandler> pushHandler;
@@ -134,14 +135,14 @@ class Bridge : public PersistableConfig,
std::string queueName;
std::string altEx;
mutable uint64_t persistenceId;
- Connection* conn;
+ amqp_0_10::Connection* conn;
InitializeCallback initialize;
bool detached; // Set when session is detached.
bool resetProxy();
// connection Management (called by owning Link)
- void create(Connection& c);
- void cancel(Connection& c);
+ void create(amqp_0_10::Connection& c);
+ void cancel(amqp_0_10::Connection& c);
void closed();
friend class Link; // to call create, cancel, closed()
boost::shared_ptr<ErrorListener> errorListener;
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Tue Jun 25 13:28:15 2013
@@ -22,7 +22,7 @@
#include "qpid/broker/Broker.h"
#include "qpid/broker/AclModule.h"
-#include "qpid/broker/ConnectionIdentity.h"
+#include "qpid/broker/Connection.h"
#include "qpid/broker/DirectExchange.h"
#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/HeadersExchange.h"
@@ -707,13 +707,13 @@ struct InvalidParameter : public qpid::E
};
void Broker::createObject(const std::string& type, const std::string& name,
- const Variant::Map& properties, bool /*strict*/, const ConnectionIdentity* context)
+ const Variant::Map& properties, bool /*strict*/, const Connection* context)
{
std::string userId;
std::string connectionId;
if (context) {
userId = context->getUserId();
- connectionId = context->getUrl();
+ connectionId = context->getMgmtId();
}
//TODO: implement 'strict' option (check there are no unrecognised properties)
QPID_LOG (debug, "Broker::create(" << type << ", " << name << "," << properties << ")");
@@ -898,13 +898,13 @@ void Broker::createObject(const std::str
}
void Broker::deleteObject(const std::string& type, const std::string& name,
- const Variant::Map& options, const ConnectionIdentity* context)
+ const Variant::Map& options, const Connection* context)
{
std::string userId;
std::string connectionId;
if (context) {
userId = context->getUserId();
- connectionId = context->getUrl();
+ connectionId = context->getMgmtId();
}
QPID_LOG (debug, "Broker::delete(" << type << ", " << name << "," << options << ")");
if (objectFactory.deleteObject(*this, type, name, options, userId, connectionId)) {
@@ -952,13 +952,13 @@ void Broker::checkDeleteQueue(Queue::sha
Manageable::status_t Broker::queryObject(const std::string& type,
const std::string& name,
Variant::Map& results,
- const ConnectionIdentity* context)
+ const Connection* context)
{
std::string userId;
std::string connectionId;
if (context) {
userId = context->getUserId();
- connectionId = context->getUrl();
+ connectionId = context->getMgmtId();
}
QPID_LOG (debug, "Broker::query(" << type << ", " << name << ")");
@@ -994,7 +994,7 @@ Manageable::status_t Broker::queryQueue(
}
Manageable::status_t Broker::getTimestampConfig(bool& receive,
- const ConnectionIdentity* context)
+ const Connection* context)
{
std::string name; // none needed for broker
std::string userId = context->getUserId();
@@ -1006,7 +1006,7 @@ Manageable::status_t Broker::getTimestam
}
Manageable::status_t Broker::setTimestampConfig(const bool receive,
- const ConnectionIdentity* context)
+ const Connection* context)
{
std::string name; // none needed for broker
std::string userId = context->getUserId();
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Tue Jun 25 13:28:15 2013
@@ -149,20 +149,20 @@ class Broker : public sys::Runnable, pub
void setLogHiresTimestamp(bool enabled);
bool getLogHiresTimestamp();
void createObject(const std::string& type, const std::string& name,
- const qpid::types::Variant::Map& properties, bool strict, const ConnectionIdentity* context);
+ const qpid::types::Variant::Map& properties, bool strict, const Connection* context);
void deleteObject(const std::string& type, const std::string& name,
- const qpid::types::Variant::Map& options, const ConnectionIdentity* context);
+ const qpid::types::Variant::Map& options, const Connection* context);
void checkDeleteQueue(boost::shared_ptr<Queue> queue, bool ifUnused, bool ifEmpty);
Manageable::status_t queryObject(const std::string& type, const std::string& name,
- qpid::types::Variant::Map& results, const ConnectionIdentity* context);
+ qpid::types::Variant::Map& results, const Connection* context);
Manageable::status_t queryQueue( const std::string& name,
const std::string& userId,
const std::string& connectionId,
qpid::types::Variant::Map& results);
Manageable::status_t getTimestampConfig(bool& receive,
- const ConnectionIdentity* context);
+ const Connection* context);
Manageable::status_t setTimestampConfig(const bool receive,
- const ConnectionIdentity* context);
+ const Connection* context);
Manageable::status_t queueRedirect(const std::string& srcQueue, const std::string& tgtQueue);
void queueRedirectDestroy(boost::shared_ptr<Queue> srcQ, boost::shared_ptr<Queue> tgtQ, bool moveMsgs);
boost::shared_ptr<sys::Poller> poller;
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Tue Jun 25 13:28:15 2013
@@ -21,215 +21,35 @@
* under the License.
*
*/
-
-#include <memory>
-#include <sstream>
-#include <vector>
-#include <queue>
-
-#include "qpid/broker/BrokerImportExport.h"
-
-#include "qpid/broker/ConnectionHandler.h"
-#include "qpid/broker/ConnectionIdentity.h"
-#include "qpid/broker/OwnershipToken.h"
-#include "qpid/management/Manageable.h"
-#include "qpid/sys/AggregateOutput.h"
-#include "qpid/sys/ConnectionInputHandler.h"
-#include "qpid/sys/SecuritySettings.h"
-#include "qpid/sys/Mutex.h"
-#include "qpid/RefCounted.h"
-#include "qpid/Url.h"
-#include "qpid/ptr_map.h"
-
-#include "qmf/org/apache/qpid/broker/Connection.h"
-
-#include <boost/ptr_container/ptr_map.hpp>
-#include <boost/scoped_ptr.hpp>
-#include <boost/bind.hpp>
-
-#include <algorithm>
+#include <map>
+#include <string>
namespace qpid {
-namespace sys {
-class ConnectionOutputHandler;
-class Timer;
-class TimerTask;
+namespace management {
+class ObjectId;
+}
+namespace types {
+class Variant;
}
-namespace broker {
-
-class Broker;
-class LinkRegistry;
-class Queue;
-class SecureConnection;
-class SessionHandler;
-struct ConnectionTimeoutTask;
-
-class Connection : public sys::ConnectionInputHandler, public ConnectionIdentity,
- public OwnershipToken, public management::Manageable,
- public RefCounted
-{
- public:
- uint32_t getFrameMax() const { return framemax; }
- uint16_t getHeartbeat() const { return heartbeat; }
- uint16_t getHeartbeatMax() const { return heartbeatmax; }
-
- void setFrameMax(uint32_t fm) { framemax = std::max(fm, (uint32_t) 4096); }
- void setHeartbeat(uint16_t hb) { heartbeat = hb; }
- void setHeartbeatMax(uint16_t hbm) { heartbeatmax = hbm; }
-
- void setUrl(const std::string& _url) { url = _url; }
-
- const OwnershipToken* getOwnership() const { return this; };
- const management::ObjectId getObjectId() const { return GetManagementObject()->getObjectId(); };
- const std::string& getUserId() const { return userId; }
- const std::string& getUrl() const { return url; }
-
- void setUserProxyAuth(const bool b);
- bool isUserProxyAuth() const { return userProxyAuth || federationPeerTag.size() > 0; } // links can proxy msgs with non-matching auth ids
- bool isFederationLink() const { return federationPeerTag.size() > 0; }
- void setFederationPeerTag(const std::string& tag) { federationPeerTag = std::string(tag); }
- const std::string& getFederationPeerTag() const { return federationPeerTag; }
- std::vector<Url>& getKnownHosts() { return knownHosts; }
-
- /**@return true if user is the authenticated user on this connection.
- * If id has the default realm will also compare plain username.
- */
- bool isAuthenticatedUser(const std::string& id) const {
- return (id == userId || (isDefaultRealm && id == userName));
- }
-
- Broker& getBroker() { return broker; }
-
- sys::ConnectionOutputHandler& getOutput() { return *out; }
- void activateOutput();
- void addOutputTask(OutputTask*);
- void removeOutputTask(OutputTask*);
- framing::ProtocolVersion getVersion() const { return version; }
-
- Connection(sys::ConnectionOutputHandler* out,
- Broker& broker,
- const std::string& mgmtId,
- const qpid::sys::SecuritySettings&,
- bool isLink = false,
- uint64_t objectId = 0);
-
- ~Connection ();
-
- /** Get the SessionHandler for channel. Create if it does not already exist */
- SessionHandler& getChannel(framing::ChannelId channel);
-
- /** Close the connection. Waits for the client to respond with close-ok
- * before actually destroying the connection.
- */
- QPID_BROKER_EXTERN void close(
- framing::connection::CloseCode code, const std::string& text);
-
- /** Abort the connection. Close abruptly and immediately. */
- QPID_BROKER_EXTERN void abort();
-
- // ConnectionInputHandler methods
- void received(framing::AMQFrame& frame);
- bool doOutput();
- void closed();
-
- void closeChannel(framing::ChannelId channel);
-
- // Manageable entry points
- management::ManagementObject::shared_ptr GetManagementObject(void) const;
- management::Manageable::status_t
- ManagementMethod (uint32_t methodId, management::Args& args, std::string&);
-
- void requestIOProcessing (boost::function0<void>);
- void recordFromServer (const framing::AMQFrame& frame);
- void recordFromClient (const framing::AMQFrame& frame);
-
- // gets for configured federation links
- std::string getAuthMechanism();
- std::string getAuthCredentials();
- std::string getUsername();
- std::string getPassword();
- std::string getHost();
- uint16_t getPort();
-
- void notifyConnectionForced(const std::string& text);
- void setUserId(const std::string& uid);
-
- // credentials for connected client
- const std::string& getMgmtId() const { return mgmtId; }
- management::ManagementAgent* getAgent() const { return agent; }
-
- void setHeartbeatInterval(uint16_t heartbeat);
- void sendHeartbeat();
- void restartTimeout();
-
- void setSecureConnection(SecureConnection* secured);
-
- const qpid::sys::SecuritySettings& getExternalSecuritySettings() const
- {
- return securitySettings;
- }
-
- /** @return true if the initial connection negotiation is complete. */
- bool isOpen();
-
- bool isLink() { return link; }
- void startLinkHeartbeatTimeoutTask();
-
- void setClientProperties(const framing::FieldTable& cp) { clientProperties = cp; }
- const framing::FieldTable& getClientProperties() const { return clientProperties; }
-
- private:
- // Management object is used in the constructor so must be early
- qmf::org::apache::qpid::broker::Connection::shared_ptr mgmtObject;
-
- //contained output tasks
- sys::AggregateOutput outputTasks;
-
- boost::scoped_ptr<framing::FrameHandler> outboundTracker;
- boost::scoped_ptr<sys::ConnectionOutputHandler> out;
-
- Broker& broker;
-
- framing::ProtocolVersion version;
- uint32_t framemax;
- uint16_t heartbeat;
- uint16_t heartbeatmax;
- std::string userId;
- std::string url;
- bool userProxyAuth;
- std::string federationPeerTag;
- std::vector<Url> knownHosts;
- std::string userName;
- bool isDefaultRealm;
-
- typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
-
- ChannelMap channels;
- qpid::sys::SecuritySettings securitySettings;
- const bool link;
- ConnectionHandler adapter;
- bool mgmtClosing;
- const std::string mgmtId;
- sys::Mutex ioCallbackLock;
- std::queue<boost::function0<void> > ioCallbacks;
- LinkRegistry& links;
- management::ManagementAgent* agent;
- sys::Timer& timer;
- boost::intrusive_ptr<sys::TimerTask> heartbeatTimer, linkHeartbeatTimer;
- boost::intrusive_ptr<ConnectionTimeoutTask> timeoutTimer;
- uint64_t objectId;
- framing::FieldTable clientProperties;
-
-friend class OutboundFrameTracker;
- void sent(const framing::AMQFrame& f);
- void doIoCallbacks();
+namespace broker {
- public:
+class OwnershipToken;
- qmf::org::apache::qpid::broker::Connection::shared_ptr getMgmtObject() { return mgmtObject; }
+/**
+ * Protocol independent connection abstraction.
+ */
+class Connection {
+public:
+ virtual ~Connection() {}
+ virtual const OwnershipToken* getOwnership() const = 0;
+ virtual const management::ObjectId getObjectId() const = 0;
+ virtual const std::string& getUserId() const = 0;
+ virtual const std::string& getMgmtId() const = 0;
+ virtual const std::map<std::string, types::Variant>& getClientProperties() const = 0;
+ virtual bool isLink() const = 0;
+ virtual void abort() = 0;
};
-
-}}
+}} // namespace qpid::broker
#endif /*!QPID_BROKER_CONNECTION_H*/
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp Tue Jun 25 13:28:15 2013
@@ -24,7 +24,7 @@
#include "qpid/SaslFactory.h"
#include "qpid/broker/Broker.h"
-#include "qpid/broker/Connection.h"
+#include "qpid/broker/amqp_0_10/Connection.h"
#include "qpid/broker/SecureConnection.h"
#include "qpid/Url.h"
#include "qpid/framing/AllInvoker.h"
@@ -109,10 +109,10 @@ void ConnectionHandler::setSecureConnect
handler->secured = secured;
}
-ConnectionHandler::ConnectionHandler(Connection& connection, bool isClient) :
+ConnectionHandler::ConnectionHandler(amqp_0_10::Connection& connection, bool isClient) :
handler(new Handler(connection, isClient)) {}
-ConnectionHandler::Handler::Handler(Connection& c, bool isClient) :
+ConnectionHandler::Handler::Handler(amqp_0_10::Connection& c, bool isClient) :
proxy(c.getOutput()),
connection(c), serverMode(!isClient), secured(0),
isOpen(false)
@@ -153,14 +153,14 @@ void ConnectionHandler::Handler::startOk
{
const framing::FieldTable& clientProperties = body.getClientProperties();
qmf::org::apache::qpid::broker::Connection::shared_ptr mgmtObject = connection.getMgmtObject();
+ types::Variant::Map properties;
+ qpid::amqp_0_10::translate(clientProperties, properties);
if (mgmtObject != 0) {
string procName = clientProperties.getAsString(CLIENT_PROCESS_NAME);
uint32_t pid = clientProperties.getAsInt(CLIENT_PID);
uint32_t ppid = clientProperties.getAsInt(CLIENT_PPID);
- types::Variant::Map properties;
- qpid::amqp_0_10::translate(clientProperties, properties);
mgmtObject->set_remoteProperties(properties);
if (!procName.empty())
mgmtObject->set_remoteProcessName(procName);
@@ -192,7 +192,7 @@ void ConnectionHandler::Handler::startOk
throw;
}
- connection.setClientProperties(clientProperties);
+ connection.setClientProperties(properties);
if (clientProperties.isSet(QPID_FED_TAG)) {
connection.setFederationPeerTag(clientProperties.getAsString(QPID_FED_TAG));
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h Tue Jun 25 13:28:15 2013
@@ -47,7 +47,9 @@ struct SecuritySettings;
namespace broker {
+namespace amqp_0_10 {
class Connection;
+}
class SecureConnection;
class ConnectionHandler : public framing::FrameHandler
@@ -55,13 +57,13 @@ class ConnectionHandler : public framing
struct Handler : public framing::AMQP_AllOperations::ConnectionHandler
{
framing::AMQP_AllProxy::Connection proxy;
- Connection& connection;
+ amqp_0_10::Connection& connection;
bool serverMode;
std::auto_ptr<SaslAuthenticator> authenticator;
SecureConnection* secured;
bool isOpen;
- Handler(Connection& connection, bool isClient);
+ Handler(amqp_0_10::Connection& connection, bool isClient);
~Handler();
void startOk(const qpid::framing::ConnectionStartOkBody& body);
void startOk(const qpid::framing::FieldTable& clientProperties,
@@ -99,7 +101,7 @@ class ConnectionHandler : public framing
bool handle(const qpid::framing::AMQMethodBody& method);
public:
- ConnectionHandler(Connection& connection, bool isClient );
+ ConnectionHandler(amqp_0_10::Connection& connection, bool isClient );
void close(framing::connection::CloseCode code, const std::string& text);
void heartbeat();
void handle(framing::AMQFrame& frame);
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h Tue Jun 25 13:28:15 2013
@@ -21,7 +21,7 @@
#include "qpid/broker/SemanticState.h"
#include "qpid/broker/SessionContext.h"
-#include "qpid/broker/Connection.h"
+#include "qpid/broker/amqp_0_10/Connection.h"
namespace qpid {
namespace broker {
@@ -40,7 +40,7 @@ class HandlerImpl {
HandlerImpl(SemanticState& s) : state(s), session(s.getSession()) {}
framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); }
- Connection& getConnection() { return session.getConnection(); }
+ amqp_0_10::Connection& getConnection() { return session.getConnection(); }
Broker& getBroker() { return session.getConnection().getBroker(); }
};
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp Tue Jun 25 13:28:15 2013
@@ -22,7 +22,7 @@
#include "qpid/broker/Link.h"
#include "qpid/broker/LinkRegistry.h"
#include "qpid/broker/Broker.h"
-#include "qpid/broker/Connection.h"
+#include "qpid/broker/amqp_0_10/Connection.h"
#include "qpid/sys/Timer.h"
#include "qmf/org/apache/qpid/broker/EventBrokerLinkUp.h"
#include "qmf/org/apache/qpid/broker/EventBrokerLinkDown.h"
@@ -233,7 +233,7 @@ void Link::startConnectionLH ()
}
}
-void Link::established(Connection* c)
+void Link::established(qpid::broker::amqp_0_10::Connection* c)
{
stringstream addr;
addr << host << ":" << port;
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Link.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.h Tue Jun 25 13:28:15 2013
@@ -45,8 +45,10 @@ namespace broker {
class LinkRegistry;
class Broker;
-class Connection;
class LinkExchange;
+namespace amqp_0_10 {
+class Connection;
+}
class Link : public PersistableConfig, public management::Manageable {
private:
@@ -83,7 +85,7 @@ class Link : public PersistableConfig, p
Bridges cancellations; // Bridges pending cancellation
framing::ChannelId nextFreeChannel;
RangeSet<framing::ChannelId> freeChannels;
- Connection* connection;
+ amqp_0_10::Connection* connection;
management::ManagementAgent* agent;
boost::function<void(Link*)> listener;
boost::intrusive_ptr<sys::TimerTask> timerTask;
@@ -109,7 +111,7 @@ class Link : public PersistableConfig, p
void reconnectLH(const Address&); //called by LinkRegistry
// connection management (called by LinkRegistry)
- void established(Connection*); // Called when connection is created
+ void established(amqp_0_10::Connection*); // Called when connection is created
void opened(); // Called when connection is open (after create)
void closed(int, std::string); // Called when connection goes away
void notifyConnectionForced(const std::string text);
@@ -194,7 +196,7 @@ class Link : public PersistableConfig, p
/** The current connction for this link. Note returns 0 if the link is not
* presently connected.
*/
- Connection* getConnection() { return connection; }
+ amqp_0_10::Connection* getConnection() { return connection; }
};
}
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp Tue Jun 25 13:28:15 2013
@@ -21,7 +21,7 @@
#include "qpid/broker/LinkRegistry.h"
#include "qpid/broker/Broker.h"
-#include "qpid/broker/Connection.h"
+#include "qpid/broker/amqp_0_10/Connection.h"
#include "qpid/broker/Link.h"
#include "qpid/log/Statement.h"
#include <iostream>
@@ -53,10 +53,26 @@ class LinkRegistryConnectionObserver : p
LinkRegistry& links;
public:
LinkRegistryConnectionObserver(LinkRegistry& l) : links(l) {}
- void connection(Connection& c) { links.notifyConnection(c.getMgmtId(), &c); }
- void opened(Connection& c) { links.notifyOpened(c.getMgmtId()); }
- void closed(Connection& c) { links.notifyClosed(c.getMgmtId()); }
- void forced(Connection& c, const string& text) { links.notifyConnectionForced(c.getMgmtId(), text); }
+ void connection(Connection& in)
+ {
+ amqp_0_10::Connection* c = dynamic_cast<amqp_0_10::Connection*>(&in);
+ if (c) links.notifyConnection(c->getMgmtId(), c);
+ }
+ void opened(Connection& in)
+ {
+ amqp_0_10::Connection* c = dynamic_cast<amqp_0_10::Connection*>(&in);
+ if (c) links.notifyOpened(c->getMgmtId());
+ }
+ void closed(Connection& in)
+ {
+ amqp_0_10::Connection* c = dynamic_cast<amqp_0_10::Connection*>(&in);
+ if (c) links.notifyClosed(c->getMgmtId());
+ }
+ void forced(Connection& in, const string& text)
+ {
+ amqp_0_10::Connection* c = dynamic_cast<amqp_0_10::Connection*>(&in);
+ if (c) links.notifyConnectionForced(c->getMgmtId(), text);
+ }
};
LinkRegistry::LinkRegistry (Broker* _broker) :
@@ -287,7 +303,7 @@ Link::shared_ptr LinkRegistry::findLink(
return Link::shared_ptr();
}
-void LinkRegistry::notifyConnection(const std::string& key, Connection* c)
+void LinkRegistry::notifyConnection(const std::string& key, amqp_0_10::Connection* c)
{
// find a link that is attempting to connect to the remote, and
// create a mapping from connection id to link
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h Tue Jun 25 13:28:15 2013
@@ -35,10 +35,11 @@
namespace qpid {
namespace broker {
-
+namespace amqp_0_10 {
+ class Connection;
+}
class Link;
class Broker;
- class Connection;
class LinkRegistry {
typedef std::map<std::string, boost::shared_ptr<Link> > LinkMap;
typedef std::map<std::string, Bridge::shared_ptr> BridgeMap;
@@ -58,7 +59,7 @@ namespace broker {
boost::shared_ptr<Link> findLink(const std::string& key);
// Methods called by the connection observer, key is connection identifier
- void notifyConnection (const std::string& key, Connection* c);
+ void notifyConnection (const std::string& key, amqp_0_10::Connection* c);
void notifyOpened (const std::string& key);
void notifyClosed (const std::string& key);
void notifyConnectionForced (const std::string& key, const std::string& text);
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Tue Jun 25 13:28:15 2013
@@ -23,7 +23,8 @@
#include "qpid/amqp/CharSequence.h"
#include "qpid/amqp/MapHandler.h"
-#include "qpid/broker/ConnectionIdentity.h"
+#include "qpid/broker/Connection.h"
+#include "qpid/broker/OwnershipToken.h"
#include "qpid/management/ManagementObject.h"
#include "qpid/management/Manageable.h"
#include "qpid/StringUtils.h"
@@ -203,10 +204,10 @@ uint8_t Message::getPriority() const
bool Message::getIsManagementMessage() const { return isManagementMessage; }
void Message::setIsManagementMessage(bool b) { isManagementMessage = b; }
-const OwnershipToken* Message::getPublisherOwnership() const { return publisher->getOwnership(); }
-const management::ObjectId Message::getPublisherObjectId() const { return publisher->getObjectId(); }
-const std::string& Message::getPublisherUserId() const { return publisher->getUserId(); }
-const std::string& Message::getPublisherUrl() const { return publisher->getUrl(); }
+const Connection* Message::getPublisher() const { return publisher; }
+void Message::setPublisher(const Connection& p) { publisher = &p; }
+bool Message::isLocalTo(const OwnershipToken* token) const { return token && publisher && token->isLocal(publisher->getOwnership()); }
+
qpid::framing::SequenceNumber Message::getSequence() const
{
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Tue Jun 25 13:28:15 2013
@@ -47,7 +47,7 @@ class Manageable;
namespace broker {
class OwnershipToken;
-class ConnectionIdentity;
+class Connection;
enum MessageState
{
@@ -85,12 +85,9 @@ public:
int getDeliveryCount() const { return deliveryCount; }
void resetDeliveryCount() { deliveryCount = -1; }
- void setPublisher(const ConnectionIdentity& p) { publisher = &p; }
- const ConnectionIdentity& getPublisher() const { return *publisher; }
- const OwnershipToken* getPublisherOwnership() const;
- const management::ObjectId getPublisherObjectId() const;
- const std::string& getPublisherUserId() const;
- const std::string& getPublisherUrl() const;
+ void setPublisher(const Connection& p);
+ const Connection* getPublisher() const;
+ bool isLocalTo(const OwnershipToken*) const;
QPID_BROKER_EXTERN std::string getRoutingKey() const;
QPID_BROKER_EXTERN bool isPersistent() const;
@@ -148,7 +145,7 @@ public:
boost::intrusive_ptr<Encoding> encoding;
boost::intrusive_ptr<PersistableMessage> persistentContext;
int deliveryCount;
- const ConnectionIdentity* publisher;
+ const Connection* publisher;
qpid::sys::AbsTime expiration;
boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
uint64_t timestamp;
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Jun 25 13:28:15 2013
@@ -220,18 +220,13 @@ Queue::~Queue()
{
}
-bool isLocalTo(const OwnershipToken* token, const Message& msg)
-{
- return token && token->isLocal(msg.getPublisherOwnership());
-}
-
bool Queue::isLocal(const Message& msg)
{
//message is considered local if it was published on the same
//connection as that of the session which declared this queue
//exclusive (owner) or which has an exclusive subscription
//(exclusive)
- return settings.noLocal && (isLocalTo(owner, msg) || isLocalTo(exclusive, msg));
+ return settings.noLocal && (msg.isLocalTo(owner) || msg.isLocalTo(exclusive));
}
bool Queue::isExcluded(const Message& msg)
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp Tue Jun 25 13:28:15 2013
@@ -21,7 +21,7 @@
#include "qpid/broker/AclModule.h"
#include "qpid/broker/Broker.h"
-#include "qpid/broker/Connection.h"
+#include "qpid/broker/amqp_0_10/Connection.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/log/Statement.h"
@@ -54,12 +54,12 @@ namespace broker {
class NullAuthenticator : public SaslAuthenticator
{
- Connection& connection;
+ amqp_0_10::Connection& connection;
framing::AMQP_ClientProxy::Connection client;
std::string realm;
const bool encrypt;
public:
- NullAuthenticator(Connection& connection, bool encrypt);
+ NullAuthenticator(amqp_0_10::Connection& connection, bool encrypt);
~NullAuthenticator();
void getMechanisms(framing::Array& mechanisms);
void start(const std::string& mechanism, const std::string* response);
@@ -74,7 +74,7 @@ public:
class CyrusAuthenticator : public SaslAuthenticator
{
sasl_conn_t *sasl_conn;
- Connection& connection;
+ amqp_0_10::Connection& connection;
framing::AMQP_ClientProxy::Connection client;
const bool encrypt;
@@ -82,7 +82,7 @@ class CyrusAuthenticator : public SaslAu
bool getUsername(std::string& uid);
public:
- CyrusAuthenticator(Connection& connection, bool encrypt);
+ CyrusAuthenticator(amqp_0_10::Connection& connection, bool encrypt);
~CyrusAuthenticator();
void init();
void getMechanisms(framing::Array& mechanisms);
@@ -167,7 +167,7 @@ void SaslAuthenticator::fini(void)
#endif
-std::auto_ptr<SaslAuthenticator> SaslAuthenticator::createAuthenticator(Connection& c )
+std::auto_ptr<SaslAuthenticator> SaslAuthenticator::createAuthenticator(amqp_0_10::Connection& c )
{
if (c.getBroker().getOptions().auth) {
return std::auto_ptr<SaslAuthenticator>(
@@ -179,7 +179,7 @@ std::auto_ptr<SaslAuthenticator> SaslAut
}
-NullAuthenticator::NullAuthenticator(Connection& c, bool e) : connection(c), client(c.getOutput()),
+NullAuthenticator::NullAuthenticator(amqp_0_10::Connection& c, bool e) : connection(c), client(c.getOutput()),
realm(c.getBroker().getOptions().realm), encrypt(e) {}
NullAuthenticator::~NullAuthenticator() {}
@@ -246,7 +246,7 @@ std::auto_ptr<SecurityLayer> NullAuthent
#if HAVE_SASL
-CyrusAuthenticator::CyrusAuthenticator(Connection& c, bool _encrypt) :
+CyrusAuthenticator::CyrusAuthenticator(amqp_0_10::Connection& c, bool _encrypt) :
sasl_conn(0), connection(c), client(c.getOutput()), encrypt(_encrypt)
{
init();
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SaslAuthenticator.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SaslAuthenticator.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SaslAuthenticator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SaslAuthenticator.h Tue Jun 25 13:28:15 2013
@@ -34,7 +34,9 @@
namespace qpid {
namespace broker {
+namespace amqp_0_10 {
class Connection;
+}
class SaslAuthenticator
{
@@ -54,7 +56,7 @@ public:
static void init(const std::string& saslName, std::string const & saslConfigPath );
static void fini(void);
- static std::auto_ptr<SaslAuthenticator> createAuthenticator(Connection& connection);
+ static std::auto_ptr<SaslAuthenticator> createAuthenticator(amqp_0_10::Connection& connection);
virtual void callUserIdCallbacks() { }
};
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp Tue Jun 25 13:28:15 2013
@@ -22,7 +22,7 @@
#include "qpid/amqp_0_10/Connection.h"
#include "qpid/broker/Broker.h"
-#include "qpid/broker/Connection.h"
+#include "qpid/broker/amqp_0_10/Connection.h"
#include "qpid/broker/SecureConnection.h"
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/log/Statement.h"
@@ -35,7 +35,7 @@ using framing::ProtocolVersion;
using qpid::sys::SecuritySettings;
typedef std::auto_ptr<qpid::amqp_0_10::Connection> CodecPtr;
typedef std::auto_ptr<SecureConnection> SecureConnectionPtr;
-typedef std::auto_ptr<Connection> ConnectionPtr;
+typedef std::auto_ptr<qpid::broker::amqp_0_10::Connection> ConnectionPtr;
typedef std::auto_ptr<sys::ConnectionInputHandler> InputPtr;
SecureConnectionFactory::SecureConnectionFactory(Broker& b) : broker(b) {}
@@ -64,7 +64,7 @@ SecureConnectionFactory::create_0_10(sys
{
SecureConnectionPtr sc(new SecureConnection());
CodecPtr c(new qpid::amqp_0_10::Connection(out, id, brokerActsAsClient));
- ConnectionPtr i(new broker::Connection(c.get(), broker, id, external, brokerActsAsClient));
+ ConnectionPtr i(new broker::amqp_0_10::Connection(c.get(), broker, id, external, brokerActsAsClient));
i->setSecureConnection(sc.get());
c->setInputHandler(InputPtr(i.release()));
sc->setCodec(std::auto_ptr<sys::ConnectionCodec>(c));
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Tue Jun 25 13:28:15 2013
@@ -22,7 +22,7 @@
#include "qpid/broker/SessionState.h"
#include "qpid/broker/Broker.h"
-#include "qpid/broker/Connection.h"
+#include "qpid/broker/amqp_0_10/Connection.h"
#include "qpid/broker/DeliverableMessage.h"
#include "qpid/broker/DtxAck.h"
#include "qpid/broker/DtxTimeout.h"
@@ -83,7 +83,7 @@ SemanticState::SemanticState(SessionStat
authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isUserProxyAuth()),
userID(getSession().getConnection().getUserId()),
closeComplete(false),
- connectionId(getSession().getConnection().getUrl())
+ connectionId(getSession().getConnection().getMgmtId())
{}
SemanticState::~SemanticState() {
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Tue Jun 25 13:28:15 2013
@@ -18,7 +18,7 @@
#include "qpid/broker/SessionAdapter.h"
#include "qpid/broker/Broker.h"
-#include "qpid/broker/Connection.h"
+#include "qpid/broker/amqp_0_10/Connection.h"
#include "qpid/broker/DtxTimeout.h"
#include "qpid/broker/Queue.h"
#include "qpid/Exception.h"
@@ -96,14 +96,14 @@ void SessionAdapter::ExchangeHandlerImpl
try{
std::pair<Exchange::shared_ptr, bool> response =
getBroker().createExchange(exchange, type, durable, alternateExchange, args,
- getConnection().getUserId(), getConnection().getUrl());
+ getConnection().getUserId(), getConnection().getMgmtId());
if (!response.second) {
//exchange already there, not created
checkType(response.first, type);
checkAlternate(response.first, alternate);
QPID_LOG_CAT(debug, model, "Create exchange. name:" << exchange
<< " user:" << getConnection().getUserId()
- << " rhost:" << getConnection().getUrl()
+ << " rhost:" << getConnection().getMgmtId()
<< " type:" << type
<< " alternateExchange:" << alternateExchange
<< " durable:" << (durable ? "T" : "F"));
@@ -134,7 +134,7 @@ void SessionAdapter::ExchangeHandlerImpl
void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool /*ifUnused*/)
{
//TODO: implement if-unused
- getBroker().deleteExchange(name, getConnection().getUserId(), getConnection().getUrl());
+ getBroker().deleteExchange(name, getConnection().getUserId(), getConnection().getMgmtId());
}
ExchangeQueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& name)
@@ -156,7 +156,7 @@ void SessionAdapter::ExchangeHandlerImpl
const FieldTable& arguments)
{
getBroker().bind(queueName, exchangeName, routingKey, arguments,
- getConnection().getUserId(), getConnection().getUrl());
+ getConnection().getUserId(), getConnection().getMgmtId());
state.addBinding(queueName, exchangeName, routingKey, arguments);
}
@@ -166,7 +166,7 @@ void SessionAdapter::ExchangeHandlerImpl
{
state.removeBinding(queueName, exchangeName, routingKey);
getBroker().unbind(queueName, exchangeName, routingKey,
- getConnection().getUserId(), getConnection().getUrl());
+ getConnection().getUserId(), getConnection().getMgmtId());
}
ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string& exchangeName,
@@ -209,7 +209,7 @@ ExchangeBoundResult SessionAdapter::Exch
SessionAdapter::QueueHandlerImpl::QueueHandlerImpl(SemanticState& session)
: HandlerHelper(session), broker(getBroker()),
//record connection id and userid for deleting exclsuive queues after session has ended:
- connectionId(getConnection().getUrl()), userId(getConnection().getUserId())
+ connectionId(getConnection().getMgmtId()), userId(getConnection().getUserId())
{}
@@ -302,7 +302,7 @@ void SessionAdapter::QueueHandlerImpl::d
exclusive ? &session : 0,
alternateExchange,
getConnection().getUserId(),
- getConnection().getUrl());
+ getConnection().getMgmtId());
queue = queue_created.first;
assert(queue);
if (queue_created.second) { // This is a new queue
@@ -316,7 +316,7 @@ void SessionAdapter::QueueHandlerImpl::d
}
QPID_LOG_CAT(debug, model, "Create queue. name:" << name
<< " user:" << getConnection().getUserId()
- << " rhost:" << getConnection().getUrl()
+ << " rhost:" << getConnection().getMgmtId()
<< " durable:" << (durable ? "T" : "F")
<< " exclusive:" << (exclusive ? "T" : "F")
<< " autodelete:" << (autoDelete ? "T" : "F")
@@ -363,7 +363,7 @@ void SessionAdapter::QueueHandlerImpl::c
void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnused, bool ifEmpty)
{
- getBroker().deleteQueue(queue, getConnection().getUserId(), getConnection().getUrl(),
+ getBroker().deleteQueue(queue, getConnection().getUserId(), getConnection().getMgmtId(),
boost::bind(&SessionAdapter::QueueHandlerImpl::checkDelete, this, _1, ifUnused, ifEmpty));
}
@@ -429,12 +429,12 @@ SessionAdapter::MessageHandlerImpl::subs
ManagementAgent* agent = getBroker().getManagementAgent();
if (agent)
- agent->raiseEvent(_qmf::EventSubscribe(getConnection().getUrl(), getConnection().getUserId(),
+ agent->raiseEvent(_qmf::EventSubscribe(getConnection().getMgmtId(), getConnection().getUserId(),
queueName, destination, exclusive, ManagementAgent::toMap(arguments)));
QPID_LOG_CAT(debug, model, "Create subscription. queue:" << queueName
<< " destination:" << destination
<< " user:" << getConnection().getUserId()
- << " rhost:" << getConnection().getUrl()
+ << " rhost:" << getConnection().getMgmtId()
<< " exclusive:" << (exclusive ? "T" : "F")
);
}
@@ -448,10 +448,10 @@ SessionAdapter::MessageHandlerImpl::canc
ManagementAgent* agent = getBroker().getManagementAgent();
if (agent)
- agent->raiseEvent(_qmf::EventUnsubscribe(getConnection().getUrl(), getConnection().getUserId(), destination));
+ agent->raiseEvent(_qmf::EventUnsubscribe(getConnection().getMgmtId(), getConnection().getUserId(), destination));
QPID_LOG_CAT(debug, model, "Delete subscription. destination:" << destination
<< " user:" << getConnection().getUserId()
- << " rhost:" << getConnection().getUrl() );
+ << " rhost:" << getConnection().getMgmtId() );
}
void
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h Tue Jun 25 13:28:15 2013
@@ -36,14 +36,16 @@ class AMQP_ClientProxy;
namespace broker {
class Broker;
+namespace amqp_0_10 {
class Connection;
+}
class SessionContext : public OwnershipToken
{
public:
virtual ~SessionContext(){}
virtual bool isAttached() const = 0;
- virtual Connection& getConnection() = 0;
+ virtual amqp_0_10::Connection& getConnection() = 0;
virtual framing::AMQP_ClientProxy& getProxy() = 0;
virtual Broker& getBroker() = 0;
virtual uint16_t getChannel() const = 0;
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Tue Jun 25 13:28:15 2013
@@ -20,7 +20,7 @@
#include "qpid/broker/SessionHandler.h"
#include "qpid/broker/Broker.h"
-#include "qpid/broker/Connection.h"
+#include "qpid/broker/amqp_0_10/Connection.h"
#include "qpid/broker/SessionState.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/ConnectionOutputHandler.h"
@@ -33,7 +33,7 @@ using namespace framing;
using namespace std;
using namespace qpid::sys;
-SessionHandler::SessionHandler(Connection& c, ChannelId ch)
+SessionHandler::SessionHandler(amqp_0_10::Connection& c, ChannelId ch)
: qpid::amqp_0_10::SessionHandler(&c.getOutput(), ch),
connection(c),
proxy(out)
@@ -64,9 +64,9 @@ void SessionHandler::executionException(
errorListener->executionException(code, msg);
}
-Connection& SessionHandler::getConnection() { return connection; }
+amqp_0_10::Connection& SessionHandler::getConnection() { return connection; }
-const Connection& SessionHandler::getConnection() const { return connection; }
+const amqp_0_10::Connection& SessionHandler::getConnection() const { return connection; }
void SessionHandler::handleDetach() {
qpid::amqp_0_10::SessionHandler::handleDetach();
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h Tue Jun 25 13:28:15 2013
@@ -31,8 +31,9 @@ namespace qpid {
class SessionState;
namespace broker {
-
+namespace amqp_0_10 {
class Connection;
+}
class SessionState;
/**
@@ -57,15 +58,15 @@ class SessionHandler : public qpid::amqp
/**
*@param e must not be deleted until ErrorListener::detach has been called */
- SessionHandler(Connection&, framing::ChannelId);
+ SessionHandler(amqp_0_10::Connection&, framing::ChannelId);
~SessionHandler();
/** Get broker::SessionState */
SessionState* getSession() { return session.get(); }
const SessionState* getSession() const { return session.get(); }
- Connection& getConnection();
- const Connection& getConnection() const;
+ amqp_0_10::Connection& getConnection();
+ const amqp_0_10::Connection& getConnection() const;
framing::AMQP_ClientProxy& getProxy() { return proxy; }
const framing::AMQP_ClientProxy& getProxy() const { return proxy; }
@@ -93,7 +94,7 @@ class SessionHandler : public qpid::amqp
: framing::AMQP_ClientProxy(setChannel), setChannel(ch, out) {}
};
- Connection& connection;
+ amqp_0_10::Connection& connection;
framing::AMQP_ClientProxy proxy;
std::auto_ptr<SessionState> session;
boost::shared_ptr<ErrorListener> errorListener;
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Tue Jun 25 13:28:15 2013
@@ -96,7 +96,7 @@ uint16_t SessionState::getChannel() cons
return handler->getChannel();
}
-Connection& SessionState::getConnection() {
+amqp_0_10::Connection& SessionState::getConnection() {
assert(isAttached());
return handler->getConnection();
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Tue Jun 25 13:28:15 2013
@@ -89,7 +89,7 @@ class SessionState : public qpid::Sessio
uint16_t getChannel() const;
/** @pre isAttached() */
- Connection& getConnection();
+ amqp_0_10::Connection& getConnection();
bool isLocal(const OwnershipToken* t) const;
Broker& getBroker();
Added: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Authorise.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Authorise.cpp?rev=1496466&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Authorise.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Authorise.cpp Tue Jun 25 13:28:15 2013
@@ -0,0 +1,131 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Authorise.h"
+#include "Exception.h"
+#include "Filter.h"
+#include "qpid/amqp/descriptors.h"
+#include "qpid/broker/AclModule.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/types/Variant.h"
+#include <map>
+#include <boost/lexical_cast.hpp>
+
+namespace qpid {
+namespace broker {
+namespace amqp {
+namespace {
+const std::string B_TRUE("true");
+const std::string B_FALSE("false");
+const std::string POLICY_TYPE("qpid.policy_type");
+}
+
+Authorise::Authorise(const std::string& u, AclModule* a) : user(u), acl(a) {}
+void Authorise::access(boost::shared_ptr<Exchange> exchange)
+{
+ if (acl) {
+ std::map<acl::Property, std::string> params;
+ params.insert(std::make_pair(acl::PROP_TYPE, exchange->getType()));
+ params.insert(std::make_pair(acl::PROP_DURABLE, exchange->isDurable() ? B_TRUE : B_FALSE));
+ if (!acl->authorise(user, acl::ACT_ACCESS, acl::OBJ_EXCHANGE, exchange->getName(), ¶ms))
+ throw Exception(qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS, QPID_MSG("ACL denied exchange access request from " << user));
+ }
+}
+void Authorise::access(boost::shared_ptr<Queue> queue)
+{
+ if (acl) {
+ const QueueSettings& settings = queue->getSettings();
+ std::map<acl::Property, std::string> params;
+ boost::shared_ptr<Exchange> altex = queue->getAlternateExchange();
+ if (altex)
+ params.insert(std::make_pair(acl::PROP_ALTERNATE, altex->getName()));
+ params.insert(std::make_pair(acl::PROP_DURABLE, settings.durable ? B_TRUE : B_FALSE));
+ params.insert(std::make_pair(acl::PROP_EXCLUSIVE, queue->hasExclusiveOwner() ? B_TRUE : B_FALSE));
+ params.insert(std::make_pair(acl::PROP_AUTODELETE, settings.autodelete ? B_TRUE : B_FALSE));
+ qpid::types::Variant::Map::const_iterator i = settings.original.find(POLICY_TYPE);
+ if (i != settings.original.end())
+ params.insert(std::make_pair(acl::PROP_POLICYTYPE, i->second.asString()));
+ if (settings.maxDepth.hasCount())
+ params.insert(std::make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<std::string>(settings.maxDepth.getCount())));
+ if (settings.maxDepth.hasCount())
+ params.insert(std::make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<std::string>(settings.maxDepth.getSize())));
+ if (!acl->authorise(user, acl::ACT_ACCESS, acl::OBJ_QUEUE, queue->getName(), ¶ms) )
+ throw Exception(qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS, QPID_MSG("ACL denied queue access request from " << user));
+ }
+}
+
+void Authorise::incoming(boost::shared_ptr<Exchange> exchange)
+{
+ access(exchange);
+ //can't check publish permission here as do not yet know routing key
+}
+void Authorise::incoming(boost::shared_ptr<Queue> queue)
+{
+ access(queue);
+ if (acl) {
+ if (!acl->authorise(user, acl::ACT_PUBLISH, acl::OBJ_EXCHANGE, std::string()/*default exchange*/, queue->getName()))
+ throw Exception(qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS, QPID_MSG(user << " cannot publish to queue " << queue->getName()));
+ }
+}
+void Authorise::outgoing(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> queue, const Filter& filter)
+{
+ access(exchange);
+ if (acl) {
+ std::map<qpid::acl::Property, std::string> params;
+ params.insert(std::make_pair(acl::PROP_QUEUENAME, queue->getName()));
+ params.insert(std::make_pair(acl::PROP_ROUTINGKEY, filter.getBindingKey(exchange)));
+
+ if (!acl->authorise(user, acl::ACT_BIND, acl::OBJ_EXCHANGE, exchange->getName(), ¶ms))
+ throw Exception(qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS, QPID_MSG("ACL denied exchange bind request from " << user));
+
+ if (!acl->authorise(user, acl::ACT_CONSUME, acl::OBJ_QUEUE, queue->getName(), NULL))
+ throw Exception(qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS, QPID_MSG("ACL denied queue subscribe request from " << user));
+ }
+}
+
+void Authorise::outgoing(boost::shared_ptr<Queue> queue)
+{
+ access(queue);
+ if (acl) {
+ if (!acl->authorise(user, acl::ACT_CONSUME, acl::OBJ_QUEUE, queue->getName(), NULL))
+ throw Exception(qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS, QPID_MSG("ACL denied queue subscribe request from " << user));
+ }
+}
+
+void Authorise::route(boost::shared_ptr<Exchange> exchange, const Message& msg)
+{
+ if (acl && acl->doTransferAcl()) {
+ if (!acl->authorise(user, acl::ACT_PUBLISH, acl::OBJ_EXCHANGE, exchange->getName(), msg.getRoutingKey()))
+ throw Exception(qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS, QPID_MSG(user << " cannot publish to " << exchange->getName() << " with routing-key " << msg.getRoutingKey()));
+ }
+}
+
+void Authorise::interlink()
+{
+ if (acl) {
+ if (!acl->authorise(user, acl::ACT_CREATE, acl::OBJ_LINK, "")){
+ throw Exception(qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS, QPID_MSG("ACL denied " << user << " a AMQP 1.0 link"));
+ }
+ }
+}
+
+}}} // namespace qpid::broker::amqp
Copied: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Authorise.h (from r1496401, qpid/trunk/qpid/cpp/src/qpid/broker/amqp/DataReader.h)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Authorise.h?p2=qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Authorise.h&p1=qpid/trunk/qpid/cpp/src/qpid/broker/amqp/DataReader.h&r1=1496401&r2=1496466&rev=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/DataReader.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Authorise.h Tue Jun 25 13:28:15 2013
@@ -1,5 +1,5 @@
-#ifndef QPID_BROKER_AMQP_DATAREADER_H
-#define QPID_BROKER_AMQP_DATAREADER_H
+#ifndef QPID_BROKER_AMQP_AUTHORISE_H
+#define QPID_BROKER_AMQP_AUTHORISE_H
/*
*
@@ -21,33 +21,37 @@
* under the License.
*
*/
-#include "qpid/amqp/Reader.h"
-
-struct pn_data_t;
+#include <boost/shared_ptr.hpp>
namespace qpid {
-namespace amqp {
-struct Descriptor;
-}
namespace broker {
+class AclModule;
+class Exchange;
+class Message;
+class Queue;
namespace amqp {
+class Filter;
/**
- * Allows use of Reader interface to read pn_data_t* data.
+ * Class to handle authorisation requests (and hide the ACL mess behind)
*/
-class DataReader
+class Authorise
{
public:
- DataReader(qpid::amqp::Reader& reader);
- void read(pn_data_t*);
+ Authorise(const std::string& user, AclModule*);
+ void access(boost::shared_ptr<Exchange>);
+ void access(boost::shared_ptr<Queue>);
+ void incoming(boost::shared_ptr<Exchange>);
+ void incoming(boost::shared_ptr<Queue>);
+ void outgoing(boost::shared_ptr<Exchange>, boost::shared_ptr<Queue>, const Filter&);
+ void outgoing(boost::shared_ptr<Queue>);
+ void route(boost::shared_ptr<Exchange>, const Message&);
+ void interlink();
private:
- qpid::amqp::Reader& reader;
+ const std::string user;
+ AclModule* const acl;
- void readOne(pn_data_t*);
- void readMap(pn_data_t*, const qpid::amqp::Descriptor*);
- void readList(pn_data_t*, const qpid::amqp::Descriptor*);
- void readArray(pn_data_t*, const qpid::amqp::Descriptor*);
};
}}} // namespace qpid::broker::amqp
-#endif /*!QPID_BROKER_AMQP_DATAREADER_H*/
+#endif /*!QPID_BROKER_AMQP_AUTHORISE_H*/
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp Tue Jun 25 13:28:15 2013
@@ -19,9 +19,12 @@
*
*/
#include "Connection.h"
+#include "DataReader.h"
#include "Session.h"
-#include "qpid/Exception.h"
+#include "Exception.h"
+#include "qpid/broker/AclModule.h"
#include "qpid/broker/Broker.h"
+#include "qpid/amqp/descriptors.h"
#include "qpid/framing/Buffer.h"
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/framing/ProtocolVersion.h"
@@ -36,7 +39,6 @@ extern "C" {
namespace qpid {
namespace broker {
namespace amqp {
-
Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, qpid::broker::Broker& b, Interconnects& interconnects_, bool saslInUse, const std::string& d)
: ManagedConnection(b, i),
connection(pn_connection()),
@@ -52,6 +54,7 @@ Connection::Connection(qpid::sys::Output
QPID_LOG_TEST_CAT(trace, protocol, enableTrace);
if (enableTrace) pn_transport_trace(transport, PN_TRACE_FRM);
+ broker.getConnectionObservers().connection(*this);
if (!saslInUse) {
//feed in a dummy AMQP 1.0 header as engine expects one, but
//we already read it (if sasl is in use we read the sasl
@@ -62,15 +65,14 @@ Connection::Connection(qpid::sys::Output
pi.encode(buffer);
pn_transport_input(transport, &protocolHeader[0], protocolHeader.size());
- //wont get a userid, so set a dummy one on the ManagedConnection to trigger event
- setUserid("no authentication used");
+ setUserId("none");
}
}
Connection::~Connection()
{
-
+ broker.getConnectionObservers().closed(*this);
pn_transport_free(transport);
pn_connection_free(connection);
}
@@ -97,8 +99,17 @@ size_t Connection::decode(const char* bu
QPID_LOG_CAT(debug, network, id << " decoded " << n << " bytes from " << size);
try {
process();
+ } catch (const Exception& e) {
+ QPID_LOG(error, id << ": " << e.what());
+ pn_condition_t* error = pn_connection_condition(connection);
+ pn_condition_set_name(error, e.symbol());
+ pn_condition_set_description(error, e.what());
+ close();
} catch (const std::exception& e) {
QPID_LOG(error, id << ": " << e.what());
+ pn_condition_t* error = pn_connection_condition(connection);
+ pn_condition_set_name(error, qpid::amqp::error_conditions::INTERNAL_ERROR.c_str());
+ pn_condition_set_description(error, e.what());
close();
}
pn_transport_tick(transport, 0);
@@ -108,7 +119,7 @@ size_t Connection::decode(const char* bu
}
return n;
} else if (n == PN_ERR) {
- throw qpid::Exception(QPID_MSG("Error on input: " << getError()));
+ throw Exception(qpid::amqp::error_conditions::DECODE_ERROR, QPID_MSG("Error on input: " << getError()));
} else {
return 0;
}
@@ -126,7 +137,7 @@ size_t Connection::encode(char* buffer,
haveOutput = size;
return size;//Is this right?
} else if (n == PN_ERR) {
- throw qpid::Exception(QPID_MSG("Error on output: " << getError()));
+ throw Exception(qpid::amqp::error_conditions::INTERNAL_ERROR, QPID_MSG("Error on output: " << getError()));
} else {
haveOutput = false;
return 0;
@@ -139,8 +150,17 @@ bool Connection::canEncode()
if (i->second->dispatch()) haveOutput = true;
}
process();
+ } catch (const Exception& e) {
+ QPID_LOG(error, id << ": " << e.what());
+ pn_condition_t* error = pn_connection_condition(connection);
+ pn_condition_set_name(error, e.symbol());
+ pn_condition_set_description(error, e.what());
+ close();
} catch (const std::exception& e) {
QPID_LOG(error, id << ": " << e.what());
+ pn_condition_t* error = pn_connection_condition(connection);
+ pn_condition_set_name(error, qpid::amqp::error_conditions::INTERNAL_ERROR.c_str());
+ pn_condition_set_description(error, e.what());
close();
}
//TODO: proper handling of time in and out of tick
@@ -148,6 +168,28 @@ bool Connection::canEncode()
QPID_LOG_CAT(trace, network, id << " canEncode(): " << haveOutput)
return haveOutput;
}
+
+void Connection::open()
+{
+ readPeerProperties();
+
+ pn_connection_set_container(connection, broker.getFederationTag().c_str());
+ pn_connection_open(connection);
+ out.connectionEstablished();
+ opened();
+ broker.getConnectionObservers().opened(*this);
+}
+
+void Connection::readPeerProperties()
+{
+ /**
+ * TODO: enable when proton 0.5 has been released:
+ qpid::types::Variant::Map properties;
+ DataReader::read(pn_connection_remote_properties(connection), properties);
+ setPeerProperties(properties);
+ */
+}
+
void Connection::closed()
{
for (Sessions::iterator i = sessions.begin(); i != sessions.end(); ++i) {
@@ -178,10 +220,8 @@ void Connection::process()
QPID_LOG(trace, id << " process()");
if ((pn_connection_state(connection) & REQUIRES_OPEN) == REQUIRES_OPEN) {
QPID_LOG_CAT(debug, model, id << " connection opened");
- pn_connection_set_container(connection, broker.getFederationTag().c_str());
+ open();
setContainerId(pn_connection_remote_container(connection));
- pn_connection_open(connection);
- out.connectionEstablished();
}
for (pn_session_t* s = pn_session_head(connection, REQUIRES_OPEN); s; s = pn_session_next(s, REQUIRES_OPEN)) {
@@ -200,9 +240,17 @@ void Connection::process()
try {
session->second->attach(l);
QPID_LOG_CAT(debug, protocol, id << " link " << l << " attached on " << pn_link_session(l));
+ } catch (const Exception& e) {
+ QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what());
+ pn_condition_t* error = pn_link_condition(l);
+ pn_condition_set_name(error, e.symbol());
+ pn_condition_set_description(error, e.what());
+ pn_link_close(l);
} catch (const std::exception& e) {
QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what());
- //TODO: set error details on detach when that is exposed via engine API
+ pn_condition_t* error = pn_link_condition(l);
+ pn_condition_set_name(error, qpid::amqp::error_conditions::INTERNAL_ERROR.c_str());
+ pn_condition_set_description(error, e.what());
pn_link_close(l);
}
}
@@ -214,7 +262,15 @@ void Connection::process()
if (pn_link_is_receiver(link)) {
Sessions::iterator i = sessions.find(pn_link_session(link));
if (i != sessions.end()) {
- i->second->readable(link, delivery);
+ try {
+ i->second->readable(link, delivery);
+ } catch (const Exception& e) {
+ QPID_LOG_CAT(error, protocol, "Error on publish: " << e.what());
+ pn_condition_t* error = pn_link_condition(link);
+ pn_condition_set_name(error, e.symbol());
+ pn_condition_set_description(error, e.what());
+ pn_link_close(link);
+ }
} else {
pn_delivery_update(delivery, PN_REJECTED);
}
@@ -271,4 +327,19 @@ std::string Connection::getDomain() cons
{
return domain;
}
+
+void Connection::abort()
+{
+ out.abort();
+}
+
+void Connection::setUserId(const std::string& user)
+{
+ ManagedConnection::setUserId(user);
+ AclModule* acl = broker.getAcl();
+ if (acl && !acl->approveConnection(*this))
+ {
+ throw Exception(qpid::amqp::error_conditions::RESOURCE_LIMIT_EXCEEDED, "User connection denied by configured limit");
+ }
+}
}}} // namespace qpid::broker::amqp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h Tue Jun 25 13:28:15 2013
@@ -58,6 +58,9 @@ class Connection : public sys::Connectio
pn_transport_t* getTransport();
Interconnects& getInterconnects();
std::string getDomain() const;
+ void setUserId(const std::string&);
+ void abort();
+
protected:
typedef std::map<pn_session_t*, boost::shared_ptr<Session> > Sessions;
pn_connection_t* connection;
@@ -73,6 +76,8 @@ class Connection : public sys::Connectio
virtual void process();
std::string getError();
void close();
+ void open();
+ void readPeerProperties();
};
}}} // namespace qpid::broker::amqp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/DataReader.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/DataReader.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/DataReader.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/DataReader.cpp Tue Jun 25 13:28:15 2013
@@ -21,6 +21,7 @@
#include "DataReader.h"
#include "qpid/amqp/CharSequence.h"
#include "qpid/amqp/Descriptor.h"
+#include "qpid/amqp/MapBuilder.h"
#include "qpid/log/Statement.h"
#include <string>
extern "C" {
@@ -52,11 +53,6 @@ DataReader::DataReader(qpid::amqp::Reade
void DataReader::read(pn_data_t* data)
{
- /*
- while (pn_data_next(data)) {
- readOne(data);
- }
- */
do {
readOne(data);
} while (pn_data_next(data));
@@ -184,4 +180,12 @@ void DataReader::readMap(pn_data_t* data
pn_data_exit(data);
reader.onEndMap(count, descriptor);
}
+
+void DataReader::read(pn_data_t* data, std::map<std::string, qpid::types::Variant>& out)
+{
+ qpid::amqp::MapBuilder builder;
+ DataReader reader(builder);
+ reader.read(data);
+ out = builder.getMap();
+}
}}} // namespace qpid::broker::amqp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/DataReader.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/DataReader.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/DataReader.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/DataReader.h Tue Jun 25 13:28:15 2013
@@ -22,10 +22,15 @@
*
*/
#include "qpid/amqp/Reader.h"
+#include <map>
+#include <string>
struct pn_data_t;
namespace qpid {
+namespace types {
+class Variant;
+}
namespace amqp {
struct Descriptor;
}
@@ -40,6 +45,7 @@ class DataReader
public:
DataReader(qpid::amqp::Reader& reader);
void read(pn_data_t*);
+ static void read(pn_data_t*, std::map<std::string, qpid::types::Variant>&);
private:
qpid::amqp::Reader& reader;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org