You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/10/01 22:48:11 UTC
svn commit: r820783 - in /qpid/trunk/qpid/cpp/src/qpid/amqp_0_10:
Connection.cpp Connection.h
Author: aconway
Date: Thu Oct 1 20:48:11 2009
New Revision: 820783
URL: http://svn.apache.org/viewvc?rev=820783&view=rev
Log:
Fix broker race where broker closes connection before sending close-ok.
This was showing up in python/cluster interactions but it looks like
the race could also occur on a stand-alone broker.
Modified:
qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h
Modified: qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp?rev=820783&r1=820782&r2=820783&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp Thu Oct 1 20:48:11 2009
@@ -31,7 +31,7 @@
using sys::Mutex;
Connection::Connection(sys::OutputControl& o, const std::string& id, bool _isClient)
- : frameQueueClosed(false), output(o), identifier(id), initialized(false),
+ : pushClosed(false), popClosed(false), output(o), identifier(id), initialized(false),
isClient(_isClient), buffered(0), version(0,10)
{}
@@ -61,19 +61,23 @@
}
bool Connection::canEncode() {
- if (!frameQueueClosed) connection->doOutput();
Mutex::ScopedLock l(frameQueueLock);
- return (!isClient && !initialized) || !frameQueue.empty();
+ if (!popClosed) {
+ Mutex::ScopedUnlock u(frameQueueLock);
+ connection->doOutput();
+ }
+ return !popClosed && ((!isClient && !initialized) || !frameQueue.empty());
}
bool Connection::isClosed() const {
Mutex::ScopedLock l(frameQueueLock);
- return frameQueueClosed;
+ return pushClosed && popClosed;
}
size_t Connection::encode(const char* buffer, size_t size) {
{ // Swap frameQueue data into workQueue to avoid holding lock while we encode.
Mutex::ScopedLock l(frameQueueLock);
+ if (popClosed) return 0; // Can't pop any more frames.
assert(workQueue.empty());
workQueue.swap(frameQueue);
}
@@ -102,6 +106,8 @@
// Put back any frames we did not encode.
frameQueue.insert(frameQueue.begin(), workQueue.begin(), workQueue.end());
workQueue.clear();
+ if (frameQueue.empty() && pushClosed)
+ popClosed = true;
}
return out.getPosition();
}
@@ -111,9 +117,10 @@
void Connection::giveReadCredit(int32_t credit) { output.giveReadCredit(credit); }
void Connection::close() {
- // Close the output queue.
+ // No more frames can be pushed onto the queue.
+ // Frames aleady on the queue can be popped.
Mutex::ScopedLock l(frameQueueLock);
- frameQueueClosed = true;
+ pushClosed = true;
}
void Connection::closed() {
@@ -123,7 +130,7 @@
void Connection::send(framing::AMQFrame& f) {
{
Mutex::ScopedLock l(frameQueueLock);
- if (!frameQueueClosed)
+ if (!pushClosed)
frameQueue.push_back(f);
buffered += f.encodedSize();
}
Modified: qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h?rev=820783&r1=820782&r2=820783&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h Thu Oct 1 20:48:11 2009
@@ -47,7 +47,7 @@
FrameQueue frameQueue;
FrameQueue workQueue;
- bool frameQueueClosed;
+ bool pushClosed, popClosed;
mutable sys::Mutex frameQueueLock;
sys::OutputControl& output;
std::auto_ptr<sys::ConnectionInputHandler> connection;
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org