You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2012/11/15 15:42:23 UTC
svn commit: r1409813 - in /qpid/trunk/qpid/cpp/src/qpid: broker/amqp/
messaging/amqp/
Author: gsim
Date: Thu Nov 15 14:42:22 2012
New Revision: 1409813
URL: http://svn.apache.org/viewvc?rev=1409813&view=rev
Log:
QPID-4368: Add support for subject, translated to a filter (i.e. at present a binding key) by receivers and used as default value for senders
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1409813&r1=1409812&r2=1409813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp Thu Nov 15 14:42:22 2012
@@ -22,6 +22,8 @@
#include "Outgoing.h"
#include "Message.h"
#include "ManagedConnection.h"
+#include "qpid/amqp/CharSequence.h"
+#include "qpid/amqp/Descriptor.h"
#include "qpid/broker/AsyncCompletion.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/DeliverableMessage.h"
@@ -104,8 +106,76 @@ void Session::attach(pn_link_t* link)
QueueSettings settings(false, true);
//TODO: populate settings from source details when available from engine
queue = broker.createQueue(name + qpid::types::Uuid(true).str(), settings, this, "", connection.getUserid(), connection.getId()).first;
- //TODO: bind based on filter when that is exposed by engine
- if (exchange->getType() == FanOutExchange::typeName) {
+ pn_data_t* filter = pn_terminus_filter(source);
+ pn_data_next(filter);
+ if (filter && !pn_data_is_null(filter)) {
+ if (pn_data_type(filter) == PN_MAP) {
+ pn_data_t* echo = pn_terminus_filter(pn_link_source(link));
+ pn_data_put_map(echo);
+ pn_data_enter(echo);
+ size_t count = pn_data_get_map(filter)/2;
+ QPID_LOG(debug, "Got filter map with " << count << " entries");
+ pn_data_enter(filter);
+ for (size_t i = 0; i < count; i++) {
+ pn_bytes_t fname = pn_data_get_symbol(filter);
+ pn_data_next(filter);
+ bool isDescribed = pn_data_is_described(filter);
+ qpid::amqp::Descriptor descriptor(0);
+ if (isDescribed) {
+ pn_data_enter(filter);
+ pn_data_next(filter);
+ //TODO: use or at least verify descriptor
+ if (pn_data_type(filter) == PN_ULONG) {
+ descriptor = qpid::amqp::Descriptor(pn_data_get_ulong(filter));
+ } else if (pn_data_type(filter) == PN_SYMBOL) {
+ pn_bytes_t d = pn_data_get_symbol(filter);
+ qpid::amqp::CharSequence c;
+ c.data = d.start;
+ c.size = d.size;
+ descriptor = qpid::amqp::Descriptor(c);
+ } else {
+ QPID_LOG(notice, "Ignoring filter with descriptor with key " << std::string(fname.start, fname.size) << " and type " << pn_data_type(filter));
+ continue;
+ }
+ QPID_LOG(debug, "Got filter with descriptor " << descriptor);
+ pn_data_next(filter);
+ } else {
+ QPID_LOG(debug, "Got undescribed filter of type " << pn_data_type(filter));
+ }
+ if (pn_data_type(filter) == PN_STRING) {
+ pn_bytes_t value = pn_data_get_string(filter);
+ pn_data_next(filter);
+ exchange->bind(queue, std::string(value.start, value.size), 0);
+ pn_data_put_symbol(echo, fname);
+ if (isDescribed) {
+ pn_data_put_described(echo);
+ pn_data_enter(echo);
+ pn_bytes_t symbol;
+ switch (descriptor.type) {
+ case qpid::amqp::Descriptor::NUMERIC:
+ pn_data_put_ulong(echo, descriptor.value.code);
+ break;
+ case qpid::amqp::Descriptor::SYMBOLIC:
+ symbol.start = const_cast<char*>(descriptor.value.symbol.data);
+ symbol.size = descriptor.value.symbol.size;
+ pn_data_put_symbol(echo, symbol);
+ break;
+ }
+ }
+ pn_data_put_string(echo, value);
+ if (isDescribed) pn_data_exit(echo);
+
+ QPID_LOG(debug, "Binding using filter " << std::string(fname.start, fname.size) << ":" << std::string(value.start, value.size));
+ } else {
+ //TODO: handle headers exchange filters
+ QPID_LOG(warning, "Ignoring unsupported filter type with key " << std::string(fname.start, fname.size) << " and type " << pn_data_type(filter));
+ }
+ }
+ pn_data_exit(echo);
+ } else {
+ QPID_LOG(warning, "Filter should be map, got type: " << pn_data_type(filter));
+ }
+ } else if (exchange->getType() == FanOutExchange::typeName) {
exchange->bind(queue, std::string(), 0);
} else if (exchange->getType() == TopicExchange::typeName) {
exchange->bind(queue, "#", 0);
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1409813&r1=1409812&r2=1409813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp Thu Nov 15 14:42:22 2012
@@ -262,10 +262,9 @@ void ConnectionContext::attach(boost::sh
void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk)
{
- pn_terminus_t* source = pn_link_source((pn_link_t*) lnk->receiver);
- pn_terminus_set_address(source, lnk->getSource().c_str());
- attach(ssn->session, (pn_link_t*) lnk->receiver, lnk->capacity);
- if (!pn_link_remote_source((pn_link_t*) lnk->receiver)) {
+ lnk->configure();
+ attach(ssn->session, lnk->receiver, lnk->capacity);
+ if (!pn_link_remote_source(lnk->receiver)) {
std::string msg("No such source : ");
msg += lnk->getSource();
throw qpid::messaging::NotFound(msg);
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp?rev=1409813&r1=1409812&r2=1409813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp Thu Nov 15 14:42:22 2012
@@ -29,10 +29,10 @@ namespace qpid {
namespace messaging {
namespace amqp {
//TODO: proper conversion to wide string for address
-ReceiverContext::ReceiverContext(pn_session_t* session, const std::string& n, const std::string& s)
+ReceiverContext::ReceiverContext(pn_session_t* session, const std::string& n, const qpid::messaging::Address& a) // now takes the full Address instead of a source string
: name(n),
-source(s),
-receiver(pn_receiver(session, source.c_str())),
+address(a), // stored copy; getSource() and configure() read the name and subject from it
+receiver(pn_receiver(session, name.c_str())), // pn_receiver is now given the receiver's name where the source string was passed before
capacity(0) {}
ReceiverContext::~ReceiverContext()
{
@@ -84,7 +84,36 @@ const std::string& ReceiverContext::getN
const std::string& ReceiverContext::getSource() const // returns the source node name of this receiver
{
-return source;
+return address.getName(); // the name part of the stored Address replaces the former 'source' string member
+}
+namespace { // file-local helper
+pn_bytes_t convert(const std::string& s) // builds a pn_bytes_t that refers to the characters of s (no copy is made)
+{
+pn_bytes_t result;
+result.start = const_cast<char*>(s.data()); // points into s's own storage, so the result is only usable while s is alive
+result.size = s.size();
+return result;
+}
+}
+ // Configures the source terminus of this receiver's link.
+void ReceiverContext::configure() const
+{
+configure(pn_link_source(receiver)); // delegates to the terminus overload with the link's source
+}
+void ReceiverContext::configure(pn_terminus_t* source) const // sets the address and a subject filter on the given terminus
+{
+pn_terminus_set_address(source, address.getName().c_str());
+ // The filter is written as a one-entry map: symbol "subject" -> described string.
+pn_data_t* filter = pn_terminus_filter(source);
+pn_data_put_map(filter);
+pn_data_enter(filter); // descend into the map to add its key/value pair
+pn_data_put_symbol(filter, convert("subject")); // map key
+pn_data_put_described(filter);
+pn_data_enter(filter); // descend into the described node: descriptor first, then value
+pn_data_put_ulong(filter, 0x0000468C00000001/*LEGACY_TOPIC_FILTER*/);
+pn_data_put_string(filter, convert(address.getSubject())); // NOTE(review): written unconditionally, even when the subject is empty; confirm the peer treats an empty string as intended
+pn_data_exit(filter); // leave the described node
+pn_data_exit(filter); // leave the map
}
bool ReceiverContext::isClosed() const
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h?rev=1409813&r1=1409812&r2=1409813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h Thu Nov 15 14:42:22 2012
@@ -21,11 +21,13 @@
* under the License.
*
*/
+#include "qpid/messaging/Address.h"
#include <string>
#include "qpid/sys/IntegerTypes.h"
struct pn_link_t;
struct pn_session_t;
+struct pn_terminus_t;
namespace qpid {
namespace messaging {
@@ -41,22 +43,25 @@ namespace amqp {
class ReceiverContext // per-receiver state: name, source address, link handle and capacity
{
public:
-ReceiverContext(pn_session_t* session, const std::string& name, const std::string& source);
+ReceiverContext(pn_session_t* session, const std::string& name, const qpid::messaging::Address& source); // source is now an Address rather than a plain string
~ReceiverContext();
void setCapacity(uint32_t);
uint32_t getCapacity();
uint32_t getAvailable();
uint32_t getUnsettled();
+void attach(); // NOTE(review): no definition is added in the ReceiverContext.cpp hunks of this diff; confirm one exists
void close();
const std::string& getName() const;
const std::string& getSource() const;
bool isClosed() const;
+void configure() const; // sets address and subject filter on the link's source terminus
private:
friend class ConnectionContext;
const std::string name;
-const std::string source;
+const Address address; // replaces 'source'; supplies both the node name and the subject
pn_link_t* receiver;
uint32_t capacity;
+void configure(pn_terminus_t*) const; // worker used by configure() for a specific terminus
};
}}} // namespace qpid::messaging::amqp
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp?rev=1409813&r1=1409812&r2=1409813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp Thu Nov 15 14:42:22 2012
@@ -36,10 +36,10 @@ namespace qpid {
namespace messaging {
namespace amqp {
//TODO: proper conversion to wide string for address
+// Creates the sender link under the sender's name and keeps a copy of the
+// target address (name and subject) for use when messages are encoded.
+// nextId is initialised explicitly: send() reads it via nextId++, and the
+// previous initialiser list left this plain int32_t member indeterminate.
-SenderContext::SenderContext(pn_session_t* session, const std::string& n, const std::string& t)
+SenderContext::SenderContext(pn_session_t* session, const std::string& n, const qpid::messaging::Address& a)
: name(n),
-target(t),
-sender(pn_sender(session, target.c_str())), capacity(1000) {}
+address(a),
+sender(pn_sender(session, n.c_str())), nextId(0), capacity(1000) {}
SenderContext::~SenderContext()
{
@@ -74,7 +74,7 @@ const std::string& SenderContext::getNam
const std::string& SenderContext::getTarget() const // returns the target node name of this sender
{
-return target;
+return address.getName(); // the name part of the stored Address replaces the former 'target' string member
}
SenderContext::Delivery* SenderContext::send(const qpid::messaging::Message& message)
@@ -82,7 +82,7 @@ SenderContext::Delivery* SenderContext::
if (processUnsettled() < capacity) {
deliveries.push_back(Delivery(nextId++));
Delivery& delivery = deliveries.back();
- delivery.encode(MessageImplAccess::get(message));
+ delivery.encode(MessageImplAccess::get(message), address);
delivery.send(sender);
return &delivery;
} else {
@@ -135,7 +135,7 @@ const std::string EMPTY;
class PropertiesAdapter : public qpid::amqp::MessageEncoder::Properties
{
public:
- PropertiesAdapter(const qpid::messaging::MessageImpl& impl) : msg(impl) {}
+ PropertiesAdapter(const qpid::messaging::MessageImpl& impl, const std::string& s) : msg(impl), subject(s) {}
bool hasMessageId() const
{
return getMessageId().size();
@@ -167,12 +167,12 @@ class PropertiesAdapter : public qpid::a
bool hasSubject() const // true when either the adapter's own subject or the message's subject is non-empty
{
-return getSubject().size();
+return subject.size() || getSubject().size(); // getSubject() already yields 'subject' when that is non-empty
}
std::string getSubject() const // subject to encode for this message
{
-return msg.getSubject();
+return subject.size() ? subject : msg.getSubject(); // a non-empty adapter subject takes precedence over the message's own subject
}
bool hasReplyTo() const
@@ -266,16 +266,23 @@ class PropertiesAdapter : public qpid::a
}
private:
const qpid::messaging::MessageImpl& msg;
+ const std::string subject;
};
+ // True when the address carries a non-empty subject that differs from the message's own subject.
+bool changedSubject(const qpid::messaging::MessageImpl& msg, const qpid::messaging::Address& address)
+{
+return address.getSubject().size() && address.getSubject() != msg.getSubject(); // an empty address subject never counts as a change
+}
+
}
SenderContext::Delivery::Delivery(int32_t i) : id(i), token(0) {} // records the delivery id; token starts as 0
-void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg)
+void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg, const qpid::messaging::Address& address)
{
boost::shared_ptr<const EncodedMessage> original = msg.getEncoded();
- if (original) { //still have the content as received, send at least the bare message unaltered
+ if (original && !changedSubject(msg, address)) { //still have the content as received, send at least the bare message unaltered
//do we need to alter the header? are durable, priority, ttl, first-acquirer, delivery-count different from what was received?
if (original->hasHeaderChanged(msg)) {
//since as yet have no annotations, just write the revised header then the rest of the message as received
@@ -293,7 +300,7 @@ void SenderContext::Delivery::encode(con
}
} else {
HeaderAdapter header(msg);
- PropertiesAdapter properties(msg);
+ PropertiesAdapter properties(msg, address.getSubject());
//compute size:
encoded.resize(qpid::amqp::MessageEncoder::getEncodedSize(header, properties, msg.getHeaders(), msg.getBytes()));
QPID_LOG(debug, "Sending message, buffer is " << encoded.getSize() << " bytes")
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h?rev=1409813&r1=1409812&r2=1409813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h Thu Nov 15 14:42:22 2012
@@ -25,6 +25,7 @@
#include <string>
#include <vector>
#include "qpid/sys/IntegerTypes.h"
+#include "qpid/messaging/Address.h"
#include "qpid/messaging/amqp/EncodedMessage.h"
struct pn_delivery_t;
@@ -48,7 +49,7 @@ class SenderContext
{
public:
Delivery(int32_t id);
- void encode(const qpid::messaging::MessageImpl& message);
+ void encode(const qpid::messaging::MessageImpl& message, const qpid::messaging::Address&);
void send(pn_link_t*);
bool accepted();
private:
@@ -57,7 +58,7 @@ class SenderContext
EncodedMessage encoded;
};
- SenderContext(pn_session_t* session, const std::string& name, const std::string& target);
+ SenderContext(pn_session_t* session, const std::string& name, const qpid::messaging::Address& target);
~SenderContext();
void close();
void setCapacity(uint32_t);
@@ -71,7 +72,7 @@ class SenderContext
typedef std::deque<Delivery> Deliveries;
const std::string name;
- const std::string target;
+ const qpid::messaging::Address address;
pn_link_t* sender;
int32_t nextId;
Deliveries deliveries;
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp?rev=1409813&r1=1409812&r2=1409813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp Thu Nov 15 14:42:22 2012
@@ -49,7 +49,7 @@ boost::shared_ptr<SenderContext> Session
for (SenderMap::const_iterator i = senders.find(name); i != senders.end(); i = senders.find(name)) {
name = (boost::format("%1%_%2%") % address.getName() % ++count).str();
}
- boost::shared_ptr<SenderContext> s(new SenderContext(session, name, address.str()));
+ boost::shared_ptr<SenderContext> s(new SenderContext(session, name, address));
senders[name] = s;
return s;
}
@@ -62,7 +62,7 @@ boost::shared_ptr<ReceiverContext> Sessi
for (ReceiverMap::const_iterator i = receivers.find(name); i != receivers.end(); i = receivers.find(name)) {
name = (boost::format("%1%_%2%") % address.getName() % ++count).str();
}
- boost::shared_ptr<ReceiverContext> r(new ReceiverContext(session, name, address.str()));
+ boost::shared_ptr<ReceiverContext> r(new ReceiverContext(session, name, address));
receivers[name] = r;
return r;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org