You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/10/01 22:48:11 UTC

svn commit: r820783 - in /qpid/trunk/qpid/cpp/src/qpid/amqp_0_10: Connection.cpp Connection.h

Author: aconway
Date: Thu Oct  1 20:48:11 2009
New Revision: 820783

URL: http://svn.apache.org/viewvc?rev=820783&view=rev
Log:
Fix broker race where broker closes connection before sending close-ok.

This was showing up in python/cluster interactions but it looks like
the race could also occur on a stand-alone broker.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h

Modified: qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp?rev=820783&r1=820782&r2=820783&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp Thu Oct  1 20:48:11 2009
@@ -31,7 +31,7 @@
 using sys::Mutex;
 
 Connection::Connection(sys::OutputControl& o, const std::string& id, bool _isClient)
-    : frameQueueClosed(false), output(o), identifier(id), initialized(false),
+    : pushClosed(false), popClosed(false), output(o), identifier(id), initialized(false),
       isClient(_isClient), buffered(0), version(0,10)
 {}
 
@@ -61,19 +61,23 @@
 }
 
 bool Connection::canEncode() {
-    if (!frameQueueClosed) connection->doOutput();
     Mutex::ScopedLock l(frameQueueLock);
-    return (!isClient && !initialized) || !frameQueue.empty();
+    if (!popClosed)  {
+        Mutex::ScopedUnlock u(frameQueueLock);
+        connection->doOutput();
+    }
+    return !popClosed && ((!isClient && !initialized) || !frameQueue.empty());
 }
 
 bool Connection::isClosed() const {
     Mutex::ScopedLock l(frameQueueLock);
-    return frameQueueClosed;
+    return pushClosed && popClosed;
 }
 
 size_t  Connection::encode(const char* buffer, size_t size) {
     {   // Swap frameQueue data into workQueue to avoid holding lock while we encode.
         Mutex::ScopedLock l(frameQueueLock);
+        if (popClosed) return 0; // Can't pop any more frames.
         assert(workQueue.empty());
         workQueue.swap(frameQueue);
     }
@@ -102,6 +106,8 @@
         // Put back any frames we did not encode.
         frameQueue.insert(frameQueue.begin(), workQueue.begin(), workQueue.end());
         workQueue.clear();
+        if (frameQueue.empty() && pushClosed)
+            popClosed = true;
     }
     return out.getPosition();
 }
@@ -111,9 +117,10 @@
 void Connection::giveReadCredit(int32_t credit) { output.giveReadCredit(credit); }
 
 void  Connection::close() {
-    // Close the output queue.
+    // No more frames can be pushed onto the queue.
+    // Frames aleady on the queue can be popped.
     Mutex::ScopedLock l(frameQueueLock);
-    frameQueueClosed = true;
+    pushClosed = true;
 }
 
 void  Connection::closed() {
@@ -123,7 +130,7 @@
 void Connection::send(framing::AMQFrame& f) {
     {
         Mutex::ScopedLock l(frameQueueLock);
-	if (!frameQueueClosed)
+	if (!pushClosed)
             frameQueue.push_back(f);
         buffered += f.encodedSize();
     }

Modified: qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h?rev=820783&r1=820782&r2=820783&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h Thu Oct  1 20:48:11 2009
@@ -47,7 +47,7 @@
 
     FrameQueue frameQueue;
     FrameQueue workQueue;
-    bool frameQueueClosed;
+    bool pushClosed, popClosed;
     mutable sys::Mutex frameQueueLock;
     sys::OutputControl& output;
     std::auto_ptr<sys::ConnectionInputHandler> connection;



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org