You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2015/08/29 00:16:27 UTC
svn commit: r1698426 - in /qpid/trunk/qpid: cpp/src/qpid/broker/
cpp/src/qpid/broker/amqp/ cpp/src/qpid/broker/amqp_0_10/
tests/src/py/qpid_tests/broker_1_0/
Author: gsim
Date: Fri Aug 28 22:16:27 2015
New Revision: 1698426
URL: http://svn.apache.org/r1698426
Log:
QPID-6714: support for JMS header names in selectors, plus support for the to, reply_to and subject identifiers
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
qpid/trunk/qpid/cpp/src/qpid/broker/Selector.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.h
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/selector.py
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=1698426&r1=1698425&r2=1698426&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Fri Aug 28 22:16:27 2015
@@ -64,6 +64,19 @@ std::string Message::getRoutingKey() con
return getEncoding().getRoutingKey();
}
+std::string Message::getTo() const
+{
+ return getEncoding().getTo();
+}
+std::string Message::getSubject() const
+{
+ return getEncoding().getSubject();
+}
+std::string Message::getReplyTo() const
+{
+ return getEncoding().getReplyTo();
+}
+
bool Message::isPersistent() const
{
return getEncoding().isPersistent();
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=1698426&r1=1698425&r2=1698426&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Fri Aug 28 22:16:27 2015
@@ -78,6 +78,9 @@ public:
virtual void processProperties(qpid::amqp::MapHandler&) const = 0;
virtual std::string getUserId() const = 0;
virtual uint64_t getTimestamp() const = 0;
+ virtual std::string getTo() const = 0;
+ virtual std::string getSubject() const = 0;
+ virtual std::string getReplyTo() const = 0;
};
class SharedState : public Encoding
@@ -137,6 +140,11 @@ public:
QPID_BROKER_EXTERN uint64_t getTimestamp() const;
+ //required for selectors:
+ QPID_BROKER_EXTERN std::string getTo() const;
+ QPID_BROKER_EXTERN std::string getSubject() const;
+ QPID_BROKER_EXTERN std::string getReplyTo() const;
+
QPID_BROKER_EXTERN void addAnnotation(const std::string& key, const qpid::types::Variant& value);
QPID_BROKER_EXTERN bool isExcluded(const std::vector<std::string>& excludes) const;
QPID_BROKER_EXTERN void addTraceId(const std::string& id);
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Selector.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Selector.cpp?rev=1698426&r1=1698425&r2=1698426&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Selector.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Selector.cpp Fri Aug 28 22:16:27 2015
@@ -30,6 +30,7 @@
#include "qpid/log/Statement.h"
#include "qpid/types/Variant.h"
+#include <map>
#include <stdexcept>
#include <string>
#include <sstream>
@@ -54,6 +55,7 @@ using qpid::amqp::MessageId;
* priority | Priority | priority header section
* delivery_count | | delivery-count header section
* redelivered |[Redelivered] | (delivery_count>0) (computed value)
+ * subject | Type | subject properties section
* correlation_id | CorrelationID| correlation-id properties section
* to |[Destination] | to properties section
* absolute_expiry_time |[Expiration] | absolute-expiry-time properties section
@@ -66,6 +68,26 @@ const string EMPTY;
const string PERSISTENT("PERSISTENT");
const string NON_PERSISTENT("NON_PERSISTENT");
+namespace {
+ typedef std::map<std::string, std::string> Aliases;
+ Aliases define_aliases()
+ {
+ Aliases aliases;
+ aliases["JMSType"] = "subject";
+ aliases["JMSCorrelationID"] = "correlation_id";
+ aliases["JMSMessageID"] = "message_id";
+ aliases["JMSDeliveryMode"] = "delivery_mode";
+ aliases["JMSRedelivered"] = "redelivered";
+ aliases["JMSPriority"] = "priority";
+ aliases["JMSDestination"] = "to";
+ aliases["JMSReplyTo"] = "reply_to";
+ aliases["JMSTimestamp"] = "creation_time";
+ aliases["JMSExpiration"] = "absolute_expiry_time";
+ return aliases;
+ }
+ const Aliases aliases = define_aliases();
+}
+
class MessageSelectorEnv : public SelectorEnv {
const Message& msg;
mutable boost::ptr_vector<string> returnedStrings;
@@ -82,8 +104,7 @@ public:
MessageSelectorEnv::MessageSelectorEnv(const Message& m) :
msg(m),
valuesLookedup(false)
-{
-}
+{}
const Value MessageSelectorEnv::specialValue(const string& id) const
{
@@ -91,6 +112,12 @@ const Value MessageSelectorEnv::specialV
// TODO: Just use a simple if chain for now - improve this later
if ( id=="delivery_mode" ) {
v = msg.getEncoding().isPersistent() ? PERSISTENT : NON_PERSISTENT;
+ } else if ( id=="subject" ) {
+ std::string s = msg.getSubject();
+ if (!s.empty()) {
+ returnedStrings.push_back(new string(s));
+ v = returnedStrings[returnedStrings.size()-1];
+ }
} else if ( id=="redelivered" ) {
// Although redelivered is defined to be true when delivery-count>0, if it is 0 now
// it will be 1 by the time the message is delivered
@@ -110,9 +137,17 @@ const Value MessageSelectorEnv::specialV
v = returnedStrings[returnedStrings.size()-1];
}
} else if ( id=="to" ) {
- v = EMPTY; // Hard to get this correct for both 1.0 and 0-10
+ std::string s = msg.getTo();
+ if (!s.empty()) {
+ returnedStrings.push_back(new string(s));
+ v = returnedStrings[returnedStrings.size()-1];
+ }
} else if ( id=="reply_to" ) {
- v = EMPTY; // Hard to get this correct for both 1.0 and 0-10
+ std::string s = msg.getReplyTo();
+ if (!s.empty()) {
+ returnedStrings.push_back(new string(s));
+ v = returnedStrings[returnedStrings.size()-1];
+ }
} else if ( id=="absolute_expiry_time" ) {
qpid::sys::AbsTime expiry = msg.getExpiration();
// Java property has value of 0 for no expiry
@@ -183,6 +218,14 @@ const Value& MessageSelectorEnv::value(c
QPID_LOG(debug, "Selector lookup special identifier: " << identifier);
returnedValues[identifier] = specialValue(identifier.substr(5));
}
+ } else if (identifier.substr(0, 3) == "JMS") {
+ Aliases::const_iterator equivalent = aliases.find(identifier);
+ if (equivalent != aliases.end()) {
+ QPID_LOG(debug, "Selector lookup JMS identifier: " << identifier << " treated as alias for " << equivalent->second);
+ returnedValues[identifier] = specialValue(equivalent->second);
+ } else {
+ QPID_LOG(info, "Unrecognised JMS identifier in selector: " << identifier);
+ }
} else if (!valuesLookedup) {
QPID_LOG(debug, "Selector lookup triggered by: " << identifier);
// Iterate over all the message properties
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.cpp?rev=1698426&r1=1698425&r2=1698426&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.cpp Fri Aug 28 22:16:27 2015
@@ -62,9 +62,8 @@ std::string Message::getUserId() const
uint64_t Message::getTimestamp() const
{
- //AMQP 1.0 message doesn't have the equivalent of the 0-10 timestamp field
- //TODO: define an annotation for that
- return 0;
+ //creation time is in milliseconds, timestamp (from the 0-10 spec) is in seconds
+ return !creationTime ? 0 : creationTime.get()/1000;
}
bool Message::isPersistent() const
@@ -87,6 +86,25 @@ uint8_t Message::getPriority() const
else return priority.get();
}
+std::string Message::getTo() const
+{
+ std::string v;
+ if (to.data) v.assign(to.data, to.size);
+ return v;
+}
+std::string Message::getSubject() const
+{
+ std::string v;
+ if (subject.data) v.assign(subject.data, subject.size);
+ return v;
+}
+std::string Message::getReplyTo() const
+{
+ std::string v;
+ if (replyTo.data) v.assign(replyTo.data, replyTo.size);
+ return v;
+}
+
namespace {
class StringRetriever : public MapHandler
{
@@ -242,7 +260,7 @@ qpid::amqp::MessageId Message::getMessag
{
return messageId;
}
-qpid::amqp::CharSequence Message::getReplyTo() const
+qpid::amqp::CharSequence Message::getReplyToAsCharSequence() const
{
return replyTo;
}
@@ -318,7 +336,7 @@ void Message::onCorrelationId(const qpid
void Message::onContentType(const qpid::amqp::CharSequence& v) { contentType = v; }
void Message::onContentEncoding(const qpid::amqp::CharSequence& v) { contentEncoding = v; }
void Message::onAbsoluteExpiryTime(int64_t) {}
-void Message::onCreationTime(int64_t) {}
+void Message::onCreationTime(int64_t v) { creationTime = v; }
void Message::onGroupId(const qpid::amqp::CharSequence&) {}
void Message::onGroupSequence(uint32_t) {}
void Message::onReplyToGroupId(const qpid::amqp::CharSequence&) {}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.h?rev=1698426&r1=1698425&r2=1698426&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.h Fri Aug 28 22:16:27 2015
@@ -53,10 +53,13 @@ class Message : public qpid::broker::Mes
void processProperties(qpid::amqp::MapHandler&) const;
std::string getUserId() const;
uint64_t getTimestamp() const;
+ std::string getTo() const;
+ std::string getSubject() const;
+ std::string getReplyTo() const;
qpid::amqp::MessageId getMessageId() const;
qpid::amqp::MessageId getCorrelationId() const;
- qpid::amqp::CharSequence getReplyTo() const;
+ qpid::amqp::CharSequence getReplyToAsCharSequence() const;
qpid::amqp::CharSequence getContentType() const;
qpid::amqp::CharSequence getContentEncoding() const;
@@ -108,6 +111,7 @@ class Message : public qpid::broker::Mes
qpid::amqp::MessageId correlationId;
qpid::amqp::CharSequence contentType;
qpid::amqp::CharSequence contentEncoding;
+ boost::optional<int64_t> creationTime;
//application-properties:
qpid::amqp::CharSequence applicationProperties;
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.cpp?rev=1698426&r1=1698425&r2=1698426&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.cpp Fri Aug 28 22:16:27 2015
@@ -245,7 +245,7 @@ boost::intrusive_ptr<const qpid::broker:
props->setCorrelationId(boost::lexical_cast<std::string>(cid.value.ulong));
break;
}
- if (message->getReplyTo()) props->setReplyTo(translate(message->getReplyTo(), broker));
+ if (message->getReplyToAsCharSequence()) props->setReplyTo(translate(message->getReplyTo(), broker));
if (message->getContentType()) props->setContentType(translate(message->getContentType()));
if (message->getContentEncoding()) props->setContentEncoding(translate(message->getContentEncoding()));
props->setUserId(message->getUserId());
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp?rev=1698426&r1=1698425&r2=1698426&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp Fri Aug 28 22:16:27 2015
@@ -41,8 +41,11 @@ namespace qpid {
namespace broker {
namespace amqp_0_10 {
namespace {
+const std::string DELIMITER("/");
+const std::string EMPTY;
const std::string QMF2("qmf2");
const std::string PARTIAL("partial");
+const std::string SUBJECT_KEY("qpid.subject");
}
MessageTransfer::MessageTransfer() : frames(framing::SequenceNumber()), requiredCredit(0), cachedRequiredCredit(false) {}
MessageTransfer::MessageTransfer(const framing::SequenceNumber& id) : frames(id), requiredCredit(0), cachedRequiredCredit(false) {}
@@ -143,6 +146,41 @@ uint64_t MessageTransfer::getTimestamp()
return props ? props->getTimestamp() : 0;
}
+std::string MessageTransfer::getTo() const
+{
+ const DeliveryProperties* props = getProperties<DeliveryProperties>();
+ if (props) {
+ //if message was sent to 'nameless exchange' then the routing key is the queue
+ return props->getExchange().empty() ? props->getRoutingKey() : props->getExchange();
+ } else {
+ return EMPTY;
+ }
+}
+std::string MessageTransfer::getSubject() const
+{
+ const DeliveryProperties* props = getProperties<DeliveryProperties>();
+ if (props) {
+ //if message was sent to 'nameless exchange' then the routing key is the queue name, not the subject
+ return props->getExchange().empty() ? getPropertyAsString(SUBJECT_KEY) : props->getRoutingKey();
+ } else {
+ return EMPTY;
+ }
+}
+std::string MessageTransfer::getReplyTo() const
+{
+ const MessageProperties* props = getProperties<MessageProperties>();
+ if (props && props->hasReplyTo()) {
+ const qpid::framing::ReplyTo& replyto = props->getReplyTo();
+ if (replyto.hasExchange() && replyto.hasRoutingKey())
+ return replyto.getExchange() + DELIMITER + replyto.getRoutingKey();
+ else if (replyto.hasExchange()) return replyto.getExchange();
+ else if (replyto.hasRoutingKey()) return replyto.getRoutingKey();
+ else return EMPTY;
+ } else {
+ return EMPTY;
+ }
+}
+
bool MessageTransfer::requiresAccept() const
{
const framing::MessageTransferBody* b = getFrames().as<framing::MessageTransferBody>();
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h?rev=1698426&r1=1698425&r2=1698426&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h Fri Aug 28 22:16:27 2015
@@ -57,6 +57,10 @@ class MessageTransfer : public qpid::bro
std::string getUserId() const;
void setTimestamp();
uint64_t getTimestamp() const;
+ std::string getTo() const;
+ std::string getSubject() const;
+ std::string getReplyTo() const;
+
bool requiresAccept() const;
const qpid::framing::SequenceNumber& getCommandId() const;
Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/selector.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/selector.py?rev=1698426&r1=1698425&r2=1698426&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/selector.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/selector.py Fri Aug 28 22:16:27 2015
@@ -71,3 +71,25 @@ class SelectorTests (VersionTest):
msg = rcv_4.fetch(0)
assert msg.content == 'd'
self.ssn.acknowledge(msg)
+
+ def check_selected(self,node, selector, expected_content):
+ rcv = self.ssn.receiver("%s; {mode:browse, link:{selector:\"%s\"}}" % (node, selector))
+ msg = rcv.fetch(0)
+ assert msg.content == expected_content, msg
+ rcv.close()
+
+ def test_jms_header_names(self):
+ """
+ The new AMQP 1.0 based JMS client uses these rather than the special names above
+ """
+ msgs = [Message(content=i, id=i, correlation_id=i, subject=i, priority=p+1, reply_to=i, properties={'x-amqp-to':i}) for p, i in enumerate(['a', 'b', 'c', 'd'])]
+
+ snd = self.ssn.sender("#")
+ for m in msgs: snd.send(m)
+
+ self.check_selected(snd.target, "JMSMessageID = 'a'", 'a')
+ self.check_selected(snd.target, "JMSCorrelationID = 'b'", 'b')
+ self.check_selected(snd.target, "JMSPriority = 3", 'c')
+ self.check_selected(snd.target, "JMSDestination = 'a'", 'a')
+ self.check_selected(snd.target, "JMSReplyTo = 'b'", 'b')
+ self.check_selected(snd.target, "JMSType = 'c'", 'c')
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org