You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/05/17 23:27:56 UTC
svn commit: r945383 - /qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
Author: aconway
Date: Mon May 17 21:27:56 2010
New Revision: 945383
URL: http://svn.apache.org/viewvc?rev=945383&view=rev
Log:
Fix for "Assertion `!mcastSentButNotReceived' failed." in cluster tests.
Modified:
qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=945383&r1=945382&r2=945383&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Mon May 17 21:27:56 2010
@@ -115,7 +115,7 @@ Connection::Connection(Cluster& c, sys::
// Local clients are announced to the cluster
// and initialized when the announce is received.
QPID_LOG(info, "new client connection " << *this);
- giveReadCredit(cluster.getSettings().readMax);
+ giveReadCredit(cluster.getSettings().readMax); // Flow control
cluster.getMulticast().mcastControl(
ClusterConnectionAnnounceBody(ProtocolVersion(), mgmtId,
connectionCtor.external.ssf,
@@ -158,6 +158,8 @@ void Connection::init() {
connection->setUserIdCallback ( fn );
}
+// Called when we have consumed a read buffer to give credit to the
+// connection layer to continue reading.
void Connection::giveReadCredit(int credit) {
{
sys::Mutex::ScopedLock l(connectionNegotiationMonitor);
@@ -166,7 +168,6 @@ void Connection::giveReadCredit(int cred
connectionNegotiationMonitor.notify();
}
}
-
if (cluster.getSettings().readMax && credit)
output.giveReadCredit(credit);
}
@@ -311,20 +312,13 @@ size_t Connection::decode(const char* bu
Buffer buf(const_cast<char*>(buffer), size);
while (localDecoder.decode(buf))
received(localDecoder.getFrame());
+ return buf.getPosition();
}
else { // Multicast local connections.
assert(isLocal());
const char* remainingData = buffer;
size_t remainingSize = size;
- { // scope for scoped lock.
- sys::Mutex::ScopedLock l(connectionNegotiationMonitor);
- if ( inConnectionNegotiation ) {
- assert(!mcastSentButNotReceived);
- mcastSentButNotReceived = true;
- }
- }
-
if (expectProtocolHeader) {
//If this is an outgoing link, we will receive a protocol
//header which needs to be decoded first
@@ -342,16 +336,28 @@ size_t Connection::decode(const char* bu
return 0;
}
}
- cluster.getMulticast().mcastBuffer(remainingData, remainingSize, self);
- { // scope for scoped lock.
+ // During connection negotiation wait for each multicast to be
+ // processed before sending the next, to ensure that the
+ // security layer is activated before we attempt to decode
+ // encrypted frames.
+ {
+ sys::Mutex::ScopedLock l(connectionNegotiationMonitor);
+ if ( inConnectionNegotiation ) {
+ assert(!mcastSentButNotReceived);
+ mcastSentButNotReceived = true;
+ }
+ }
+ cluster.getMulticast().mcastBuffer(remainingData, remainingSize, self);
+ {
sys::Mutex::ScopedLock l(connectionNegotiationMonitor);
- if ( inConnectionNegotiation )
- while (inConnectionNegotiation && mcastSentButNotReceived)
+ if (inConnectionNegotiation)
+ while (mcastSentButNotReceived)
connectionNegotiationMonitor.wait();
+ assert(!mcastSentButNotReceived);
}
+ return size;
}
- return size;
}
broker::SessionState& Connection::sessionState() {
@@ -621,6 +627,7 @@ void Connection::mcastUserId ( std::stri
{
sys::Mutex::ScopedLock l(connectionNegotiationMonitor);
inConnectionNegotiation = false;
+ mcastSentButNotReceived = false;
connectionNegotiationMonitor.notify();
}
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org