You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2016/04/28 21:01:29 UTC
svn commit: r1741491 - in /qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10:
IncomingMessages.cpp IncomingMessages.h ReceiverImpl.cpp ReceiverImpl.h
SessionImpl.cpp
Author: gsim
Date: Thu Apr 28 19:01:29 2016
New Revision: 1741491
URL: http://svn.apache.org/viewvc?rev=1741491&view=rev
Log:
QPID-7234: allow proper credit processing to happen even for expired messages
Modified:
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp?rev=1741491&r1=1741490&r2=1741491&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp Thu Apr 28 19:01:29 2016
@@ -55,15 +55,17 @@ const std::string EMPTY_STRING;
struct GetNone : IncomingMessages::Handler
{
bool accept(IncomingMessages::MessageTransfer&) { return false; }
+ bool expire(IncomingMessages::MessageTransfer&) { return false; }
};
struct GetAny : IncomingMessages::Handler
{
bool accept(IncomingMessages::MessageTransfer& transfer)
- {
+ {
transfer.retrieve(0);
return true;
}
+ bool expire(IncomingMessages::MessageTransfer&) { return false; }
};
struct MatchAndTrack
@@ -147,7 +149,7 @@ bool IncomingMessages::get(Handler& hand
for (FrameSetQueue::iterator i = received.begin(); i != received.end();)
{
MessageTransfer transfer(*i, *this);
- if (transfer.checkExpired()) {
+ if (transfer.checkExpired() && handler.expire(transfer)) {
i = received.erase(i);
} else if (handler.accept(transfer)) {
received.erase(i);
@@ -282,7 +284,7 @@ IncomingMessages::ProcessState IncomingM
for (Duration timeout = duration; pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) {
if (content->isA<MessageTransferBody>()) {
MessageTransfer transfer(content, *this);
- if (transfer.checkExpired()) {
+ if (transfer.checkExpired() && handler->expire(transfer)) {
QPID_LOG(debug, "Expired received transfer: " << *content->getMethod());
} else if (handler && handler->accept(transfer)) {
QPID_LOG(debug, "Delivered " << *content->getMethod() << " "
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h?rev=1741491&r1=1741490&r2=1741491&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h Thu Apr 28 19:01:29 2016
@@ -67,6 +67,7 @@ class IncomingMessages
{
virtual ~Handler() {}
virtual bool accept(MessageTransfer& transfer) = 0;
+ virtual bool expire(MessageTransfer& transfer) = 0;
virtual bool isClosed() { return false; }
};
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp?rev=1741491&r1=1741490&r2=1741491&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp Thu Apr 28 19:01:29 2016
@@ -36,7 +36,7 @@ using qpid::messaging::NoMessageAvailabl
using qpid::messaging::Receiver;
using qpid::messaging::Duration;
-void ReceiverImpl::received(qpid::messaging::Message&)
+void ReceiverImpl::received()
{
//TODO: should this be configurable
sys::Mutex::ScopedLock l(lock);
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h?rev=1741491&r1=1741490&r2=1741491&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h Thu Apr 28 19:01:29 2016
@@ -63,7 +63,7 @@ class ReceiverImpl : public qpid::messag
uint32_t getCapacity();
uint32_t getAvailable();
uint32_t getUnsettled();
- void received(qpid::messaging::Message& message);
+ void received();
qpid::messaging::Session getSession() const;
bool isClosed() const;
qpid::messaging::Address getAddress() const;
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp?rev=1741491&r1=1741490&r2=1741491&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp Thu Apr 28 19:01:29 2016
@@ -330,6 +330,16 @@ struct IncomingMessageHandler : Incoming
return callback(transfer);
}
+ bool expire(IncomingMessages::MessageTransfer& transfer)
+ {
+ if (receiver && receiver->getName() == transfer.getDestination()) {
+ receiver->received();
+ return true;
+ } else {
+ return false;
+ }
+ }
+
bool isClosed()
{
return receiver && receiver->isClosed();
@@ -358,7 +368,7 @@ bool SessionImpl::accept(ReceiverImpl* r
{
if (receiver->getName() == transfer.getDestination()) {
transfer.retrieve(message);
- receiver->received(*message);
+ receiver->received();
return true;
} else {
return false;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org