You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2012/03/15 15:24:32 UTC

svn commit: r1301001 - in /qpid/trunk/qpid/cpp: include/qpid/framing/FieldTable.h src/qpid/framing/FieldTable.cpp

Author: astitcher
Date: Thu Mar 15 14:24:32 2012
New Revision: 1301001

URL: http://svn.apache.org/viewvc?rev=1301001&view=rev
Log:
QPID-3893: C++ broker appears to segfault during MultipleTransactedBatchProducerTest
    Added in lock to preserve the previous read/write concurrency
    guarantees of FieldTable in the face of the new caching.

    Fix extra unnecessary decodes

Modified:
    qpid/trunk/qpid/cpp/include/qpid/framing/FieldTable.h
    qpid/trunk/qpid/cpp/src/qpid/framing/FieldTable.cpp

Modified: qpid/trunk/qpid/cpp/include/qpid/framing/FieldTable.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/framing/FieldTable.h?rev=1301001&r1=1301000&r2=1301001&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/framing/FieldTable.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/framing/FieldTable.h Thu Mar 15 14:24:32 2012
@@ -1,3 +1,6 @@
+#ifndef _FieldTable_
+#define _FieldTable_
+
 /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -18,16 +21,17 @@
  * under the License.
  *
  */
-#include <iostream>
-#include <vector>
+
+#include "qpid/framing/amqp_types.h"
+#include "qpid/sys/Mutex.h"
+
 #include <boost/shared_ptr.hpp>
 #include <boost/shared_array.hpp>
+
+#include <iosfwd>
 #include <map>
-#include "qpid/framing/amqp_types.h"
-#include "qpid/CommonImportExport.h"
 
-#ifndef _FieldTable_
-#define _FieldTable_
+#include "qpid/CommonImportExport.h"
 
 namespace qpid {
     /**
@@ -114,11 +118,13 @@ class FieldTable
 
   private:
     void realDecode() const;
-    void flushRawCache() const;
+    void flushRawCache();
 
+    mutable qpid::sys::Mutex lock;
     mutable ValueMap values;
     mutable boost::shared_array<uint8_t> cachedBytes;
     mutable uint32_t cachedSize; // if = 0 then non cached size as 0 is not a legal size
+    mutable bool newBytes;
 
     QPID_COMMON_EXTERN friend std::ostream& operator<<(std::ostream& out, const FieldTable& body);
 };

Modified: qpid/trunk/qpid/cpp/src/qpid/framing/FieldTable.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/FieldTable.cpp?rev=1301001&r1=1301000&r2=1301001&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/framing/FieldTable.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/framing/FieldTable.cpp Thu Mar 15 14:24:32 2012
@@ -28,22 +28,45 @@
 #include "qpid/Msg.h"
 #include <assert.h>
 
+// The locking rationale in the FieldTable seems a little odd, but it
+// maintains the concurrent guarantees and requirements that were in
+// place before the cachedBytes/cachedSize were added:
+//
+// The FieldTable client code needs to make sure that they call no write
+// operation in parallel with any other operation on the FieldTable.
+// However multiple parallel read operations are safe.
+//
+// To this end the only code that is locked is code that can transparently
+// change the state of the FieldTable during a read only operation.
+// (In other words the code that required the mutable members in the class
+// definition!)
+//
 namespace qpid {
+
+using sys::Mutex;
+using sys::ScopedLock;
+
 namespace framing {
 
 FieldTable::FieldTable() :
-    cachedSize(0)
+    cachedSize(0),
+    newBytes(false)
 {
 }
 
 FieldTable::FieldTable(const FieldTable& ft) :
     cachedBytes(ft.cachedBytes),
-    cachedSize(ft.cachedSize)
+    cachedSize(ft.cachedSize),
+    newBytes(ft.newBytes)
 {
     // Only copy the values if we have no raw data
     // - copying the map is expensive and we can
     //   reconstruct it if necessary from the raw data
-    if (!cachedBytes && !ft.values.empty()) values = ft.values;
+    if (cachedBytes) {
+        newBytes = true;
+        return;
+    }
+    if (!ft.values.empty()) values = ft.values;
 }
 
 FieldTable& FieldTable::operator=(const FieldTable& ft)
@@ -52,10 +75,13 @@ FieldTable& FieldTable::operator=(const 
     values.swap(nft.values);
     cachedBytes.swap(nft.cachedBytes);
     cachedSize = nft.cachedSize;
+    newBytes = nft.newBytes;
     return (*this);
 }
 
 uint32_t FieldTable::encodedSize() const {
+    ScopedLock<Mutex> l(lock);
+
     if (cachedSize != 0) {
         return cachedSize;
     }
@@ -238,6 +264,7 @@ void FieldTable::encode(Buffer& buffer) 
             i->second->encode(buffer);
         }
         // Now create raw bytes in case we are used again
+        ScopedLock<Mutex> l(lock);
         cachedSize = buffer.getPosition() - p;
         cachedBytes = boost::shared_array<uint8_t>(new uint8_t[cachedSize]);
         buffer.setPosition(p);
@@ -261,14 +288,17 @@ void FieldTable::decode(Buffer& buffer){
     // Copy data into our buffer
     cachedBytes = boost::shared_array<uint8_t>(new uint8_t[len + 4]);
     cachedSize = len + 4;
+    newBytes = true;
     buffer.setPosition(p);
     buffer.getRawData(&cachedBytes[0], cachedSize);
 }
 
 void FieldTable::realDecode() const
 {
+    ScopedLock<Mutex> l(lock);
+
     // If we've got no raw data stored up then nothing to do
-    if (!cachedBytes)
+    if (!newBytes)
         return;
 
     Buffer buffer((char*)&cachedBytes[0], cachedSize);
@@ -286,10 +316,13 @@ void FieldTable::realDecode() const
             values[name] = ValuePtr(value);
         }
     }
+    newBytes = false;
 }
 
-void FieldTable::flushRawCache() const
+void FieldTable::flushRawCache()
 {
+    // We can only flush the cache if there are no cached bytes to decode
+    assert(newBytes==false);
     // Avoid recreating shared array unless we actually have one.
     if (cachedBytes) cachedBytes.reset();
     cachedSize = 0;
@@ -319,6 +352,7 @@ void FieldTable::erase(const std::string
 void FieldTable::clear()
 {
     values.clear();
+    newBytes = false;
     flushRawCache();
 }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org