You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2013/04/01 18:08:11 UTC
svn commit: r1463175 - in /qpid/branches/0.22/qpid: ./ cpp/src/
cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
cpp/src/qpid/messaging/amqp/ConnectionContext.h
cpp/src/qpid/messaging/amqp/SenderHandle.cpp
cpp/src/qpid/messaging/amqp/SessionContext.cpp
Author: gsim
Date: Mon Apr 1 16:08:11 2013
New Revision: 1463175
URL: http://svn.apache.org/r1463175
Log:
QPID-4674: Detect asynchronous connection close, session end and link detach. Merged from r1462138.
Modified:
qpid/branches/0.22/qpid/ (props changed)
qpid/branches/0.22/qpid/cpp/src/ (props changed)
qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp
qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
Propchange: qpid/branches/0.22/qpid/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid:r1462138
Propchange: qpid/branches/0.22/qpid/cpp/src/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/cpp/src:r1462138
Modified: qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1463175&r1=1463174&r2=1463175&view=diff
==============================================================================
--- qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp (original)
+++ qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp Mon Apr 1 16:08:11 2013
@@ -193,6 +193,7 @@ bool ConnectionContext::fetch(boost::sha
{
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ checkClosed(ssn, lnk);
if (!lnk->capacity) {
pn_link_flow(lnk->receiver, 1);
wakeupDriver();
@@ -212,7 +213,7 @@ bool ConnectionContext::fetch(boost::sha
wakeupDriver();
while (pn_link_credit(lnk->receiver) && !pn_link_queued(lnk->receiver)) {
QPID_LOG(debug, "Waiting for message or for credit to be drained: credit=" << pn_link_credit(lnk->receiver) << ", queued=" << pn_link_queued(lnk->receiver));
- wait();
+ wait(ssn, lnk);
}
if (lnk->capacity && pn_link_queued(lnk->receiver) == 0) {
pn_link_flow(lnk->receiver, lnk->capacity);
@@ -247,6 +248,7 @@ bool ConnectionContext::get(boost::share
qpid::sys::AbsTime until(convert(timeout));
while (true) {
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ checkClosed(ssn, lnk);
pn_delivery_t* current = pn_link_current((pn_link_t*) lnk->receiver);
QPID_LOG(debug, "In ConnectionContext::get(), current=" << current);
if (current) {
@@ -262,7 +264,7 @@ bool ConnectionContext::get(boost::share
pn_link_advance(lnk->receiver);
return true;
} else if (until > qpid::sys::now()) {
- wait();
+ wait(ssn, lnk);
} else {
return false;
}
@@ -273,6 +275,7 @@ bool ConnectionContext::get(boost::share
void ConnectionContext::acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative)
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ checkClosed(ssn);
if (message) {
ssn->acknowledge(MessageImplAccess::get(*message).getInternalId(), cumulative);
} else {
@@ -329,19 +332,20 @@ void ConnectionContext::attach(pn_sessio
}
}
-void ConnectionContext::send(boost::shared_ptr<SenderContext> snd, const qpid::messaging::Message& message, bool sync)
+void ConnectionContext::send(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> snd, const qpid::messaging::Message& message, bool sync)
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ checkClosed(ssn);
SenderContext::Delivery* delivery(0);
while (!(delivery = snd->send(message))) {
QPID_LOG(debug, "Waiting for capacity...");
- wait();//wait for capacity
+ wait(ssn, snd);//wait for capacity
}
wakeupDriver();
if (sync) {
while (!delivery->accepted()) {
QPID_LOG(debug, "Waiting for confirmation...");
- wait();//wait until message has been confirmed
+ wait(ssn, snd);//wait until message has been confirmed
}
}
}
@@ -408,15 +412,65 @@ void ConnectionContext::wakeupDriver()
}
}
+namespace {
+pn_state_t REQUIRES_CLOSE = PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED;
+pn_state_t IS_CLOSED = PN_LOCAL_CLOSED | PN_REMOTE_CLOSED;
+}
+
void ConnectionContext::wait()
{
lock.wait();
if (state == DISCONNECTED) {
throw qpid::messaging::TransportFailure("Disconnected");
}
- //check for any closed links, sessions or indeed the connection
+ if ((pn_connection_state(connection) & REQUIRES_CLOSE) == REQUIRES_CLOSE) {
+ pn_connection_close(connection);
+ throw qpid::messaging::ConnectionError("Connection closed by peer");
+ }
+}
+void ConnectionContext::wait(boost::shared_ptr<SessionContext> ssn)
+{
+ wait();
+ checkClosed(ssn);
+}
+void ConnectionContext::wait(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk)
+{
+ wait();
+ checkClosed(ssn, lnk);
+}
+void ConnectionContext::wait(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk)
+{
+ wait();
+ checkClosed(ssn, lnk);
+}
+void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn)
+{
+ if ((pn_session_state(ssn->session) & REQUIRES_CLOSE) == REQUIRES_CLOSE) {
+ pn_session_close(ssn->session);
+ throw qpid::messaging::SessionError("Session ended by peer");
+ } else if ((pn_session_state(ssn->session) & IS_CLOSED) == IS_CLOSED) {
+ throw qpid::messaging::SessionError("Session has ended");
+ }
}
+void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk)
+{
+ checkClosed(ssn, lnk->receiver);
+}
+void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk)
+{
+ checkClosed(ssn, lnk->sender);
+}
+void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn, pn_link_t* lnk)
+{
+ checkClosed(ssn);
+ if ((pn_link_state(lnk) & REQUIRES_CLOSE) == REQUIRES_CLOSE) {
+ pn_link_close(lnk);
+ throw qpid::messaging::LinkError("Link detached by peer");
+ } else if ((pn_link_state(lnk) & IS_CLOSED) == IS_CLOSED) {
+ throw qpid::messaging::LinkError("Link is not attached");
+ }
+}
boost::shared_ptr<SessionContext> ConnectionContext::newSession(bool transactional, const std::string& n)
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
Modified: qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h?rev=1463175&r1=1463174&r2=1463175&view=diff
==============================================================================
--- qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h (original)
+++ qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h Mon Apr 1 16:08:11 2013
@@ -72,7 +72,7 @@ class ConnectionContext : public qpid::s
void endSession(boost::shared_ptr<SessionContext>);
void attach(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>);
void attach(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>);
- void send(boost::shared_ptr<SenderContext> ctxt, const qpid::messaging::Message& message, bool sync);
+ void send(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext> ctxt, const qpid::messaging::Message& message, bool sync);
bool fetch(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout);
bool get(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout);
void acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative);
@@ -136,6 +136,13 @@ class ConnectionContext : public qpid::s
CodecSwitch codecSwitch;
void wait();
+ void wait(boost::shared_ptr<SessionContext>);
+ void wait(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>);
+ void wait(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>);
+ void checkClosed(boost::shared_ptr<SessionContext>);
+ void checkClosed(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>);
+ void checkClosed(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>);
+ void checkClosed(boost::shared_ptr<SessionContext>, pn_link_t*);
void wakeupDriver();
void attach(pn_session_t*, pn_link_t*, int credit=0);
Modified: qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp?rev=1463175&r1=1463174&r2=1463175&view=diff
==============================================================================
--- qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp (original)
+++ qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp Mon Apr 1 16:08:11 2013
@@ -39,7 +39,7 @@ SenderHandle::SenderHandle(boost::shared
void SenderHandle::send(const Message& message, bool sync)
{
- connection->send(sender, message, sync);
+ connection->send(session, sender, message, sync);
}
void SenderHandle::close()
Modified: qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp?rev=1463175&r1=1463174&r2=1463175&view=diff
==============================================================================
--- qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp (original)
+++ qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp Mon Apr 1 16:08:11 2013
@@ -156,5 +156,4 @@ bool SessionContext::settled()
}
return result;
}
-
}}} // namespace qpid::messaging::amqp
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org