You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2013/10/24 14:37:47 UTC

svn commit: r1535354 - in /qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10: IncomingMessages.cpp IncomingMessages.h SessionImpl.cpp

Author: gsim
Date: Thu Oct 24 12:37:47 2013
New Revision: 1535354

URL: http://svn.apache.org/r1535354
Log:
QPID-4265: wakeup fetches on closed receiver

Modified:
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp?rev=1535354&r1=1535353&r2=1535354&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp Thu Oct 24 12:37:47 2013
@@ -148,11 +148,24 @@ bool IncomingMessages::get(Handler& hand
             ScopedRelease release(inUse, lock);
             sys::Mutex::ScopedUnlock l(lock);
             //wait for suitable new message to arrive
-            return process(&handler, timeout == qpid::sys::TIME_INFINITE ? qpid::sys::TIME_INFINITE : qpid::sys::Duration(AbsTime::now(), deadline));
+            if (process(&handler, timeout == qpid::sys::TIME_INFINITE ? qpid::sys::TIME_INFINITE : qpid::sys::Duration(AbsTime::now(), deadline))) {
+                return true;
+            }
         }
+        if (handler.isClosed()) throw qpid::messaging::ReceiverError("Receiver has been closed");
     } while (AbsTime::now() < deadline);
     return false;
 }
+namespace {
+struct Wakeup : public qpid::types::Exception {};
+}
+
+void IncomingMessages::wakeup()
+{
+    sys::Mutex::ScopedLock l(lock);
+    incoming->close(qpid::sys::ExceptionHolder(new Wakeup()));
+    lock.notifyAll();
+}
 
 bool IncomingMessages::getNextDestination(std::string& destination, qpid::sys::Duration timeout)
 {
@@ -222,6 +235,16 @@ void IncomingMessages::releasePending(co
     session.messageRelease(match.ids);
 }
 
+bool IncomingMessages::pop(FrameSet::shared_ptr& content, qpid::sys::Duration timeout)
+{
+    try {
+        return incoming->pop(content, timeout);
+    } catch (const Wakeup&) {
+        incoming->open();
+        return false;
+    }
+}
+
 /**
  * Get a frameset that is accepted by the specified handler from
  * session queue, waiting for up to the specified duration and
@@ -234,7 +257,7 @@ bool IncomingMessages::process(Handler* 
     AbsTime deadline(AbsTime::now(), duration);
     FrameSet::shared_ptr content;
     try {
-        for (Duration timeout = duration; incoming->pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) {
+        for (Duration timeout = duration; pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) {
             if (content->isA<MessageTransferBody>()) {
                 MessageTransfer transfer(content, *this);
                 if (handler && handler->accept(transfer)) {
@@ -261,7 +284,7 @@ bool IncomingMessages::wait(qpid::sys::D
 {
     AbsTime deadline(AbsTime::now(), duration);
     FrameSet::shared_ptr content;
-    for (Duration timeout = duration; incoming->pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) {
+    for (Duration timeout = duration; pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) {
         if (content->isA<MessageTransferBody>()) {
             QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue");
             sys::Mutex::ScopedLock l(lock);

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h?rev=1535354&r1=1535353&r2=1535354&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h Thu Oct 24 12:37:47 2013
@@ -66,11 +66,13 @@ class IncomingMessages
     {
         virtual ~Handler() {}
         virtual bool accept(MessageTransfer& transfer) = 0;
+        virtual bool isClosed() { return false; }
     };
 
     IncomingMessages();
     void setSession(qpid::client::AsyncSession session);
     bool get(Handler& handler, qpid::sys::Duration timeout);
+    void wakeup();
     bool getNextDestination(std::string& destination, qpid::sys::Duration timeout);
     void accept();
     void accept(qpid::framing::SequenceNumber id, bool cumulative);
@@ -94,6 +96,8 @@ class IncomingMessages
 
     bool process(Handler*, qpid::sys::Duration);
     bool wait(qpid::sys::Duration);
+    bool pop(FrameSetPtr&, qpid::sys::Duration);
+
     void retrieve(FrameSetPtr, qpid::messaging::Message*);
 
 };

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp?rev=1535354&r1=1535353&r2=1535354&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp Thu Oct 24 12:37:47 2013
@@ -276,13 +276,19 @@ struct IncomingMessageHandler : Incoming
 {
     typedef boost::function1<bool, IncomingMessages::MessageTransfer&> Callback;
     Callback callback;
+    ReceiverImpl* receiver;
 
-    IncomingMessageHandler(Callback c) : callback(c) {}
+    IncomingMessageHandler(Callback c) : callback(c), receiver(0) {}
 
     bool accept(IncomingMessages::MessageTransfer& transfer)
     {
         return callback(transfer);
     }
+
+    bool isClosed()
+    {
+        return receiver && receiver->isClosed();
+    }
 };
 
 }
@@ -332,6 +338,7 @@ bool SessionImpl::getIncoming(IncomingMe
 bool SessionImpl::get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::messaging::Duration timeout)
 {
     IncomingMessageHandler handler(boost::bind(&SessionImpl::accept, this, &receiver, &message, _1));
+    handler.receiver = &receiver;
     return getIncoming(handler, timeout);
 }
 
@@ -495,10 +502,13 @@ void SessionImpl::releaseImpl(qpid::mess
 
 void SessionImpl::receiverCancelled(const std::string& name)
 {
-    ScopedLock l(lock);
-    receivers.erase(name);
-    session.sync();
-    incoming.releasePending(name);
+    {
+        ScopedLock l(lock);
+        receivers.erase(name);
+        session.sync();
+        incoming.releasePending(name);
+    }
+    incoming.wakeup();
 }
 
 void SessionImpl::releasePending(const std::string& name)



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org