You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2015/08/29 00:16:27 UTC

svn commit: r1698426 - in /qpid/trunk/qpid: cpp/src/qpid/broker/ cpp/src/qpid/broker/amqp/ cpp/src/qpid/broker/amqp_0_10/ tests/src/py/qpid_tests/broker_1_0/

Author: gsim
Date: Fri Aug 28 22:16:27 2015
New Revision: 1698426

URL: http://svn.apache.org/r1698426
Log:
QPID-6714: support for JMS header names in selectors, plus support for to, replyto and subject

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Selector.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/selector.py

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=1698426&r1=1698425&r2=1698426&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Fri Aug 28 22:16:27 2015
@@ -64,6 +64,19 @@ std::string Message::getRoutingKey() con
     return getEncoding().getRoutingKey();
 }
 
+std::string Message::getTo() const // selector support: reports the 'to' value supplied by the message's encoding
+{
+    return getEncoding().getTo();
+}
+std::string Message::getSubject() const // selector support: reports the subject supplied by the message's encoding
+{
+    return getEncoding().getSubject();
+}
+std::string Message::getReplyTo() const // selector support: reports the reply-to supplied by the message's encoding
+{
+    return getEncoding().getReplyTo();
+}
+
 bool Message::isPersistent() const
 {
     return getEncoding().isPersistent();

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=1698426&r1=1698425&r2=1698426&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Fri Aug 28 22:16:27 2015
@@ -78,6 +78,9 @@ public:
         virtual void processProperties(qpid::amqp::MapHandler&) const = 0;
         virtual std::string getUserId() const = 0;
         virtual uint64_t getTimestamp() const = 0;
+        virtual std::string getTo() const = 0;
+        virtual std::string getSubject() const = 0;
+        virtual std::string getReplyTo() const = 0;
     };
 
     class SharedState : public Encoding
@@ -137,6 +140,11 @@ public:
 
     QPID_BROKER_EXTERN uint64_t getTimestamp() const;
 
+    //required for selectors:
+    QPID_BROKER_EXTERN std::string getTo() const;
+    QPID_BROKER_EXTERN std::string getSubject() const;
+    QPID_BROKER_EXTERN std::string getReplyTo() const;
+
     QPID_BROKER_EXTERN void addAnnotation(const std::string& key, const qpid::types::Variant& value);
     QPID_BROKER_EXTERN bool isExcluded(const std::vector<std::string>& excludes) const;
     QPID_BROKER_EXTERN void addTraceId(const std::string& id);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Selector.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Selector.cpp?rev=1698426&r1=1698425&r2=1698426&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Selector.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Selector.cpp Fri Aug 28 22:16:27 2015
@@ -30,6 +30,7 @@
 #include "qpid/log/Statement.h"
 #include "qpid/types/Variant.h"
 
+#include <map>
 #include <stdexcept>
 #include <string>
 #include <sstream>
@@ -54,6 +55,7 @@ using qpid::amqp::MessageId;
  * priority             | Priority     | priority             header section
  * delivery_count       |              | delivery-count       header section
  * redelivered          |[Redelivered] | (delivery_count>0)  (computed value)
+ * subject              | Type         | subject              properties section
  * correlation_id       | CorrelationID| correlation-id       properties section
  * to                   |[Destination] | to                   properties section
  * absolute_expiry_time |[Expiration]  | absolute-expiry-time properties section
@@ -66,6 +68,26 @@ const string EMPTY;
 const string PERSISTENT("PERSISTENT");
 const string NON_PERSISTENT("NON_PERSISTENT");
 
+namespace { // file-local table of JMS header names accepted in selectors
+   typedef std::map<std::string, std::string> Aliases; // key: JMS header name, value: special identifier it stands for
+   Aliases define_aliases() // builds the JMS-name -> special-identifier table
+   {
+       Aliases aliases;
+       aliases["JMSType"] = "subject";
+       aliases["JMSCorrelationID"] = "correlation_id";
+       aliases["JMSMessageID"] = "message_id";
+       aliases["JMSDeliveryMode"] = "delivery_mode";
+       aliases["JMSRedelivered"] = "redelivered";
+       aliases["JMSPriority"] = "priority";
+       aliases["JMSDestination"] = "to";
+       aliases["JMSReplyTo"] = "reply_to";
+       aliases["JMSTimestamp"] = "creation_time";
+       aliases["JMSExpiration"] = "absolute_expiry_time";
+       return aliases;
+   }
+   const Aliases aliases = define_aliases(); // read-only table; the mapped name is what gets passed to specialValue()
+}
+
 class MessageSelectorEnv : public SelectorEnv {
     const Message& msg;
     mutable boost::ptr_vector<string> returnedStrings;
@@ -82,8 +104,7 @@ public:
 MessageSelectorEnv::MessageSelectorEnv(const Message& m) :
     msg(m),
     valuesLookedup(false)
-{
-}
+{}
 
 const Value MessageSelectorEnv::specialValue(const string& id) const
 {
@@ -91,6 +112,12 @@ const Value MessageSelectorEnv::specialV
     // TODO: Just use a simple if chain for now - improve this later
     if ( id=="delivery_mode" ) {
         v = msg.getEncoding().isPersistent() ? PERSISTENT : NON_PERSISTENT;
+    } else if ( id=="subject" ) {
+        std::string s = msg.getSubject();
+        if (!s.empty()) {
+            returnedStrings.push_back(new string(s));
+            v = returnedStrings[returnedStrings.size()-1];
+        }
     } else if ( id=="redelivered" ) {
         // Although redelivered is defined to be true delivery-count>0 if it is 0 now
         // it will be 1 by the time the message is delivered
@@ -110,9 +137,17 @@ const Value MessageSelectorEnv::specialV
             v = returnedStrings[returnedStrings.size()-1];
         }
     } else if ( id=="to" ) {
-        v = EMPTY; // Hard to get this correct for both 1.0 and 0-10
+        std::string s = msg.getTo();
+        if (!s.empty()) {
+            returnedStrings.push_back(new string(s));
+            v = returnedStrings[returnedStrings.size()-1];
+        }
     } else if ( id=="reply_to" ) {
-        v = EMPTY; // Hard to get this correct for both 1.0 and 0-10
+        std::string s = msg.getReplyTo();
+        if (!s.empty()) {
+            returnedStrings.push_back(new string(s));
+            v = returnedStrings[returnedStrings.size()-1];
+        }
     } else if ( id=="absolute_expiry_time" ) {
         qpid::sys::AbsTime expiry = msg.getExpiration();
         // Java property has value of 0 for no expiry
@@ -183,6 +218,14 @@ const Value& MessageSelectorEnv::value(c
             QPID_LOG(debug, "Selector lookup special identifier: " << identifier);
             returnedValues[identifier] = specialValue(identifier.substr(5));
         }
+    } else if (identifier.substr(0, 3) == "JMS") {
+        Aliases::const_iterator equivalent = aliases.find(identifier);
+        if (equivalent != aliases.end()) {
+            QPID_LOG(debug, "Selector lookup JMS identifier: " << identifier << " treated as alias for " << equivalent->second);
+            returnedValues[identifier] = specialValue(equivalent->second);
+        } else {
+            QPID_LOG(info, "Unrecognised JMS identifier in selector: " << identifier);
+        }
     } else if (!valuesLookedup) {
         QPID_LOG(debug, "Selector lookup triggered by: " << identifier);
         // Iterate over all the message properties

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.cpp?rev=1698426&r1=1698425&r2=1698426&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.cpp Fri Aug 28 22:16:27 2015
@@ -62,9 +62,8 @@ std::string Message::getUserId() const
 
 uint64_t Message::getTimestamp() const
 {
-    //AMQP 1.0 message doesn't have the equivalent of the 0-10 timestamp field
-    //TODO: define an annotation for that
-    return 0;
+    //creation time is in milliseconds, timestamp (from the 0-10 spec) is in seconds
+    return !creationTime ? 0 : creationTime.get()/1000;
 }
 
 bool Message::isPersistent() const
@@ -87,6 +86,25 @@ uint8_t Message::getPriority() const
     else return priority.get();
 }
 
+std::string Message::getTo() const // returns the 'to' char sequence copied into a std::string
+{
+    std::string v; // stays empty when 'to' has no data pointer
+    if (to.data) v.assign(to.data, to.size);
+    return v;
+}
+std::string Message::getSubject() const // returns the 'subject' char sequence copied into a std::string
+{
+    std::string v; // stays empty when 'subject' has no data pointer
+    if (subject.data) v.assign(subject.data, subject.size);
+    return v;
+}
+std::string Message::getReplyTo() const // returns the 'replyTo' char sequence copied into a std::string
+{
+    std::string v; // stays empty when 'replyTo' has no data pointer
+    if (replyTo.data) v.assign(replyTo.data, replyTo.size);
+    return v;
+}
+
 namespace {
 class StringRetriever : public MapHandler
 {
@@ -242,7 +260,7 @@ qpid::amqp::MessageId Message::getMessag
 {
     return messageId;
 }
-qpid::amqp::CharSequence Message::getReplyTo() const
+qpid::amqp::CharSequence Message::getReplyToAsCharSequence() const
 {
     return replyTo;
 }
@@ -318,7 +336,7 @@ void Message::onCorrelationId(const qpid
 void Message::onContentType(const qpid::amqp::CharSequence& v) { contentType = v; }
 void Message::onContentEncoding(const qpid::amqp::CharSequence& v) { contentEncoding = v; }
 void Message::onAbsoluteExpiryTime(int64_t) {}
-void Message::onCreationTime(int64_t) {}
+void Message::onCreationTime(int64_t v) { creationTime = v; }
 void Message::onGroupId(const qpid::amqp::CharSequence&) {}
 void Message::onGroupSequence(uint32_t) {}
 void Message::onReplyToGroupId(const qpid::amqp::CharSequence&) {}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.h?rev=1698426&r1=1698425&r2=1698426&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.h Fri Aug 28 22:16:27 2015
@@ -53,10 +53,13 @@ class Message : public qpid::broker::Mes
     void processProperties(qpid::amqp::MapHandler&) const;
     std::string getUserId() const;
     uint64_t getTimestamp() const;
+    std::string getTo() const;
+    std::string getSubject() const;
+    std::string getReplyTo() const;
     qpid::amqp::MessageId getMessageId() const;
     qpid::amqp::MessageId getCorrelationId() const;
 
-    qpid::amqp::CharSequence getReplyTo() const;
+    qpid::amqp::CharSequence getReplyToAsCharSequence() const;
     qpid::amqp::CharSequence getContentType() const;
     qpid::amqp::CharSequence getContentEncoding() const;
 
@@ -108,6 +111,7 @@ class Message : public qpid::broker::Mes
     qpid::amqp::MessageId correlationId;
     qpid::amqp::CharSequence contentType;
     qpid::amqp::CharSequence contentEncoding;
+    boost::optional<int64_t> creationTime;
 
     //application-properties:
     qpid::amqp::CharSequence applicationProperties;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.cpp?rev=1698426&r1=1698425&r2=1698426&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.cpp Fri Aug 28 22:16:27 2015
@@ -245,7 +245,7 @@ boost::intrusive_ptr<const qpid::broker:
                 props->setCorrelationId(boost::lexical_cast<std::string>(cid.value.ulong));
                 break;
             }
-            if (message->getReplyTo()) props->setReplyTo(translate(message->getReplyTo(), broker));
+            if (message->getReplyToAsCharSequence()) props->setReplyTo(translate(message->getReplyToAsCharSequence(), broker)); // test and translate the same CharSequence, as before the accessor rename
             if (message->getContentType()) props->setContentType(translate(message->getContentType()));
             if (message->getContentEncoding()) props->setContentEncoding(translate(message->getContentEncoding()));
             props->setUserId(message->getUserId());

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp?rev=1698426&r1=1698425&r2=1698426&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp Fri Aug 28 22:16:27 2015
@@ -41,8 +41,11 @@ namespace qpid {
 namespace broker {
 namespace amqp_0_10 {
 namespace {
+const std::string DELIMITER("/");
+const std::string EMPTY;
 const std::string QMF2("qmf2");
 const std::string PARTIAL("partial");
+const std::string SUBJECT_KEY("qpid.subject");
 }
 MessageTransfer::MessageTransfer() : frames(framing::SequenceNumber()), requiredCredit(0), cachedRequiredCredit(false) {}
 MessageTransfer::MessageTransfer(const framing::SequenceNumber& id) : frames(id), requiredCredit(0), cachedRequiredCredit(false) {}
@@ -143,6 +146,41 @@ uint64_t MessageTransfer::getTimestamp()
     return props ? props->getTimestamp() : 0;
 }
 
+std::string MessageTransfer::getTo() const // destination derived from the delivery properties
+{
+    const DeliveryProperties* props = getProperties<DeliveryProperties>();
+    if (props) {
+        //if message was sent to 'nameless exchange' (empty exchange name) then the routing key is the queue; otherwise report the exchange
+        return props->getExchange().empty() ? props->getRoutingKey() : props->getExchange();
+    } else {
+        return EMPTY; // no delivery properties on this transfer
+    }
+}
+std::string MessageTransfer::getSubject() const // subject: the routing key, or the SUBJECT_KEY ("qpid.subject") property for the nameless exchange
+{
+    const DeliveryProperties* props = getProperties<DeliveryProperties>();
+    if (props) {
+        //if message was sent to 'nameless exchange' then the routing key is the queue name, not the subject, so fall back to the property
+        return props->getExchange().empty() ? getPropertyAsString(SUBJECT_KEY) : props->getRoutingKey();
+    } else {
+        return EMPTY; // no delivery properties on this transfer
+    }
+}
+std::string MessageTransfer::getReplyTo() const // reply-to flattened to one string: "exchange/routing-key", or whichever part is set
+{
+    const MessageProperties* props = getProperties<MessageProperties>();
+    if (props && props->hasReplyTo()) {
+        const qpid::framing::ReplyTo& replyto = props->getReplyTo();
+        if (replyto.hasExchange() && replyto.hasRoutingKey())
+            return replyto.getExchange() + DELIMITER + replyto.getRoutingKey(); // DELIMITER is "/"
+        else if (replyto.hasExchange()) return replyto.getExchange();
+        else if (replyto.hasRoutingKey()) return replyto.getRoutingKey();
+        else return EMPTY; // reply-to present but neither part set
+    } else {
+        return EMPTY; // no message properties, or no reply-to in them
+    }
+}
+
 bool MessageTransfer::requiresAccept() const
 {
     const framing::MessageTransferBody* b = getFrames().as<framing::MessageTransferBody>();

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h?rev=1698426&r1=1698425&r2=1698426&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h Fri Aug 28 22:16:27 2015
@@ -57,6 +57,10 @@ class MessageTransfer : public qpid::bro
     std::string getUserId() const;
     void setTimestamp();
     uint64_t getTimestamp() const;
+    std::string getTo() const;
+    std::string getSubject() const;
+    std::string getReplyTo() const;
+
 
     bool requiresAccept() const;
     const qpid::framing::SequenceNumber& getCommandId() const;

Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/selector.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/selector.py?rev=1698426&r1=1698425&r2=1698426&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/selector.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/selector.py Fri Aug 28 22:16:27 2015
@@ -71,3 +71,25 @@ class SelectorTests (VersionTest):
         msg = rcv_4.fetch(0)
         assert msg.content == 'd'
         self.ssn.acknowledge(msg)
+
+    def check_selected(self,node, selector, expected_content):  # browse `node` through `selector`; the fetched message must carry `expected_content`
+        rcv = self.ssn.receiver("%s; {mode:browse, link:{selector:\"%s\"}}" % (node, selector))
+        msg = rcv.fetch(0)
+        assert msg.content == expected_content, msg  # the message itself is the failure text
+        rcv.close()  # NOTE(review): not reached when the assert fails, so the receiver stays open in that case
+
+    def test_jms_header_names(self):
+        """
+        The new AMQP 1.0 based JMS client uses these rather than the special names above
+        """
+        msgs = [Message(content=i, id=i, correlation_id=i, subject=i, priority=p+1, reply_to=i, properties={'x-amqp-to':i}) for p, i in enumerate(['a', 'b', 'c', 'd'])]  # four messages 'a'..'d' with priorities 1..4; each header carries the message's own letter
+
+        snd = self.ssn.sender("#")
+        for m in msgs: snd.send(m)
+
+        self.check_selected(snd.target, "JMSMessageID = 'a'", 'a')
+        self.check_selected(snd.target, "JMSCorrelationID = 'b'", 'b')
+        self.check_selected(snd.target, "JMSPriority = 3", 'c')  # priority is p+1, so 3 identifies 'c'
+        self.check_selected(snd.target, "JMSDestination = 'a'", 'a')  # presumably the 'x-amqp-to' property populates the 'to' field - TODO confirm
+        self.check_selected(snd.target, "JMSReplyTo = 'b'", 'b')
+        self.check_selected(snd.target, "JMSType = 'c'", 'c')  # JMSType is evaluated against the message subject



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org