You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/05/17 23:27:56 UTC

svn commit: r945383 - /qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp

Author: aconway
Date: Mon May 17 21:27:56 2010
New Revision: 945383

URL: http://svn.apache.org/viewvc?rev=945383&view=rev
Log:
Fix for "Assertion `!mcastSentButNotReceived' failed." in cluster tests.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=945383&r1=945382&r2=945383&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Mon May 17 21:27:56 2010
@@ -115,7 +115,7 @@ Connection::Connection(Cluster& c, sys::
         // Local clients are announced to the cluster
         // and initialized when the announce is received.
         QPID_LOG(info, "new client connection " << *this);
-        giveReadCredit(cluster.getSettings().readMax);
+        giveReadCredit(cluster.getSettings().readMax); // Flow control
         cluster.getMulticast().mcastControl(
             ClusterConnectionAnnounceBody(ProtocolVersion(), mgmtId,
                                           connectionCtor.external.ssf,
@@ -158,6 +158,8 @@ void Connection::init() {
     connection->setUserIdCallback ( fn );
 }
 
+// Called when we have consumed a read buffer to give credit to the
+// connection layer to continue reading.
 void Connection::giveReadCredit(int credit) {
     {
         sys::Mutex::ScopedLock l(connectionNegotiationMonitor);
@@ -166,7 +168,6 @@ void Connection::giveReadCredit(int cred
             connectionNegotiationMonitor.notify();
         }
     }
-
     if (cluster.getSettings().readMax && credit) 
         output.giveReadCredit(credit);
 }
@@ -311,20 +312,13 @@ size_t Connection::decode(const char* bu
         Buffer buf(const_cast<char*>(buffer), size);
         while (localDecoder.decode(buf))
             received(localDecoder.getFrame());
+        return buf.getPosition();
     }
     else {                      // Multicast local connections.
         assert(isLocal());
         const char* remainingData = buffer;
         size_t remainingSize = size;
 
-        { // scope for scoped lock.
-            sys::Mutex::ScopedLock l(connectionNegotiationMonitor);
-            if ( inConnectionNegotiation ) {
-                assert(!mcastSentButNotReceived);
-                mcastSentButNotReceived = true;
-            }
-        }
-
         if (expectProtocolHeader) {
             //If this is an outgoing link, we will receive a protocol
             //header which needs to be decoded first
@@ -342,16 +336,28 @@ size_t Connection::decode(const char* bu
                 return 0;
             }
         }
-        cluster.getMulticast().mcastBuffer(remainingData, remainingSize, self);
 
-        { // scope for scoped lock.
+        // During connection negotiation wait for each multicast to be
+        // processed before sending the next, to ensure that the
+        // security layer is activated before we attempt to decode
+        // encrypted frames.
+        { 
+            sys::Mutex::ScopedLock l(connectionNegotiationMonitor);
+            if ( inConnectionNegotiation ) {
+                assert(!mcastSentButNotReceived);
+                mcastSentButNotReceived = true;
+            }
+        }
+        cluster.getMulticast().mcastBuffer(remainingData, remainingSize, self);
+        {
             sys::Mutex::ScopedLock l(connectionNegotiationMonitor);
-            if ( inConnectionNegotiation )
-                while (inConnectionNegotiation && mcastSentButNotReceived)
+            if (inConnectionNegotiation)
+                while (mcastSentButNotReceived)
                     connectionNegotiationMonitor.wait();
+            assert(!mcastSentButNotReceived);
         }
+        return size;
     }
-    return size;
 }
 
 broker::SessionState& Connection::sessionState() {
@@ -621,6 +627,7 @@ void Connection::mcastUserId ( std::stri
   {
       sys::Mutex::ScopedLock l(connectionNegotiationMonitor);
       inConnectionNegotiation = false;
+      mcastSentButNotReceived = false;
       connectionNegotiationMonitor.notify();
   }
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org