You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2016/04/28 21:01:29 UTC

svn commit: r1741491 - in /qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10: IncomingMessages.cpp IncomingMessages.h ReceiverImpl.cpp ReceiverImpl.h SessionImpl.cpp

Author: gsim
Date: Thu Apr 28 19:01:29 2016
New Revision: 1741491

URL: http://svn.apache.org/viewvc?rev=1741491&view=rev
Log:
QPID-7234: allow proper credit processing to happen even for expired messages

Modified:
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp?rev=1741491&r1=1741490&r2=1741491&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp Thu Apr 28 19:01:29 2016
@@ -55,15 +55,17 @@ const std::string EMPTY_STRING;
 struct GetNone : IncomingMessages::Handler
 {
     bool accept(IncomingMessages::MessageTransfer&) { return false; }
+    bool expire(IncomingMessages::MessageTransfer&) { return false; }
 };
 
 struct GetAny : IncomingMessages::Handler
 {
     bool accept(IncomingMessages::MessageTransfer& transfer)
-    { 
+    {
         transfer.retrieve(0);
         return true;
     }
+    bool expire(IncomingMessages::MessageTransfer&) { return false; }
 };
 
 struct MatchAndTrack
@@ -147,7 +149,7 @@ bool IncomingMessages::get(Handler& hand
         for (FrameSetQueue::iterator i = received.begin(); i != received.end();)
         {
             MessageTransfer transfer(*i, *this);
-            if (transfer.checkExpired()) {
+            if (transfer.checkExpired() && handler.expire(transfer)) {
                 i = received.erase(i);
             } else if (handler.accept(transfer)) {
                 received.erase(i);
@@ -282,7 +284,7 @@ IncomingMessages::ProcessState IncomingM
         for (Duration timeout = duration; pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) {
             if (content->isA<MessageTransferBody>()) {
                 MessageTransfer transfer(content, *this);
-                if (transfer.checkExpired()) {
+                if (transfer.checkExpired() && handler->expire(transfer)) {
                     QPID_LOG(debug, "Expired received transfer: " << *content->getMethod());
                 } else if (handler && handler->accept(transfer)) {
                     QPID_LOG(debug, "Delivered " << *content->getMethod() << " "

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h?rev=1741491&r1=1741490&r2=1741491&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h Thu Apr 28 19:01:29 2016
@@ -67,6 +67,7 @@ class IncomingMessages
     {
         virtual ~Handler() {}
         virtual bool accept(MessageTransfer& transfer) = 0;
+        virtual bool expire(MessageTransfer& transfer) = 0;
         virtual bool isClosed() { return false; }
     };
 

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp?rev=1741491&r1=1741490&r2=1741491&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp Thu Apr 28 19:01:29 2016
@@ -36,7 +36,7 @@ using qpid::messaging::NoMessageAvailabl
 using qpid::messaging::Receiver;
 using qpid::messaging::Duration;
 
-void ReceiverImpl::received(qpid::messaging::Message&)
+void ReceiverImpl::received()
 {
     //TODO: should this be configurable
     sys::Mutex::ScopedLock l(lock);

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h?rev=1741491&r1=1741490&r2=1741491&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h Thu Apr 28 19:01:29 2016
@@ -63,7 +63,7 @@ class ReceiverImpl : public qpid::messag
     uint32_t getCapacity();
     uint32_t getAvailable();
     uint32_t getUnsettled();
-    void received(qpid::messaging::Message& message);
+    void received();
     qpid::messaging::Session getSession() const;
     bool isClosed() const;
     qpid::messaging::Address getAddress() const;

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp?rev=1741491&r1=1741490&r2=1741491&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp Thu Apr 28 19:01:29 2016
@@ -330,6 +330,16 @@ struct IncomingMessageHandler : Incoming
         return callback(transfer);
     }
 
+    bool expire(IncomingMessages::MessageTransfer& transfer)
+    {
+        if (receiver && receiver->getName() == transfer.getDestination()) {
+            receiver->received();
+            return true;
+        } else {
+            return false;
+        }
+    }
+
     bool isClosed()
     {
         return receiver && receiver->isClosed();
@@ -358,7 +368,7 @@ bool SessionImpl::accept(ReceiverImpl* r
 {
     if (receiver->getName() == transfer.getDestination()) {
         transfer.retrieve(message);
-        receiver->received(*message);
+        receiver->received();
         return true;
     } else {
         return false;



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org