You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2016/06/27 15:56:42 UTC

svn commit: r1750369 - in /qpid/trunk/qpid/cpp/src/qpid/broker/amqp: Outgoing.cpp Outgoing.h

Author: gsim
Date: Mon Jun 27 15:56:42 2016
New Revision: 1750369

URL: http://svn.apache.org/viewvc?rev=1750369&view=rev
Log:
feat(disposition): support undeliverable-here in modified outcomes

Previously, specifying `undeliverable-here` as `true` in a modified
outcome simply resulted in the message being rejected. This patch
adds tracking to the outgoing link management in order to not
redeliver messages to links that indicate that messages are not
deliverable there.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp?rev=1750369&r1=1750368&r2=1750369&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp Mon Jun 27 15:56:42 2016
@@ -69,7 +69,8 @@ OutgoingFromQueue::OutgoingFromQueue(Bro
       buffer(1024)/*used only for header at present*/,
       //for exclusive queues, assume unreliable unless reliable is explicitly requested; otherwise assume reliable unless unreliable requested
       unreliable(exclusive ? !requested_reliable(link) : requested_unreliable(link)),
-      cancelled(false)
+      cancelled(false),
+      trackingUndeliverableMessages(false)
 {
     for (size_t i = 0 ; i < deliveries.capacity(); ++i) {
         deliveries[i].init(i);
@@ -142,11 +143,17 @@ void OutgoingFromQueue::handle(pn_delive
                 if (preAcquires()) {
                     //TODO: handle message-annotations
                     if (pn_disposition_is_undeliverable(pn_delivery_remote(delivery))) {
-                        //treat undeliverable here as rejection
-                        queue->reject(r.cursor);
-                    } else {
-                        queue->release(r.cursor, pn_disposition_is_failed(pn_delivery_remote(delivery)));
+                        if (!trackingUndeliverableMessages) {
+                            // observe queue for changes to track undeliverable messages
+                            queue->getObservers().add(
+                              boost::dynamic_pointer_cast<OutgoingFromQueue>(shared_from_this()));
+                            trackingUndeliverableMessages = true;
+                        }
+
+                        undeliverableMessages.add(r.msg.getSequence());
                     }
+
+                    queue->release(r.cursor, pn_disposition_is_failed(pn_delivery_remote(delivery)));
                 }
                 outgoingMessageRejected();//TODO: not quite true...
                 break;
@@ -168,6 +175,13 @@ bool OutgoingFromQueue::canDeliver()
 void OutgoingFromQueue::detached(bool closed)
 {
     QPID_LOG(debug, "Detaching outgoing link " << getName() << " from " << queue->getName());
+
+    if (trackingUndeliverableMessages) {
+      // stop observation of the queue
+      queue->getObservers().remove(
+        boost::dynamic_pointer_cast<OutgoingFromQueue>(shared_from_this()));
+    }
+
     queue->cancel(shared_from_this());
     //TODO: release in a clearer order?
     for (size_t i = 0 ; i < deliveries.capacity(); ++i) {
@@ -279,6 +293,7 @@ bool match(const std::string& filter, co
 
 bool OutgoingFromQueue::filter(const qpid::broker::Message& m)
 {
+    if (undeliverableMessages.contains(m.getSequence())) return false; // sequence was recorded in handle() as undeliverable on this link, so it fails the filter
     return (subjectFilter.empty() || subjectFilter == m.getRoutingKey() || match(subjectFilter, m.getRoutingKey()))
            && (!selector || selector->filter(m));
 }
@@ -334,5 +349,11 @@ boost::shared_ptr<Queue> OutgoingFromQue
     else return boost::shared_ptr<Queue>();
 }
 
+void OutgoingFromQueue::dequeued(const qpid::broker::Message &m) // QueueObserver callback; the only observer hook this class gives a non-empty body
+{
+    if (undeliverableMessages.contains(m.getSequence())) { // only sequences previously added by handle() are of interest
+        undeliverableMessages.remove(m.getSequence()); // the message was dequeued, so its sequence number no longer needs tracking
+    }
+}
 
 }}} // namespace qpid::broker::amqp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h?rev=1750369&r1=1750368&r2=1750369&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h Mon Jun 27 15:56:42 2016
@@ -24,6 +24,7 @@
 #include "qpid/broker/amqp/Message.h"
 #include "qpid/broker/amqp/ManagedOutgoingLink.h"
 #include "qpid/broker/Consumer.h"
+#include "qpid/broker/QueueObserver.h"
 
 #include <boost/shared_ptr.hpp>
 #include <boost/scoped_ptr.hpp>
@@ -36,12 +37,19 @@ namespace qpid {
 namespace sys {
 class OutputControl;
 }
+
+namespace framing {
+class SequenceSet;
+}
+
 namespace broker {
 class Broker;
 class Queue;
 class Selector;
+
 namespace amqp {
 class Session;
+
 template <class T>
 class CircularArray
 {
@@ -75,6 +83,7 @@ class Outgoing : public ManagedOutgoingL
      * Called when a delivery is writable
      */
     virtual void handle(pn_delivery_t* delivery) = 0;
+
     void wakeup();
     virtual ~Outgoing() {}
   protected:
@@ -85,7 +94,9 @@ class Outgoing : public ManagedOutgoingL
  * Logic for handling an outgoing link from a queue (even if it is a
  * subscription pseduo-queue created by the broker)
  */
-class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public boost::enable_shared_from_this<OutgoingFromQueue>
+class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer,
+                          public boost::enable_shared_from_this<OutgoingFromQueue>,
+                          public qpid::broker::QueueObserver
 {
   public:
     OutgoingFromQueue(Broker&, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session&,
@@ -100,7 +111,7 @@ class OutgoingFromQueue : public Outgoin
     bool canDeliver();
     void detached(bool closed);
 
-    //Consumer interface:
+    // Consumer interface:
     bool deliver(const QueueCursor& cursor, const qpid::broker::Message& msg);
     void notify();
     bool accept(const qpid::broker::Message&);
@@ -110,6 +121,14 @@ class OutgoingFromQueue : public Outgoin
     qpid::broker::OwnershipToken* getSession();
     static boost::shared_ptr<Queue> getExclusiveSubscriptionQueue(Outgoing*);
 
+    // QueueObserver interface
+    virtual void enqueued(const qpid::broker::Message&) {};
+    virtual void acquired(const qpid::broker::Message&) {};
+    virtual void requeued(const qpid::broker::Message&) {};
+    virtual void dequeued(const qpid::broker::Message&);
+    virtual void consumerAdded(const qpid::broker::Consumer&) {};
+    virtual void consumerRemoved(const qpid::broker::Consumer&) {};
+
   private:
 
     struct Record
@@ -145,6 +164,9 @@ class OutgoingFromQueue : public Outgoin
     boost::scoped_ptr<Selector> selector;
     bool unreliable;
     bool cancelled;
+
+    bool trackingUndeliverableMessages;
+    qpid::framing::SequenceSet undeliverableMessages;
 };
 }}} // namespace qpid::broker::amqp
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org