You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2016/09/14 16:13:53 UTC
qpid-cpp git commit: QPID-7430: Allow address to be passed through
unparsed to server
Repository: qpid-cpp
Updated Branches:
refs/heads/master 3b9b412e6 -> 7ca027022
QPID-7430: Allow address to be passed through unparsed to server
Project: http://git-wip-us.apache.org/repos/asf/qpid-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-cpp/commit/7ca02702
Tree: http://git-wip-us.apache.org/repos/asf/qpid-cpp/tree/7ca02702
Diff: http://git-wip-us.apache.org/repos/asf/qpid-cpp/diff/7ca02702
Branch: refs/heads/master
Commit: 7ca027022830646ddeb65f137086aba2f98cf5a7
Parents: 3b9b412
Author: Gordon Sim <gs...@redhat.com>
Authored: Tue Sep 6 22:41:54 2016 +0100
Committer: Gordon Sim <gs...@redhat.com>
Committed: Wed Sep 14 16:52:21 2016 +0100
----------------------------------------------------------------------
src/qpid/messaging/ConnectionOptions.cpp | 2 +
src/qpid/messaging/ConnectionOptions.h | 2 +
src/qpid/messaging/amqp/ConnectionContext.cpp | 60 +++++++++++++++++++++-
src/qpid/messaging/amqp/ConnectionContext.h | 4 ++
4 files changed, 66 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/7ca02702/src/qpid/messaging/ConnectionOptions.cpp
----------------------------------------------------------------------
diff --git a/src/qpid/messaging/ConnectionOptions.cpp b/src/qpid/messaging/ConnectionOptions.cpp
index d956e9a..3095169 100644
--- a/src/qpid/messaging/ConnectionOptions.cpp
+++ b/src/qpid/messaging/ConnectionOptions.cpp
@@ -125,6 +125,8 @@ void ConnectionOptions::set(const std::string& name, const qpid::types::Variant&
nestAnnotations = value;
} else if (name == "set-to-on-send" || name == "set_to_on_send") {
setToOnSend = value;
+ } else if (name == "address-passthrough" || name == "address_passthrough") {
+ addressPassthrough = value;
} else if (name == "properties" || name == "client-properties" || name == "client_properties") {
properties = value.asMap();
} else {
http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/7ca02702/src/qpid/messaging/ConnectionOptions.h
----------------------------------------------------------------------
diff --git a/src/qpid/messaging/ConnectionOptions.h b/src/qpid/messaging/ConnectionOptions.h
index c8c8798..6b89838 100644
--- a/src/qpid/messaging/ConnectionOptions.h
+++ b/src/qpid/messaging/ConnectionOptions.h
@@ -26,6 +26,7 @@
#include "qpid/client/ConnectionSettings.h"
#include <map>
#include <vector>
+#include <boost/optional.hpp>
namespace qpid {
namespace types {
@@ -47,6 +48,7 @@ struct ConnectionOptions : qpid::client::ConnectionSettings
std::string identifier;
bool nestAnnotations;
bool setToOnSend;
+ boost::optional<bool> addressPassthrough;
std::map<std::string, qpid::types::Variant> properties;
QPID_MESSAGING_EXTERN ConnectionOptions(const std::map<std::string, qpid::types::Variant>&);
http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/7ca02702/src/qpid/messaging/amqp/ConnectionContext.cpp
----------------------------------------------------------------------
diff --git a/src/qpid/messaging/amqp/ConnectionContext.cpp b/src/qpid/messaging/amqp/ConnectionContext.cpp
index 25dd68d..ff6a7be 100644
--- a/src/qpid/messaging/amqp/ConnectionContext.cpp
+++ b/src/qpid/messaging/amqp/ConnectionContext.cpp
@@ -434,10 +434,66 @@ void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, pn_link_t*
}
}
+namespace {
+const std::string PASSTHROUGH_CAPABILITY("qpid:messaging:address");
+const std::string LINK("link");
+const std::string NAME("name");
+const std::string RELIABILITY("reliability");
+bool copy(const qpid::types::Variant::Map& from, qpid::types::Variant::Map& to, const std::string& key)
+{
+ Variant::Map::const_iterator i = from.find(key);
+ if (i == from.end()) {
+ return false;
+ } else {
+ to[key] = i->second.asString();
+ return true;
+ }
+}
+}
+
+types::Variant::List ConnectionContext::getPeersOfferedCapabilities()
+{
+ qpid::types::Variant::List capabilities;
+ pn_data_t* raw = pn_connection_remote_offered_capabilities(connection);
+ if (raw) {
+ PnData data(raw);
+ data.getList(capabilities);
+ }
+ return capabilities;
+}
+
+bool ConnectionContext::usePassthrough()
+{
+ if (!addressPassthrough) {
+ qpid::types::Variant::List capabilities = getPeersOfferedCapabilities();
+ for (qpid::types::Variant::List::const_iterator i = capabilities.begin(); i != capabilities.end(); i++) {
+ if (i->asString() == PASSTHROUGH_CAPABILITY) {
+ return true;
+ }
+ }
+ return false;
+ } else {
+ return addressPassthrough.get();
+ }
+}
+qpid::messaging::Address ConnectionContext::passthrough(const qpid::messaging::Address& in)
+{
+ qpid::messaging::Address out;
+ out.setName(in.str());
+ qpid::types::Variant::Map::const_iterator i = in.getOptions().find(LINK);
+ if (i != in.getOptions().end()) {
+ qpid::types::Variant::Map linkOptions;
+ if (copy(i->second.asMap(), linkOptions, NAME) || copy(i->second.asMap(), linkOptions, RELIABILITY)) {
+ out.getOptions()[LINK] = linkOptions;
+ }
+ }
+ return out;
+}
+
boost::shared_ptr<SenderContext> ConnectionContext::createSender(boost::shared_ptr<SessionContext> session, const qpid::messaging::Address& address)
{
sys::Monitor::ScopedLock l(lock);
- boost::shared_ptr<SenderContext> sender = session->createSender(address, setToOnSend);
+ boost::shared_ptr<SenderContext> sender = session->createSender(usePassthrough() ? passthrough(address) : address, setToOnSend);
try {
attach(session, sender);
return sender;
@@ -450,7 +506,7 @@ boost::shared_ptr<SenderContext> ConnectionContext::createSender(boost::shared_p
boost::shared_ptr<ReceiverContext> ConnectionContext::createReceiver(boost::shared_ptr<SessionContext> session, const qpid::messaging::Address& address)
{
sys::Monitor::ScopedLock l(lock);
- boost::shared_ptr<ReceiverContext> receiver = session->createReceiver(address);
+ boost::shared_ptr<ReceiverContext> receiver = session->createReceiver(usePassthrough() ? passthrough(address) : address);
try {
attach(session, receiver);
return receiver;
http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/7ca02702/src/qpid/messaging/amqp/ConnectionContext.h
----------------------------------------------------------------------
diff --git a/src/qpid/messaging/amqp/ConnectionContext.h b/src/qpid/messaging/amqp/ConnectionContext.h
index ba3220c..db673d4 100644
--- a/src/qpid/messaging/amqp/ConnectionContext.h
+++ b/src/qpid/messaging/amqp/ConnectionContext.h
@@ -226,6 +226,10 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag
const qpid::messaging::Message& message, bool sync,
SenderContext::Delivery** delivery, sys::Monitor::ScopedLock&);
void acknowledgeLH(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative, sys::Monitor::ScopedLock&);
+
+ types::Variant::List getPeersOfferedCapabilities();
+ bool usePassthrough();
+ qpid::messaging::Address passthrough(const qpid::messaging::Address&);
};
}}} // namespace qpid::messaging::amqp
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org