You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2013/04/01 18:08:11 UTC

svn commit: r1463175 - in /qpid/branches/0.22/qpid: ./ cpp/src/ cpp/src/qpid/messaging/amqp/ConnectionContext.cpp cpp/src/qpid/messaging/amqp/ConnectionContext.h cpp/src/qpid/messaging/amqp/SenderHandle.cpp cpp/src/qpid/messaging/amqp/SessionContext.cpp

Author: gsim
Date: Mon Apr  1 16:08:11 2013
New Revision: 1463175

URL: http://svn.apache.org/r1463175
Log:
QPID-4674: Detect asynchronous connection close, session end and link detach. Merged from r1462138.

Modified:
    qpid/branches/0.22/qpid/   (props changed)
    qpid/branches/0.22/qpid/cpp/src/   (props changed)
    qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
    qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
    qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp
    qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp

Propchange: qpid/branches/0.22/qpid/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid:r1462138

Propchange: qpid/branches/0.22/qpid/cpp/src/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/cpp/src:r1462138

Modified: qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1463175&r1=1463174&r2=1463175&view=diff
==============================================================================
--- qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp (original)
+++ qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp Mon Apr  1 16:08:11 2013
@@ -193,6 +193,7 @@ bool ConnectionContext::fetch(boost::sha
 {
     {
         qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+        checkClosed(ssn, lnk);
         if (!lnk->capacity) {
             pn_link_flow(lnk->receiver, 1);
             wakeupDriver();
@@ -212,7 +213,7 @@ bool ConnectionContext::fetch(boost::sha
             wakeupDriver();
             while (pn_link_credit(lnk->receiver) && !pn_link_queued(lnk->receiver)) {
                 QPID_LOG(debug, "Waiting for message or for credit to be drained: credit=" << pn_link_credit(lnk->receiver) << ", queued=" << pn_link_queued(lnk->receiver));
-                wait();
+                wait(ssn, lnk);
             }
             if (lnk->capacity && pn_link_queued(lnk->receiver) == 0) {
                 pn_link_flow(lnk->receiver, lnk->capacity);
@@ -247,6 +248,7 @@ bool ConnectionContext::get(boost::share
     qpid::sys::AbsTime until(convert(timeout));
     while (true) {
         qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+        checkClosed(ssn, lnk);
         pn_delivery_t* current = pn_link_current((pn_link_t*) lnk->receiver);
         QPID_LOG(debug, "In ConnectionContext::get(), current=" << current);
         if (current) {
@@ -262,7 +264,7 @@ bool ConnectionContext::get(boost::share
             pn_link_advance(lnk->receiver);
             return true;
         } else if (until > qpid::sys::now()) {
-            wait();
+            wait(ssn, lnk);
         } else {
             return false;
         }
@@ -273,6 +275,7 @@ bool ConnectionContext::get(boost::share
 void ConnectionContext::acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative)
 {
     qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    checkClosed(ssn);
     if (message) {
         ssn->acknowledge(MessageImplAccess::get(*message).getInternalId(), cumulative);
     } else {
@@ -329,19 +332,20 @@ void ConnectionContext::attach(pn_sessio
     }
 }
 
-void ConnectionContext::send(boost::shared_ptr<SenderContext> snd, const qpid::messaging::Message& message, bool sync)
+void ConnectionContext::send(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> snd, const qpid::messaging::Message& message, bool sync)
 {
     qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    checkClosed(ssn);
     SenderContext::Delivery* delivery(0);
     while (!(delivery = snd->send(message))) {
         QPID_LOG(debug, "Waiting for capacity...");
-        wait();//wait for capacity
+        wait(ssn, snd);//wait for capacity
     }
     wakeupDriver();
     if (sync) {
         while (!delivery->accepted()) {
             QPID_LOG(debug, "Waiting for confirmation...");
-            wait();//wait until message has been confirmed
+            wait(ssn, snd);//wait until message has been confirmed
         }
     }
 }
@@ -408,15 +412,65 @@ void ConnectionContext::wakeupDriver()
     }
 }
 
+namespace {
+pn_state_t REQUIRES_CLOSE = PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED;
+pn_state_t IS_CLOSED = PN_LOCAL_CLOSED | PN_REMOTE_CLOSED;
+}
+
 void ConnectionContext::wait()
 {
     lock.wait();
     if (state == DISCONNECTED) {
         throw qpid::messaging::TransportFailure("Disconnected");
     }
-    //check for any closed links, sessions or indeed the connection
+    if ((pn_connection_state(connection) & REQUIRES_CLOSE) == REQUIRES_CLOSE) {
+        pn_connection_close(connection);
+        throw qpid::messaging::ConnectionError("Connection closed by peer");
+    }
+}
+void ConnectionContext::wait(boost::shared_ptr<SessionContext> ssn)
+{
+    wait();
+    checkClosed(ssn);
+}
+void ConnectionContext::wait(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk)
+{
+    wait();
+    checkClosed(ssn, lnk);
+}
+void ConnectionContext::wait(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk)
+{
+    wait();
+    checkClosed(ssn, lnk);
+}
+void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn)
+{
+    if ((pn_session_state(ssn->session) & REQUIRES_CLOSE) == REQUIRES_CLOSE) {
+        pn_session_close(ssn->session);
+        throw qpid::messaging::SessionError("Session ended by peer");
+    } else if ((pn_session_state(ssn->session) & IS_CLOSED) == IS_CLOSED) {
+        throw qpid::messaging::SessionError("Session has ended");
+    }
 }
 
+void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk)
+{
+    checkClosed(ssn, lnk->receiver);
+}
+void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk)
+{
+    checkClosed(ssn, lnk->sender);
+}
+void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn, pn_link_t* lnk)
+{
+    checkClosed(ssn);
+    if ((pn_link_state(lnk) & REQUIRES_CLOSE) == REQUIRES_CLOSE) {
+        pn_link_close(lnk);
+        throw qpid::messaging::LinkError("Link detached by peer");
+    } else if ((pn_link_state(lnk) & IS_CLOSED) == IS_CLOSED) {
+        throw qpid::messaging::LinkError("Link is not attached");
+    }
+}
 boost::shared_ptr<SessionContext> ConnectionContext::newSession(bool transactional, const std::string& n)
 {
     qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);

Modified: qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h?rev=1463175&r1=1463174&r2=1463175&view=diff
==============================================================================
--- qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h (original)
+++ qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h Mon Apr  1 16:08:11 2013
@@ -72,7 +72,7 @@ class ConnectionContext : public qpid::s
     void endSession(boost::shared_ptr<SessionContext>);
     void attach(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>);
     void attach(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>);
-    void send(boost::shared_ptr<SenderContext> ctxt, const qpid::messaging::Message& message, bool sync);
+    void send(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext> ctxt, const qpid::messaging::Message& message, bool sync);
     bool fetch(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout);
     bool get(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout);
     void acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative);
@@ -136,6 +136,13 @@ class ConnectionContext : public qpid::s
     CodecSwitch codecSwitch;
 
     void wait();
+    void wait(boost::shared_ptr<SessionContext>);
+    void wait(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>);
+    void wait(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>);
+    void checkClosed(boost::shared_ptr<SessionContext>);
+    void checkClosed(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>);
+    void checkClosed(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>);
+    void checkClosed(boost::shared_ptr<SessionContext>, pn_link_t*);
     void wakeupDriver();
     void attach(pn_session_t*, pn_link_t*, int credit=0);
 

Modified: qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp?rev=1463175&r1=1463174&r2=1463175&view=diff
==============================================================================
--- qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp (original)
+++ qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp Mon Apr  1 16:08:11 2013
@@ -39,7 +39,7 @@ SenderHandle::SenderHandle(boost::shared
 
 void SenderHandle::send(const Message& message, bool sync)
 {
-    connection->send(sender, message, sync);
+    connection->send(session, sender, message, sync);
 }
 
 void SenderHandle::close()

Modified: qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp?rev=1463175&r1=1463174&r2=1463175&view=diff
==============================================================================
--- qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp (original)
+++ qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp Mon Apr  1 16:08:11 2013
@@ -156,5 +156,4 @@ bool SessionContext::settled()
     }
     return result;
 }
-
 }}} // namespace qpid::messaging::amqp



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org