You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2016/06/28 21:32:07 UTC

svn commit: r1750587 - in /qpid/trunk/qpid/cpp/src/qpid/broker: Queue.cpp Queue.h amqp/Outgoing.cpp amqp/Outgoing.h

Author: gsim
Date: Tue Jun 28 21:32:06 2016
New Revision: 1750587

URL: http://svn.apache.org/viewvc?rev=1750587&view=rev
Log:
QPID-7329: Merge branch 'github/pr/10' into trunk

Closes #10

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1750587&r1=1750586&r2=1750587&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Jun 28 21:32:06 2016
@@ -345,6 +345,19 @@ void Queue::process(Message& msg)
     }
 }
 
+void Queue::mergeMessageAnnotations(const QueueCursor& position,
+                                    const qpid::types::Variant::Map& messageAnnotations)
+{
+  Mutex::ScopedLock locker(messageLock);
+  Message *message = messages->find(position);
+  if (!message) return;
+
+  qpid::types::Variant::Map::const_iterator it;
+  for (it = messageAnnotations.begin(); it != messageAnnotations.end(); ++it) {
+    message->addAnnotation(it->first, it->second);
+  }
+}
+
 void Queue::release(const QueueCursor& position, bool markRedelivered)
 {
     QueueListeners::NotificationSet copy;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=1750587&r1=1750586&r2=1750587&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Tue Jun 28 21:32:06 2016
@@ -332,6 +332,13 @@ class Queue : public boost::enable_share
     QPID_BROKER_EXTERN void deliverTo(Message, TxBuffer* = 0);
   public:
     /**
+     * Merges message annotations for an in-memory message as a result of
+     * a modified disposition outcome
+     */
+    QPID_BROKER_EXTERN void mergeMessageAnnotations(const QueueCursor& msg,
+                                                    const qpid::types::Variant::Map& annotations);
+
+    /**
      * Returns a message to the in-memory queue (due to lack
      * of acknowledegement from a receiver). If a consumer is
      * available it will be dispatched immediately, else it

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp?rev=1750587&r1=1750586&r2=1750587&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp Tue Jun 28 21:32:06 2016
@@ -18,6 +18,7 @@
  * under the License.
  *
  */
+#include "qpid/broker/amqp/DataReader.h"
 #include "qpid/broker/amqp/Outgoing.h"
 #include "qpid/broker/amqp/Exception.h"
 #include "qpid/broker/amqp/Header.h"
@@ -108,6 +109,19 @@ void OutgoingFromQueue::write(const char
     pn_link_send(link, data, size);
 }
 
+void OutgoingFromQueue::mergeMessageAnnotationsIfRequired(const Record &r)
+{
+    pn_data_t *remoteAnnotationsRaw =
+      pn_disposition_annotations(pn_delivery_remote(r.delivery));
+    if (remoteAnnotationsRaw == 0) {
+      return;
+    }
+
+    qpid::types::Variant::Map remoteMessageAnnotations;
+    DataReader::read(remoteAnnotationsRaw, remoteMessageAnnotations);
+    queue->mergeMessageAnnotations(r.cursor, remoteMessageAnnotations);
+}
+
 void OutgoingFromQueue::handle(pn_delivery_t* delivery)
 {
     size_t i = Record::getIndex(pn_delivery_tag(delivery));
@@ -141,7 +155,7 @@ void OutgoingFromQueue::handle(pn_delive
                 break;
               case PN_MODIFIED:
                 if (preAcquires()) {
-                    //TODO: handle message-annotations
+                    mergeMessageAnnotationsIfRequired(r);
                     if (pn_disposition_is_undeliverable(pn_delivery_remote(delivery))) {
                         if (!trackingUndeliverableMessages) {
                             // observe queue for changes to track undeliverable messages

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h?rev=1750587&r1=1750586&r2=1750587&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h Tue Jun 28 21:32:06 2016
@@ -152,6 +152,8 @@ class OutgoingFromQueue : public Outgoin
         static size_t getIndex(pn_delivery_tag_t);
     };
 
+    void mergeMessageAnnotationsIfRequired(const Record &r);
+
     const bool exclusive;
     const bool isControllingUser;
     boost::shared_ptr<Queue> queue;



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org