You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2012/11/28 15:13:12 UTC
svn commit: r1414705 - in
/qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp: ConnectionContext.cpp
SenderContext.cpp SenderContext.h SessionContext.cpp SessionContext.h
Author: gsim
Date: Wed Nov 28 14:13:11 2012
New Revision: 1414705
URL: http://svn.apache.org/viewvc?rev=1414705&view=rev
Log:
QPID-4451: wait for outgoing messages to settle when closing
Modified:
qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
Modified: qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1414705&r1=1414704&r2=1414705&view=diff
==============================================================================
--- qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp (original)
+++ qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp Wed Nov 28 14:13:11 2012
@@ -149,7 +149,14 @@ void ConnectionContext::close()
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
if (state != CONNECTED) return;
if (!(pn_connection_state(connection) & PN_LOCAL_CLOSED)) {
- for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i){
+ for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) {
+ //wait for outstanding sends to settle
+ while (!i->second->settled()) {
+ QPID_LOG(debug, "Waiting for sends to settle before closing");
+ wait();//wait until message has been confirmed
+ }
+
+
if (!(pn_session_state(i->second->session) & PN_LOCAL_CLOSED)) {
pn_session_close(i->second->session);
}
Modified: qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp?rev=1414705&r1=1414704&r2=1414705&view=diff
==============================================================================
--- qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp (original)
+++ qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp Wed Nov 28 14:13:11 2012
@@ -350,4 +350,10 @@ void SenderContext::configure(pn_terminu
helper.setNodeProperties(target);
}
}
+
+bool SenderContext::settled() // true when processUnsettled() reports nothing outstanding
+{
+    return 0 == processUnsettled(); // presumably the count of still-unsettled deliveries; verify in processUnsettled()
+}
+
}}} // namespace qpid::messaging::amqp
Modified: qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h?rev=1414705&r1=1414704&r2=1414705&view=diff
==============================================================================
--- qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h (original)
+++ qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h Wed Nov 28 14:13:11 2012
@@ -69,6 +69,7 @@ class SenderContext
const std::string& getTarget() const;
Delivery* send(const qpid::messaging::Message& message);
void configure() const;
+ bool settled();
private:
friend class ConnectionContext;
typedef std::deque<Delivery> Deliveries;
Modified: qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp?rev=1414705&r1=1414704&r2=1414705&view=diff
==============================================================================
--- qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp (original)
+++ qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp Wed Nov 28 14:13:11 2012
@@ -144,4 +144,13 @@ void SessionContext::acknowledge(const q
}
}
+bool SessionContext::settled() // true only if every sender in `senders` reports settled()
+{
+    bool allSettled = true;
+    for (SenderMap::iterator s = senders.begin(); s != senders.end(); ++s) {
+        allSettled = s->second->settled() && allSettled; // call comes first so every sender is polled (no short-circuit)
+    }
+    return allSettled;
+}
+
}}} // namespace qpid::messaging::amqp
Modified: qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h?rev=1414705&r1=1414704&r2=1414705&view=diff
==============================================================================
--- qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h (original)
+++ qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h Wed Nov 28 14:13:11 2012
@@ -59,6 +59,7 @@ class SessionContext
boost::shared_ptr<ReceiverContext> nextReceiver(qpid::messaging::Duration timeout);
uint32_t getReceivable();
uint32_t getUnsettledAcks();
+ bool settled();
private:
friend class ConnectionContext;
typedef std::map<std::string, boost::shared_ptr<SenderContext> > SenderMap;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org