You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2013/10/24 14:37:47 UTC
svn commit: r1535354 - in /qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10:
IncomingMessages.cpp IncomingMessages.h SessionImpl.cpp
Author: gsim
Date: Thu Oct 24 12:37:47 2013
New Revision: 1535354
URL: http://svn.apache.org/r1535354
Log:
QPID-4265: wakeup fetches on closed receiver
Modified:
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp?rev=1535354&r1=1535353&r2=1535354&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp Thu Oct 24 12:37:47 2013
@@ -148,11 +148,24 @@ bool IncomingMessages::get(Handler& hand
ScopedRelease release(inUse, lock);
sys::Mutex::ScopedUnlock l(lock);
//wait for suitable new message to arrive
- return process(&handler, timeout == qpid::sys::TIME_INFINITE ? qpid::sys::TIME_INFINITE : qpid::sys::Duration(AbsTime::now(), deadline));
+ if (process(&handler, timeout == qpid::sys::TIME_INFINITE ? qpid::sys::TIME_INFINITE : qpid::sys::Duration(AbsTime::now(), deadline))) {
+ return true;
+ }
}
+ if (handler.isClosed()) throw qpid::messaging::ReceiverError("Receiver has been closed");
} while (AbsTime::now() < deadline);
return false;
}
+namespace {
+struct Wakeup : public qpid::types::Exception {};
+}
+
+void IncomingMessages::wakeup()
+{
+ sys::Mutex::ScopedLock l(lock);
+ incoming->close(qpid::sys::ExceptionHolder(new Wakeup()));
+ lock.notifyAll();
+}
bool IncomingMessages::getNextDestination(std::string& destination, qpid::sys::Duration timeout)
{
@@ -222,6 +235,16 @@ void IncomingMessages::releasePending(co
session.messageRelease(match.ids);
}
+bool IncomingMessages::pop(FrameSet::shared_ptr& content, qpid::sys::Duration timeout)
+{
+ try {
+ return incoming->pop(content, timeout);
+ } catch (const Wakeup&) {
+ incoming->open();
+ return false;
+ }
+}
+
/**
* Get a frameset that is accepted by the specified handler from
* session queue, waiting for up to the specified duration and
@@ -234,7 +257,7 @@ bool IncomingMessages::process(Handler*
AbsTime deadline(AbsTime::now(), duration);
FrameSet::shared_ptr content;
try {
- for (Duration timeout = duration; incoming->pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) {
+ for (Duration timeout = duration; pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) {
if (content->isA<MessageTransferBody>()) {
MessageTransfer transfer(content, *this);
if (handler && handler->accept(transfer)) {
@@ -261,7 +284,7 @@ bool IncomingMessages::wait(qpid::sys::D
{
AbsTime deadline(AbsTime::now(), duration);
FrameSet::shared_ptr content;
- for (Duration timeout = duration; incoming->pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) {
+ for (Duration timeout = duration; pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) {
if (content->isA<MessageTransferBody>()) {
QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue");
sys::Mutex::ScopedLock l(lock);
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h?rev=1535354&r1=1535353&r2=1535354&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h Thu Oct 24 12:37:47 2013
@@ -66,11 +66,13 @@ class IncomingMessages
{
virtual ~Handler() {}
virtual bool accept(MessageTransfer& transfer) = 0;
+ virtual bool isClosed() { return false; }
};
IncomingMessages();
void setSession(qpid::client::AsyncSession session);
bool get(Handler& handler, qpid::sys::Duration timeout);
+ void wakeup();
bool getNextDestination(std::string& destination, qpid::sys::Duration timeout);
void accept();
void accept(qpid::framing::SequenceNumber id, bool cumulative);
@@ -94,6 +96,8 @@ class IncomingMessages
bool process(Handler*, qpid::sys::Duration);
bool wait(qpid::sys::Duration);
+ bool pop(FrameSetPtr&, qpid::sys::Duration);
+
void retrieve(FrameSetPtr, qpid::messaging::Message*);
};
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp?rev=1535354&r1=1535353&r2=1535354&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp Thu Oct 24 12:37:47 2013
@@ -276,13 +276,19 @@ struct IncomingMessageHandler : Incoming
{
typedef boost::function1<bool, IncomingMessages::MessageTransfer&> Callback;
Callback callback;
+ ReceiverImpl* receiver;
- IncomingMessageHandler(Callback c) : callback(c) {}
+ IncomingMessageHandler(Callback c) : callback(c), receiver(0) {}
bool accept(IncomingMessages::MessageTransfer& transfer)
{
return callback(transfer);
}
+
+ bool isClosed()
+ {
+ return receiver && receiver->isClosed();
+ }
};
}
@@ -332,6 +338,7 @@ bool SessionImpl::getIncoming(IncomingMe
bool SessionImpl::get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::messaging::Duration timeout)
{
IncomingMessageHandler handler(boost::bind(&SessionImpl::accept, this, &receiver, &message, _1));
+ handler.receiver = &receiver;
return getIncoming(handler, timeout);
}
@@ -495,10 +502,13 @@ void SessionImpl::releaseImpl(qpid::mess
void SessionImpl::receiverCancelled(const std::string& name)
{
- ScopedLock l(lock);
- receivers.erase(name);
- session.sync();
- incoming.releasePending(name);
+ {
+ ScopedLock l(lock);
+ receivers.erase(name);
+ session.sync();
+ incoming.releasePending(name);
+ }
+ incoming.wakeup();
}
void SessionImpl::releasePending(const std::string& name)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org