You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2012/11/28 15:13:12 UTC

svn commit: r1414705 - in /qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp: ConnectionContext.cpp SenderContext.cpp SenderContext.h SessionContext.cpp SessionContext.h

Author: gsim
Date: Wed Nov 28 14:13:11 2012
New Revision: 1414705

URL: http://svn.apache.org/viewvc?rev=1414705&view=rev
Log:
QPID-4451: wait for outgoing messages to settle when closing

Modified:
    qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
    qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
    qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
    qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
    qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h

Modified: qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1414705&r1=1414704&r2=1414705&view=diff
==============================================================================
--- qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp (original)
+++ qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp Wed Nov 28 14:13:11 2012
@@ -149,7 +149,14 @@ void ConnectionContext::close()
     qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
     if (state != CONNECTED) return;
     if (!(pn_connection_state(connection) & PN_LOCAL_CLOSED)) {
-        for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i){
+        for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) {
+            //wait for outstanding sends to settle
+            while (!i->second->settled()) {
+                QPID_LOG(debug, "Waiting for sends to settle before closing");
+                wait();//wait until message has been confirmed
+            }
+
+
             if (!(pn_session_state(i->second->session) & PN_LOCAL_CLOSED)) {
                 pn_session_close(i->second->session);
             }

Modified: qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp?rev=1414705&r1=1414704&r2=1414705&view=diff
==============================================================================
--- qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp (original)
+++ qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp Wed Nov 28 14:13:11 2012
@@ -350,4 +350,10 @@ void SenderContext::configure(pn_terminu
         helper.setNodeProperties(target);
     }
 }
+
+bool SenderContext::settled()
+{
+    return processUnsettled() == 0;
+}
+
 }}} // namespace qpid::messaging::amqp

Modified: qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h?rev=1414705&r1=1414704&r2=1414705&view=diff
==============================================================================
--- qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h (original)
+++ qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h Wed Nov 28 14:13:11 2012
@@ -69,6 +69,7 @@ class SenderContext
     const std::string& getTarget() const;
     Delivery* send(const qpid::messaging::Message& message);
     void configure() const;
+    bool settled();
   private:
     friend class ConnectionContext;
     typedef std::deque<Delivery> Deliveries;

Modified: qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp?rev=1414705&r1=1414704&r2=1414705&view=diff
==============================================================================
--- qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp (original)
+++ qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp Wed Nov 28 14:13:11 2012
@@ -144,4 +144,13 @@ void SessionContext::acknowledge(const q
     }
 }
 
+bool SessionContext::settled()
+{
+    bool result = true;
+    for (SenderMap::iterator i = senders.begin(); i != senders.end(); ++i) {
+        if (!i->second->settled()) result = false;
+    }
+    return result;
+}
+
 }}} // namespace qpid::messaging::amqp

Modified: qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h?rev=1414705&r1=1414704&r2=1414705&view=diff
==============================================================================
--- qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h (original)
+++ qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h Wed Nov 28 14:13:11 2012
@@ -59,6 +59,7 @@ class SessionContext
     boost::shared_ptr<ReceiverContext> nextReceiver(qpid::messaging::Duration timeout);
     uint32_t getReceivable();
     uint32_t getUnsettledAcks();
+    bool settled();
   private:
     friend class ConnectionContext;
     typedef std::map<std::string, boost::shared_ptr<SenderContext> > SenderMap;



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org