You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/01/23 12:01:08 UTC

svn commit: r1560634 [2/7] - in /qpid/branches/java-broker-bdb-ha: ./ qpid/ qpid/bin/ qpid/cpp/ qpid/cpp/bindings/qmf2/examples/cpp/ qpid/cpp/bindings/qpid/dotnet/src/ qpid/cpp/bindings/qpid/dotnet/src/msvc10/ qpid/cpp/bindings/qpid/dotnet/src/msvc9/ q...

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/linearstore.cmake
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/linearstore.cmake?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/linearstore.cmake (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/linearstore.cmake Thu Jan 23 11:01:02 2014
@@ -30,39 +30,22 @@ else (DEFINED linearstore_force)
         #
         include (finddb.cmake)
         if (DB_FOUND)
-	        #
-	        # find libaio
-	        #
-	        CHECK_LIBRARY_EXISTS (aio io_queue_init "" HAVE_AIO)
-	        CHECK_INCLUDE_FILES (libaio.h HAVE_AIO_H)
-	        if (HAVE_AIO AND HAVE_AIO_H)
-	            #
-		        # find libuuid
-		        #
-  	            CHECK_LIBRARY_EXISTS (uuid uuid_compare "" HAVE_UUID)
-		        CHECK_INCLUDE_FILES(uuid/uuid.h HAVE_UUID_H)
-		        if (HAVE_UUID AND HAVE_UUID_H)
-		            #
-		            # allow linearstore to be built
-		            #
-                    message(STATUS "BerkeleyDB for C++, libaio and uuid found, Linearstore support enabled")
-		            set (linearstore_default ON)
-		        else (HAVE_UUID AND HAVE_UUID_H)
-                    if (NOT HAVE_UUID)
-                        message(STATUS "Linearstore requires uuid which is absent.")
-                    endif (NOT HAVE_UUID)
-                    if (NOT HAVE_UUID_H)
-                        message(STATUS "Linearstore requires uuid.h which is absent.")
-                    endif (NOT HAVE_UUID_H)
-		        endif (HAVE_UUID AND HAVE_UUID_H)
-	        else (HAVE_AIO AND HAVE_AIO_H)
+            #
+            # find libaio
+            #
+            CHECK_LIBRARY_EXISTS (aio io_queue_init "" HAVE_AIO)
+            CHECK_INCLUDE_FILES (libaio.h HAVE_AIO_H)
+            if (HAVE_AIO AND HAVE_AIO_H)
+                message(STATUS "BerkeleyDB for C++ and libaio found, Linearstore support enabled")
+                set (linearstore_default ON)
+            else (HAVE_AIO AND HAVE_AIO_H)
                 if (NOT HAVE_AIO)
                     message(STATUS "Linearstore requires libaio which is absent.")
                 endif (NOT HAVE_AIO)
                 if (NOT HAVE_AIO_H)
                     message(STATUS "Linearstore requires libaio.h which is absent.")
                 endif (NOT HAVE_AIO_H)
-	        endif (HAVE_AIO AND HAVE_AIO_H)
+            endif (HAVE_AIO AND HAVE_AIO_H)
         else (DB_FOUND)
             message(STATUS "Linearstore requires BerkeleyDB for C++ which is absent.")
         endif (DB_FOUND)
@@ -84,12 +67,6 @@ if (BUILD_LINEARSTORE)
     if (NOT HAVE_AIO_H)
         message(FATAL_ERROR "Linearstore requires libaio.h which is absent.")
     endif (NOT HAVE_AIO_H)
-    if (NOT HAVE_UUID)
-        message(FATAL_ERROR "Linearstore requires uuid which is absent.")
-    endif (NOT HAVE_UUID)
-    if (NOT HAVE_UUID_H)
-        message(FATAL_ERROR "Linearstore requires uuid.h which is absent.")
-    endif (NOT HAVE_UUID_H)
 
     # Journal source files
     set (linear_jrnl_SOURCES
@@ -105,8 +82,8 @@ if (BUILD_LINEARSTORE)
         qpid/linearstore/journal/jdir.cpp
         qpid/linearstore/journal/jerrno.cpp
         qpid/linearstore/journal/jexception.cpp
-		qpid/linearstore/journal/JournalFile.cpp
-		qpid/linearstore/journal/JournalLog.cpp
+        qpid/linearstore/journal/JournalFile.cpp
+        qpid/linearstore/journal/JournalLog.cpp
         qpid/linearstore/journal/LinearFileController.cpp
         qpid/linearstore/journal/pmgr.cpp
         qpid/linearstore/journal/RecoveryManager.cpp

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/amqp/descriptors.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/amqp/descriptors.h?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/amqp/descriptors.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/amqp/descriptors.h Thu Jan 23 11:01:02 2014
@@ -109,6 +109,7 @@ const std::string NOT_FOUND("amqp:not-fo
 const std::string UNAUTHORIZED_ACCESS("amqp:unauthorized-access");
 const std::string DECODE_ERROR("amqp:decode-error");
 const std::string NOT_ALLOWED("amqp:not-allowed");
+const std::string NOT_IMPLEMENTED("amqp:not-implemented");
 const std::string RESOURCE_LIMIT_EXCEEDED("amqp:resource-limit-exceeded");
 const std::string RESOURCE_DELETED("amqp:resource-deleted");
 const std::string PRECONDITION_FAILED("amqp:precondition-failed");

Propchange: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/cpp/src/qpid/broker:r1558037-1560619

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/Exchange.cpp Thu Jan 23 11:01:02 2014
@@ -476,7 +476,7 @@ void Exchange::destroy()
         deletionListeners.swap(copy);
     }
     for (std::map<std::string, boost::function0<void> >::iterator i = copy.begin(); i != copy.end(); ++i) {
-        QPID_LOG(notice, "Exchange::destroy() notifying " << i->first);
+        QPID_LOG(debug, "Exchange::destroy() notifying " << i->first);
         if (i->second) i->second();
     }
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/PagedQueue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/PagedQueue.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/PagedQueue.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/PagedQueue.cpp Thu Jan 23 11:01:02 2014
@@ -24,13 +24,18 @@
 #include "qpid/broker/Message.h"
 #include "qpid/log/Statement.h"
 #include "qpid/framing/reply_exceptions.h"
+#include "qpid/sys/Time.h"
 #include <string.h>
 
 namespace qpid {
 namespace broker {
 namespace {
+using qpid::sys::AbsTime;
+using qpid::sys::Duration;
+using qpid::sys::EPOCH;
+using qpid::sys::FAR_FUTURE;
 using qpid::sys::MemoryMappedFile;
-const uint32_t OVERHEAD(4/*content-size*/ + 4/*sequence-number*/ + 8/*persistence-id*/);
+const uint32_t OVERHEAD(4/*content-size*/ + 4/*sequence-number*/ + 8/*persistence-id*/ + 8/*expiration*/);
 
 size_t encodedSize(const Message& msg)
 {
@@ -46,35 +51,55 @@ size_t encode(const Message& msg, char* 
     buffer.putLong(encoded);
     buffer.putLong(msg.getSequence());
     buffer.putLongLong(msg.getPersistentContext()->getPersistenceId());
+    sys::AbsTime expiration = msg.getExpiration();
+    int64_t t(0);
+    if (expiration < FAR_FUTURE) {
+        t = Duration(EPOCH, expiration);
+    }
+    buffer.putLongLong(t);
     msg.getPersistentContext()->encode(buffer);
     assert(buffer.getPosition() == required);
     return required;
 }
 
-size_t decode(ProtocolRegistry& protocols, Message& msg, const char* data, size_t size)
+size_t decode(ProtocolRegistry& protocols, Message& msg, const char* data, size_t size,
+              boost::intrusive_ptr<ExpiryPolicy> expiryPolicy)
 {
     qpid::framing::Buffer metadata(const_cast<char*>(data), size);
     uint32_t encoded = metadata.getLong();
     uint32_t sequence = metadata.getLong();
     uint64_t persistenceId = metadata.getLongLong();
+    int64_t t = metadata.getLongLong();
     assert(metadata.available() >= encoded);
     qpid::framing::Buffer buffer(const_cast<char*>(data) + metadata.getPosition(), encoded);
     msg = protocols.decode(buffer);
     assert(buffer.getPosition() == encoded);
     msg.setSequence(qpid::framing::SequenceNumber(sequence));
     msg.getPersistentContext()->setPersistenceId(persistenceId);
+    if (t) {
+        sys::AbsTime expiration(EPOCH, t);
+        msg.setExpiryPolicy(expiryPolicy);
+        msg.setExpiration(expiration);
+    }
     return encoded + metadata.getPosition();
 }
 
 }
 
-PagedQueue::PagedQueue(const std::string& name_, const std::string& directory, uint m, uint factor, ProtocolRegistry& p)
-    : name(name_), pageSize(file.getPageSize()*factor), maxLoaded(m), protocols(p), offset(0), loaded(0), version(0)
+PagedQueue::PagedQueue(const std::string& name_, const std::string& directory, uint m, uint factor, ProtocolRegistry& p,
+                       boost::intrusive_ptr<ExpiryPolicy> e)
+    : name(name_), pageSize(file.getPageSize()*factor), maxLoaded(m), protocols(p), offset(0), loaded(0), version(0),
+      expiryPolicy(e)
 {
     path = file.open(name, directory);
     QPID_LOG(debug, "PagedQueue[" << path << "]");
 }
 
+PagedQueue::~PagedQueue()
+{
+    file.close(path);
+}
+
 size_t PagedQueue::size()
 {
     size_t total(0);
@@ -294,7 +319,7 @@ Message* PagedQueue::Page::find(qpid::fr
     //if it is the last in the page, decrement the hint count of the page
 }
 
-void PagedQueue::Page::load(MemoryMappedFile& file, ProtocolRegistry& protocols)
+void PagedQueue::Page::load(MemoryMappedFile& file, ProtocolRegistry& protocols, boost::intrusive_ptr<ExpiryPolicy> expiryPolicy)
 {
     QPID_LOG(debug, "Page[" << offset << "]::load" << " used=" << used << ", size=" << size);
     assert(region == 0);
@@ -308,7 +333,7 @@ void PagedQueue::Page::load(MemoryMapped
         //decode messages into Page::messages
         for (size_t i = 0; i < count; ++i) {
             Message message;
-            used += decode(protocols, message, region + used, size - used);
+            used += decode(protocols, message, region + used, size - used, expiryPolicy);
             if (!contents.contains(message.getSequence())) {
                 message.setState(DELETED);
                 QPID_LOG(debug, "Setting state to deleted for message loaded at " << message.getSequence());
@@ -361,7 +386,7 @@ void PagedQueue::load(Page& page)
         assert(i != used.rend());
         unload(i->second);
     }
-    page.load(file, protocols);
+    page.load(file, protocols, expiryPolicy);
     ++loaded;
     QPID_LOG(debug, "PagedQueue[" << path << "] loaded page, " << loaded << " pages now loaded");
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/PagedQueue.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/PagedQueue.h?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/PagedQueue.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/PagedQueue.h Thu Jan 23 11:01:02 2014
@@ -31,13 +31,16 @@
 
 namespace qpid {
 namespace broker {
+class ExpiryPolicy;
 class ProtocolRegistry;
 /**
  *
  */
 class PagedQueue : public Messages {
   public:
-    PagedQueue(const std::string& name, const std::string& directory, uint maxLoaded, uint pageFactor, ProtocolRegistry& protocols);
+    PagedQueue(const std::string& name, const std::string& directory, uint maxLoaded, uint pageFactor, ProtocolRegistry& protocols,
+               boost::intrusive_ptr<ExpiryPolicy>);
+    ~PagedQueue();
     size_t size();
     bool deleted(const QueueCursor&);
     void publish(const Message& added);
@@ -59,7 +62,7 @@ class PagedQueue : public Messages {
         bool add(const Message&);
         Message* next(uint32_t version, QueueCursor&);
         Message* find(qpid::framing::SequenceNumber);
-        void load(qpid::sys::MemoryMappedFile&,ProtocolRegistry&);
+        void load(qpid::sys::MemoryMappedFile&,ProtocolRegistry&, boost::intrusive_ptr<ExpiryPolicy>);
         void unload(qpid::sys::MemoryMappedFile&);
         void clear(qpid::sys::MemoryMappedFile&);
         size_t available() const;
@@ -86,6 +89,7 @@ class PagedQueue : public Messages {
     std::list<Page> free;
     uint loaded;
     uint32_t version;
+    boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;//needed on reload
 
     void addPages(size_t count);
     Page& newPage(qpid::framing::SequenceNumber);

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/QueueFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/QueueFactory.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/QueueFactory.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/QueueFactory.cpp Thu Jan 23 11:01:02 2014
@@ -80,7 +80,7 @@ boost::shared_ptr<Queue> QueueFactory::c
             queue->messages = std::auto_ptr<Messages>(new PagedQueue(name, broker->getPagingDirectoryPath(),
                                                                      settings.maxPages ? settings.maxPages : 4,
                                                                      settings.pageFactor ? settings.pageFactor : 1,
-                                                                     broker->getProtocolRegistry()));
+                                                                     broker->getProtocolRegistry(), broker->getExpiryPolicy()));
         }
     } else if (settings.lvqKey.empty()) {//LVQ already handled above
         queue->messages = std::auto_ptr<Messages>(new MessageDeque());

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/Selector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/Selector.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/Selector.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/Selector.cpp Thu Jan 23 11:01:02 2014
@@ -30,6 +30,7 @@
 #include "qpid/log/Statement.h"
 #include "qpid/types/Variant.h"
 
+#include <stdexcept>
 #include <string>
 #include <sstream>
 #include "qpid/sys/unordered_map.h"

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/System.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/System.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/System.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/System.cpp Thu Jan 23 11:01:02 2014
@@ -64,7 +64,7 @@ System::System (string _dataDir, Broker*
             }
         }
 
-        mgmtObject = _qmf::System::shared_ptr(new _qmf::System(agent, this, types::Uuid(systemId.c_array())));
+        mgmtObject = _qmf::System::shared_ptr(new _qmf::System(agent, this, systemId));
         qpid::sys::SystemInfo::getSystemId (osName,
                                             nodeName,
                                             release,

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Connection.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Connection.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Connection.cpp Thu Jan 23 11:01:02 2014
@@ -70,7 +70,7 @@ Connection::Connection(qpid::sys::Output
     : BrokerContext(b), ManagedConnection(getBroker(), i),
       connection(pn_connection()),
       transport(pn_transport()),
-      out(o), id(i), haveOutput(true), closeInitiated(false)
+      out(o), id(i), haveOutput(true), closeInitiated(false), closeRequested(false)
 {
     if (pn_transport_bind(transport, connection)) {
         //error
@@ -169,9 +169,22 @@ size_t Connection::encode(char* buffer, 
 bool Connection::canEncode()
 {
     if (!closeInitiated) {
+        if (closeRequested) {
+            close();
+            return true;
+        }
         try {
-            for (Sessions::iterator i = sessions.begin();i != sessions.end(); ++i) {
-                if (i->second->dispatch()) haveOutput = true;
+            for (Sessions::iterator i = sessions.begin();i != sessions.end();) {
+                if (i->second->endedByManagement()) {
+                    pn_session_close(i->first);
+                    i->second->close();
+                    sessions.erase(i++);
+                    haveOutput = true;
+                    QPID_LOG_CAT(debug, model, id << " session ended by management");
+                } else {
+                    if (i->second->dispatch()) haveOutput = true;
+                    ++i;
+                }
             }
             process();
         } catch (const Exception& e) {
@@ -372,4 +385,10 @@ void Connection::setUserId(const std::st
         throw Exception(qpid::amqp::error_conditions::RESOURCE_LIMIT_EXCEEDED, "User connection denied by configured limit");
     }
 }
+
+void Connection::closedByManagement()
+{
+    closeRequested = true;
+    out.activateOutput();
+}
 }}} // namespace qpid::broker::amqp

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Connection.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Connection.h?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Connection.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Connection.h Thu Jan 23 11:01:02 2014
@@ -69,12 +69,14 @@ class Connection : public BrokerContext,
     bool haveOutput;
     Sessions sessions;
     bool closeInitiated;
+    bool closeRequested;
 
     virtual void process();
     std::string getError();
     void close();
     void open();
     void readPeerProperties();
+    void closedByManagement();
 };
 }}} // namespace qpid::broker::amqp
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp Thu Jan 23 11:01:02 2014
@@ -19,6 +19,8 @@
  *
  */
 #include "qpid/broker/amqp/ManagedConnection.h"
+#include "qpid/broker/amqp/Exception.h"
+#include "qpid/amqp/descriptors.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/management/ManagementAgent.h"
 #include "qpid/log/Statement.h"
@@ -176,4 +178,34 @@ void ManagedConnection::incomingMessageR
     if (connection) connection->inc_msgsFromClient();
 }
 
+void ManagedConnection::closedByManagement()
+{
+    throw Exception(qpid::amqp::error_conditions::NOT_IMPLEMENTED, QPID_MSG(id << "Connection close requested, but not implemented"));
+}
+
+qpid::management::Manageable::status_t ManagedConnection::ManagementMethod(uint32_t methodId, qpid::management::Args&, std::string& error)
+{
+    qpid::management::Manageable::status_t status = qpid::management::Manageable::STATUS_UNKNOWN_METHOD;
+
+    try {
+        switch (methodId)
+        {
+          case _qmf::Connection::METHOD_CLOSE :
+            closedByManagement();
+            if (connection) connection->set_closing(true);
+            status = qpid::management::Manageable::STATUS_OK;
+            break;
+        }
+    } catch (const Exception& e) {
+        if (e.symbol() == qpid::amqp::error_conditions::NOT_IMPLEMENTED) {
+            status = qpid::management::Manageable::STATUS_NOT_IMPLEMENTED;
+        } else {
+            error = e.what();
+            status = qpid::management::Manageable::STATUS_EXCEPTION;
+        }
+    }
+
+    return status;
+}
+
 }}} // namespace qpid::broker::amqp

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h Thu Jan 23 11:01:02 2014
@@ -61,6 +61,11 @@ class ManagedConnection : public qpid::m
     const std::map<std::string, types::Variant>& getClientProperties() const;
     virtual bool isLink() const;
     void opened();
+
+    qpid::management::Manageable::status_t ManagementMethod(uint32_t methodId, qpid::management::Args&, std::string&);
+
+  protected:
+    virtual void closedByManagement();
   private:
     const std::string id;
     std::string userid;

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp Thu Jan 23 11:01:02 2014
@@ -20,6 +20,8 @@
  */
 #include "qpid/broker/amqp/ManagedSession.h"
 #include "qpid/broker/amqp/ManagedConnection.h"
+#include "qpid/broker/amqp/Exception.h"
+#include "qpid/amqp/descriptors.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/management/ManagementAgent.h"
 #include "qpid/log/Statement.h"
@@ -89,4 +91,41 @@ ManagedConnection& ManagedSession::getPa
     return parent;
 }
 
+void ManagedSession::detachedByManagement()
+{
+    throw Exception(qpid::amqp::error_conditions::NOT_IMPLEMENTED, QPID_MSG(id << "Session detach requested, but not implemented"));
+}
+
+qpid::management::Manageable::status_t ManagedSession::ManagementMethod (uint32_t methodId,
+                                                                         qpid::management::Args& /*args*/,
+                                                                         std::string&  error)
+{
+    qpid::management::Manageable::status_t status = qpid::management::Manageable::STATUS_UNKNOWN_METHOD;
+
+    try {
+        switch (methodId)
+        {
+          case _qmf::Session::METHOD_DETACH :
+            detachedByManagement();
+            status = qpid::management::Manageable::STATUS_OK;
+            break;
+
+          case _qmf::Session::METHOD_CLOSE :
+          case _qmf::Session::METHOD_SOLICITACK :
+          case _qmf::Session::METHOD_RESETLIFESPAN :
+            status = Manageable::STATUS_NOT_IMPLEMENTED;
+            break;
+        }
+    } catch (const Exception& e) {
+        if (e.symbol() == qpid::amqp::error_conditions::NOT_IMPLEMENTED) {
+            status = qpid::management::Manageable::STATUS_NOT_IMPLEMENTED;
+        } else {
+            error = e.what();
+            status = qpid::management::Manageable::STATUS_EXCEPTION;
+        }
+    }
+
+    return status;
+}
+
 }}} // namespace qpid::broker::amqp

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h Thu Jan 23 11:01:02 2014
@@ -48,6 +48,10 @@ class ManagedSession : public qpid::mana
     void outgoingMessageAccepted();
     void outgoingMessageRejected();
     ManagedConnection& getParent();
+
+    qpid::management::Manageable::status_t ManagementMethod (uint32_t, qpid::management::Args&, std::string&);
+  protected:
+    virtual void detachedByManagement();
   private:
     ManagedConnection& parent;
     const std::string id;

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Session.cpp Thu Jan 23 11:01:02 2014
@@ -202,7 +202,8 @@ class IncomingToExchange : public Decodi
 
 Session::Session(pn_session_t* s, Connection& c, qpid::sys::OutputControl& o)
     : ManagedSession(c.getBroker(), c, (boost::format("%1%") % s).str()), session(s), connection(c), out(o), deleted(false),
-      authorise(connection.getUserId(), connection.getBroker().getAcl()) {}
+      authorise(connection.getUserId(), connection.getBroker().getAcl()),
+      detachRequested() {}
 
 
 Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* terminus, bool incoming)
@@ -689,6 +690,17 @@ Authorise& Session::getAuthorise()
     return authorise;
 }
 
+bool Session::endedByManagement() const
+{
+    return detachRequested;
+}
+
+void Session::detachedByManagement()
+{
+    detachRequested = true;
+    wakeup();
+}
+
 void IncomingToQueue::handle(qpid::broker::Message& message)
 {
     if (queue->isDeleted()) {

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Session.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Session.h?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Session.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Session.h Thu Jan 23 11:01:02 2014
@@ -66,6 +66,7 @@ class Session : public ManagedSession, p
     void readable(pn_link_t*, pn_delivery_t*);
     void writable(pn_link_t*, pn_delivery_t*);
     bool dispatch();
+    bool endedByManagement() const;
     void close();
 
     /**
@@ -79,6 +80,8 @@ class Session : public ManagedSession, p
     void wakeup();
 
     Authorise& getAuthorise();
+  protected:
+    void detachedByManagement();
   private:
     typedef std::map<pn_link_t*, boost::shared_ptr<Outgoing> > OutgoingLinks;
     typedef std::map<pn_link_t*, boost::shared_ptr<Incoming> > IncomingLinks;
@@ -92,6 +95,7 @@ class Session : public ManagedSession, p
     qpid::sys::Mutex lock;
     std::set< boost::shared_ptr<Queue> > exclusiveQueues;
     Authorise authorise;
+    bool detachRequested;
 
     struct ResolvedNode
     {

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Topic.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Topic.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Topic.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Topic.cpp Thu Jan 23 11:01:02 2014
@@ -163,7 +163,7 @@ bool TopicRegistry::add(boost::shared_pt
         topics.insert(Topics::value_type(topic->getName(), topic));
         return true;
     } else {
-        return false;
+        throw qpid::types::Exception(QPID_MSG("A topic named " << topic->getName() << " already exists"));
     }
 
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp Thu Jan 23 11:01:02 2014
@@ -79,7 +79,7 @@ amqp::MessageId MessageTransfer::getMess
 
     amqp::MessageId r;
     if (mp->hasMessageId()) {
-        r.set(amqp::CharSequence::create(&mp->getMessageId()[0],16), types::VAR_UUID);
+        r.set(amqp::CharSequence::create(mp->getMessageId().data(),16), types::VAR_UUID);
     }
     return r;
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/client/SslConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/client/SslConnector.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/client/SslConnector.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/client/SslConnector.cpp Thu Jan 23 11:01:02 2014
@@ -359,8 +359,10 @@ size_t SslConnector::decode(const char* 
                 throw Exception(QPID_MSG("Unsupported version: " << protocolInit
                                          << " supported version " << version));
             }
+            initiated = true;
+        } else {
+            return size - in.available();
         }
-        initiated = true;
     }
     AMQFrame frame;
     while(frame.decode(in)){

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/client/TCPConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/client/TCPConnector.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/client/TCPConnector.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/client/TCPConnector.cpp Thu Jan 23 11:01:02 2014
@@ -289,8 +289,10 @@ size_t TCPConnector::decode(const char* 
                 throw Exception(QPID_MSG("Unsupported version: " << protocolInit
                                          << " supported version " << version));
             }
+            initiated = true;
+        } else {
+            return size - in.available();
         }
-        initiated = true;
     }
     AMQFrame frame;
     while(frame.decode(in)){

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/framing/ProtocolVersion.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/framing/ProtocolVersion.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/framing/ProtocolVersion.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/framing/ProtocolVersion.cpp Thu Jan 23 11:01:02 2014
@@ -26,7 +26,7 @@ using namespace qpid::framing;
 const std::string ProtocolVersion::toString() const
 {
     std::stringstream ss;
-    ss << major_ << "-" << minor_;
+    ss << unsigned(major_) << "-" << unsigned(minor_);
     if (major_ == 1) {
         if (protocol_ == SASL) ss << " (SASL)";
         else if (protocol_ == TLS) ss << " (TLS)";
@@ -46,7 +46,7 @@ bool ProtocolVersion::operator==(Protoco
     return major_ == p.major_ && minor_ == p.minor_;
 }
 
-uint8_t ProtocolVersion::AMQP(0);
-uint8_t ProtocolVersion::LEGACY_AMQP(1);
-uint8_t ProtocolVersion::TLS(2);
-uint8_t ProtocolVersion::SASL(3);
+const uint8_t ProtocolVersion::AMQP(0);
+const uint8_t ProtocolVersion::LEGACY_AMQP(1);
+const uint8_t ProtocolVersion::TLS(2);
+const uint8_t ProtocolVersion::SASL(3);

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/framing/ProtocolVersion.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/framing/ProtocolVersion.h?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/framing/ProtocolVersion.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/framing/ProtocolVersion.h Thu Jan 23 11:01:02 2014
@@ -54,10 +54,10 @@ public:
 
     QPID_COMMON_EXTERN bool operator==(ProtocolVersion p) const;
     QPID_COMMON_INLINE_EXTERN bool operator!=(ProtocolVersion p) const { return ! (*this == p); }
-    QPID_COMMON_EXTERN static uint8_t AMQP;
-    QPID_COMMON_EXTERN static uint8_t LEGACY_AMQP;
-    QPID_COMMON_EXTERN static uint8_t TLS;
-    QPID_COMMON_EXTERN static uint8_t SASL;
+    QPID_COMMON_EXTERN static const uint8_t AMQP;
+    QPID_COMMON_EXTERN static const uint8_t LEGACY_AMQP;
+    QPID_COMMON_EXTERN static const uint8_t TLS;
+    QPID_COMMON_EXTERN static const uint8_t SASL;
 };
 
 } // namespace framing

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/framing/Uuid.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/framing/Uuid.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/framing/Uuid.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/framing/Uuid.cpp Thu Jan 23 11:01:02 2014
@@ -29,47 +29,13 @@ namespace framing {
 
 using namespace std;
 
-static const size_t UNPARSED_SIZE=36; 
-
-Uuid::Uuid(bool unique) {
-    if (unique) {
-        generate();
-    } else {
-        clear();
-    }
-}
-
-Uuid::Uuid(const uint8_t* data) {
-    assign(data);
-}
-
-Uuid::Uuid(const std::string& s) {
-    if (s.size() != UNPARSED_SIZE)
-        throw IllegalArgumentException(QPID_MSG("Invalid UUID: " << s));
-    if (uuid_parse(const_cast<char *>(&s[0]), c_array()) != 0)
-        throw IllegalArgumentException(QPID_MSG("Invalid UUID: " << s));
-}
-
-void Uuid::assign(const uint8_t* data) {
-    // This const cast is for Solaris which has a 
-    // uuid_copy that takes a non const 2nd argument
-    uuid_copy(c_array(), const_cast<uint8_t*>(data));
-}
-
-void Uuid::generate() {
-    uuid_generate(c_array());
-}
-
-void Uuid::clear() {
-    uuid_clear(c_array());
-}
-
-// Force int 0/!0 to false/true; avoids compile warnings.
-bool Uuid::isNull() const {
-    // This const cast is for Solaris which has a 
-    // uuid_is_null that takes a non const argument
-    return !!uuid_is_null(const_cast<uint8_t*>(data()));
-}
+Uuid::Uuid(bool unique):
+  qpid::types::Uuid(unique)
+{}
+
+Uuid::Uuid(const uint8_t* data):
+  qpid::types::Uuid(data)
+{}
 
 void Uuid::encode(Buffer& buf) const {
     buf.putRawData(data(), size());
@@ -78,29 +44,9 @@ void Uuid::encode(Buffer& buf) const {
 void Uuid::decode(Buffer& buf) {
     if (buf.available() < size())
         throw IllegalArgumentException(QPID_MSG("Not enough data for UUID."));
-    buf.getRawData(c_array(), size());
-}
-
-ostream& operator<<(ostream& out, Uuid uuid) {
-    char unparsed[UNPARSED_SIZE + 1];
-    uuid_unparse(uuid.data(), unparsed);
-    return out << unparsed;
-}
-
-istream& operator>>(istream& in, Uuid& uuid) {
-    char unparsed[UNPARSED_SIZE + 1] = {0};
-    in.get(unparsed, sizeof(unparsed));
-    if (!in.fail()) {
-        if (uuid_parse(unparsed, uuid.c_array()) != 0) 
-            in.setstate(ios::failbit);
-    }
-    return in;
-}
 
-std::string Uuid::str() const {
-    std::ostringstream os;
-    os << *this;
-    return os.str();
+    // Break qpid::types::Uuid encapsulation - Nasty, but efficient
+    buf.getRawData(const_cast<uint8_t*>(data()), size());
 }
 
 }} // namespace qpid::framing

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/framing/Uuid.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/framing/Uuid.h?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/framing/Uuid.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/framing/Uuid.h Thu Jan 23 11:01:02 2014
@@ -22,7 +22,7 @@
 #include "qpid/CommonImportExport.h"
 #include "qpid/sys/IntegerTypes.h"
 
-#include <boost/array.hpp>
+#include "qpid/types/Uuid.h"
 
 #include <ostream>
 #include <istream>
@@ -33,62 +33,25 @@ namespace framing {
 class Buffer;
 
 /**
- * A UUID is represented as a boost::array of 16 bytes.
- *
- * Full value semantics, operators ==, < etc.  are provided by
- * boost::array so Uuid can be the key type in a map etc.
- *
- * TODO: change this implementation as it leaks boost into the
- * client API
+ * Framing UUID is now a thine wrapper around qpid::types::Uuid
  */
-struct Uuid : public boost::array<uint8_t, 16> {
+struct Uuid : public qpid::types::Uuid {
     /** If unique is true, generate a unique ID else a null ID. */
     QPID_COMMON_EXTERN Uuid(bool unique=false);
 
     /** Copy from 16 bytes of data. */
     QPID_COMMON_EXTERN Uuid(const uint8_t* data);
 
-    /** Parse format 1b4e28ba-2fa1-11d2-883f-b9a761bde3fb. */
-    QPID_COMMON_EXTERN Uuid(const std::string&);
-
-    // Default op= and copy ctor are fine.
-    // boost::array gives us ==, < etc.
-
-    /** Copy from 16 bytes of data. */
-    QPID_COMMON_EXTERN void assign(const uint8_t* data);
-
-    /** Set to a new unique identifier. */
-    QPID_COMMON_EXTERN void generate();
-
-    /** Set to all zeros. */
-    QPID_COMMON_EXTERN void clear();
-
-    /** Test for null (all zeros). */
-    QPID_COMMON_EXTERN bool isNull() const;
-    QPID_COMMON_INLINE_EXTERN operator bool() const { return !isNull(); }
-    QPID_COMMON_INLINE_EXTERN bool operator!() const { return isNull(); }
+    // We get most of our operations directly from qpid::types::Uuid
+    QPID_COMMON_INLINE_EXTERN static size_t size()
+        { return SIZE; }
 
     QPID_COMMON_EXTERN void encode(framing::Buffer& buf) const;
     QPID_COMMON_EXTERN void decode(framing::Buffer& buf);
     QPID_COMMON_INLINE_EXTERN uint32_t encodedSize() const
-        { return static_cast<uint32_t>(size()); }
-
-    /** String value in format 1b4e28ba-2fa1-11d2-883f-b9a761bde3fb. */
-    QPID_COMMON_EXTERN std::string str() const;
-
-    template <class S> void serialize(S& s) {
-        s.raw(begin(), size());
-    }
+        { return size(); }
 };
 
-/** Print in format 1b4e28ba-2fa1-11d2-883f-b9a761bde3fb. */
-QPID_COMMON_EXTERN std::ostream& operator<<(std::ostream&, Uuid);
-
-/** Read from format 1b4e28ba-2fa1-11d2-883f-b9a761bde3fb. */
-QPID_COMMON_EXTERN std::istream& operator>>(std::istream&, Uuid&);
-
 }} // namespace qpid::framing
 
-
-
 #endif  /*!QPID_FRAMING_UUID_H*/

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Thu Jan 23 11:01:02 2014
@@ -863,6 +863,23 @@ bool BrokerReplicator::unbind(boost::sha
 bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, const framing::FieldTable* const) { return false; }
 bool BrokerReplicator::hasBindings() { return false; }
 
+// ConnectionObserver methods
+void BrokerReplicator::connection(broker::Connection&) {}
+void BrokerReplicator::opened(broker::Connection&) {}
+
+void BrokerReplicator::closed(broker::Connection& c) {
+    if (link && &c == connect) disconnected();
+}
+
+void BrokerReplicator::forced(broker::Connection& c, const std::string& message) {
+    if (link && &c == link->getConnection()) {
+        haBroker.shutdown(
+            QPID_MSG(logPrefix << "Connection forced, cluster may be misconfigured: "
+                     << message));
+    }
+    closed(c);
+}
+
 string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; }
 
 void BrokerReplicator::disconnectedQueueReplicator(boost::shared_ptr<Exchange> ex) {

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/BrokerReplicator.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/BrokerReplicator.h?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/BrokerReplicator.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/BrokerReplicator.h Thu Jan 23 11:01:02 2014
@@ -90,10 +90,10 @@ class BrokerReplicator : public broker::
     bool hasBindings();
 
     // ConnectionObserver methods
-    void connection(broker::Connection&) {}
-    void opened(broker::Connection&) {}
-    void closed(broker::Connection& c) { if (link && &c == connect) disconnected(); }
-    void forced(broker::Connection& c, const std::string& /*message*/) { closed(c); }
+    void connection(broker::Connection&);
+    void opened(broker::Connection&);
+    void closed(broker::Connection&);
+    void forced(broker::Connection&, const std::string& /*message*/);
 
     QueueReplicatorPtr findQueueReplicator(const std::string& qname);
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/Membership.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/Membership.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/Membership.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/Membership.cpp Thu Jan 23 11:01:02 2014
@@ -146,7 +146,7 @@ bool checkTransition(BrokerStatus from, 
 
 void Membership::update(Mutex::ScopedLock& l) {
     QPID_LOG(info, "Membership: " <<  brokers);
-// Update managment and send update event.
+    // Update managment and send update event.
     BrokerStatus newStatus = getStatus(l);
     Variant::List brokerList = asList(l);
     if (mgmtObject) {

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/Primary.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/Primary.cpp Thu Jan 23 11:01:02 2014
@@ -284,7 +284,7 @@ void Primary::exchangeCreate(const Excha
         QPID_LOG(debug, logPrefix << "Created exchange " << ex->getName()
                  << " replication: " << printable(level));
          // Give each exchange a unique id to avoid confusion of same-named exchanges.
-        args.set(QPID_HA_UUID, FieldTable::ValuePtr(new UuidValue(&Uuid(true)[0])));
+        args.set(QPID_HA_UUID, FieldTable::ValuePtr(new UuidValue(Uuid(true).data())));
     }
     ex->setArgs(args);
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/StatusCheck.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/StatusCheck.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/StatusCheck.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/StatusCheck.cpp Thu Jan 23 11:01:02 2014
@@ -57,20 +57,19 @@ void StatusCheckThread::run() {
     try {
         // Check for self connections
         Variant::Map options, clientProperties;
-        clientProperties[ConnectionObserver::ADMIN_TAG] = 1; // Allow connection to backups.
+        clientProperties[ConnectionObserver::ADMIN_TAG] = 1; // Allow connection to backups
         clientProperties[ConnectionObserver::ADDRESS_TAG] = url.str();
-        clientProperties[ConnectionObserver::BACKUP_TAG] = statusCheck.haBroker.getBrokerInfo().asMap();
+        clientProperties[ConnectionObserver::BACKUP_TAG] = statusCheck.brokerInfo.asMap();
 
         // Set connection options
-        Settings settings(statusCheck.haBroker.getSettings());
+        const Settings& settings = statusCheck.settings;
         if (settings.username.size()) options["username"] = settings.username;
         if (settings.password.size()) options["password"] = settings.password;
         if (settings.mechanism.size()) options["sasl_mechanisms"] = settings.mechanism;
         options["client-properties"] = clientProperties;
-        sys::Duration heartbeat(statusCheck.haBroker.getBroker().getOptions().linkHeartbeatInterval);
-        options["heartbeat"] = heartbeat/sys::TIME_SEC;
-        c = Connection(url.str(), options);
+        options["heartbeat"] = statusCheck.heartbeat/sys::TIME_SEC;
 
+        c = Connection(url.str(), options);
         c.open();
         Session session = c.createSession();
         messaging::Address responses("#;{create:always,node:{x-declare:{exclusive:True,auto-delete:True,arguments:{'qpid.replicate':none}}}}");
@@ -88,7 +87,7 @@ void StatusCheckThread::run() {
         content["_object_id"] = oid;
         encode(content, request);
         s.send(request);
-        messaging::Duration timeout(heartbeat/sys::TIME_MSEC);
+        messaging::Duration timeout(statusCheck.heartbeat/sys::TIME_MSEC);
         Message response = r.fetch(timeout);
         session.acknowledge();
         Variant::List contentIn;
@@ -103,17 +102,18 @@ void StatusCheckThread::run() {
             }
         }
         else
-            QPID_LOG(error, logPrefix << "Invalid response " << response.getContent())
-    } catch(const exception& error) {
-        // Its not an error to fail to connect to self.
-        if (statusCheck.haBroker.getBrokerInfo().getAddress() != url[0])
-            QPID_LOG(warning, logPrefix << error.what());
-    }
+            QPID_LOG(error, logPrefix << "Invalid response " << response.getContent());
+    } catch(...) {}
     try { c.close(); } catch(...) {}
     delete this;
 }
 
-StatusCheck::StatusCheck(HaBroker& hb) : promote(true), haBroker(hb)
+// Note: Don't use hb outside of the constructor, it may be deleted.
+StatusCheck::StatusCheck(HaBroker& hb) :
+    promote(true),
+    settings(hb.getSettings()),
+    heartbeat(hb.getBroker().getOptions().linkHeartbeatInterval),
+    brokerInfo(hb.getBrokerInfo())
 {}
 
 StatusCheck::~StatusCheck() {

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/StatusCheck.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/StatusCheck.h?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/StatusCheck.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/StatusCheck.h Thu Jan 23 11:01:02 2014
@@ -65,7 +65,9 @@ class StatusCheck
     sys::Mutex lock;
     std::vector<sys::Thread> threads;
     bool promote;
-    HaBroker& haBroker;
+    const Settings settings;
+    const sys::Duration heartbeat;
+    const BrokerInfo brokerInfo;
 
   friend class StatusCheckThread;
 };

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/ISSUES
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/ISSUES?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/ISSUES (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/ISSUES Thu Jan 23 11:01:02 2014
@@ -17,52 +17,82 @@
 # under the License.
 #
 
-LinearStore issues:
+Linear Store issues:
 
-Store:
-------
-
-1. (FIXED) Overwrite identity: When recovering a previously used file, if the write boundary coincides with old record
-   start, no way of discriminating old from new at boundary (used to use OWI).
-
-2. (FIXED) QPID-5357: Recycling files while in use not working, however, files are recovered to EFP during recovery. Must solve
-   #1 first.
-
-3. (FIXED) QPID-5358: Checksum not implemented in record tail, not checked during read.
-
-4. QPID-5359: Rework qpid management parameters and controls (QMF).
-
-5. QPID-5360: Consistent logging: rework logging to provide uniform and consistent logging from store (both logging
-   level and places where logging occurs).
-
-6. QPID-5361: No tests
-   * No existing tests for linearstore:
-   ** Basic broker-level tests for txn and non-txn recovery
-   ** Store-level tests which check write boundary conditions
-   ** Unit tests
-   ** Basic performance tests
-
-7: QPID-5362: No tools
-   * Store analysis and status
-   * Recovery/reading of message content
-
-8. One journal file lost when queue deleted. All files except for one are recycled back to the EFP.
-
-9. Complete exceptions - several exceptions thrown using jexception have no exception numbers
-
-Current bugs and performance issues:
-------------------------------------
-1. BZ 1035843 - Slow performance for producers
-2. (FIXED) QPID-5387 (BZ 1036071) - Crash when deleting queue
-3. (FIXED) QPID-5388 (BZ 1035802) - Segmentation fault when recovering empty queue
-4. (UNABLE TO REPRODUCE) BZ 1036026 - Unable to create durable queue - framing error - possibly caused by running both stores at the same time
-5. (UNABLE TO REPRODUCE) BZ 1038599 - Abort when deleting used queue after restart - may be dup of QPID-5387 (BZ 1036071)
-6. BZ 1039522 - Crash during recovery - JournalFile::getFqFileName() -JERR_JREC_BADRECTAIL
-7. BZ 1039525 - Crash during recovery - journal::jexception - JERR_JREC_BADRECTAIL
-8. (FIXED) QPID-5442 (BZ 1039949) - DTX test failure - missing XIDs
-9. (FIXED) QPID-5460 (BZ 1051097) - Transactional messages lost during recovery
-10. QPID-5464 - Incompletely created journal files accumulate in EFP
-11. QPID-5473 (BZ 1051924) - Recovery where last record in file is truncated (ie spans files), but following file is uninitialized causes crash
+Current/pending:
+================
+ Q-JIRA RHBZ     Description / Comments
+ ------ -------  ----------------------
+   5359 -        Linearstore: Implement new management schema and wire into store
+   5360 -        Linearstore: Evaluate and rework logging to produce a consistent log output
+   5361 -        Linearstore: No tests for linearstore functionality currently exist
+                   * No existing tests for linearstore:
+                   ** Basic broker-level tests for txn and non-txn recovery
+                   ** Store-level tests which check write boundary conditions
+                   ** EFP tests, including file recovery, error management
+                   ** Unit tests
+                   ** Basic performance tests
+   5362 -        Linearstore: No store tools exist for examining the journals
+                   svn r.1558888 2014-01-09: WIP checkin for linearstore version of qpid_qls_analyze. Needs testing and tidy-up.
+                   * Store analysis and status
+                   * Recovery/reading of message content
+                   * Empty file pool status and management
+   5464 -        [linearstore] Incompletely created journal files accumulate in EFP
+   5479 1053701  [linearstore] Using recovered store results in "JERR_JNLF_FILEOFFSOVFL: Attempted to increase submitted offset past file size. (JournalFile::submittedDblkCount)" error message
+                   * Probablilty: 2 of 600 (0.3%) using tx-test-soak.sh
+   5480 1053749  [linearstore] Recovery of store failure with "JERR_MAP_NOTFOUND: Key not found in map." error message
+                   * Probability: 6 of 600 (1.0%) using tx-test-soak.sh
+                   * If broker is started a second time after failure, it starts correctly and test completes ok.
+   5484 1035843  Slow performance for producers
+                   svn r.1558592 fixes an issue with using /dev/random as a source of random numbers for Journal serial numbers.
+   -    1036026  [LinearStore] Qpid linear store unable to create durable queue - framing-error: Queue <q-name>: create() failed: jexception 0x0000
+                   UNABLE TO REPRODUCE - but Frantizek has additional info
+   -    1039522  Qpid crashes while recovering from linear store around apid::linearstore::journal::JournalFile::getFqFileName() including enq_rec::decode() threw JERR_JREC_BAD_RECTAIL
+                   * Possible dup of 1039525
+                   * May be fixed by QPID-5483 - waiting for needinfo
+   -    1039525  Qpid crashes while recovering from linear store around apid::linearstore::journal::jexception::format including enq_rec::decode() threw JERR_JREC_BAD_REC_TAIL
+                   * Possible dup of 1039522
+                   * May be fixed by QPID-5483 - waiting for needinfo
+   5487 -        [linearstore] Replace use of /dev/urandom with c random generator calls
+
+Fixed/closed:
+=============
+ Q-JIRA RHBZ     Description / Comments
+ ------ -------  ----------------------
+   5357 1052518  Linearstore: Empty file recycling not functional
+                   svn r.1545563 2013-11-26: Propsed fix
+   5358 1052727  Linearstore: Checksums not implemented in record tail
+                   svn r.1547601 2013-12-03: Propsed fix
+   5387 1036071  Linearstore: Segmentation fault when deleting queue
+                   svn r.1547641 2013-12-03: Propsed fix
+   5388 1035802  Linearstore: Segmentation fault when recovering empty queue
+                   svn r.1547921 2013-12-04: Propsed fix
+NO-JIRA -        Added missing Apache copyright/license text
+                   svn r.1551304 2013-12-16: Propsed fix
+   5425 1052445  Linearstore: Transaction Prepared List (TPL) fails with jexception 0x0402 AtomicCounter::addLimit() threw JERR_JNLF_FILEOFFSOVFL
+                   svn r.1551361 2013-12-16: Proposed fix
+   5442 1039949  Linearstore: Dtx recover test fails
+                   svn r.1552772 2013-12-20: Proposed fix
+   5444 1052775  Linearstore: Recovering from qpid-txtest fails with "Inconsistent TPL 2PC count" error message
+                   svn r.1553148 2013-12-23: Proposed fix
+   -    1038599  [LinearStore] Abort when deleting used queue after restart
+                   CLOSED-NOTABUG 2014-01-06
+   5460 1051097  [linearstore] Recovery of store which contains prepared but incomplete transactions results in message loss
+                   svn r.1556892 2014-01-09: Proposed fix
+   5473 1051924  [linearstore] Recovery of journal in which last logical file contains truncated record causes crash
+                   svn r.1557620 2014-01-12: Proposed fix
+   5483 -        [linearstore] Recovery of journal with partly written record fails with "JERR_JREC_BADRECTAIL: Invalid data record tail" error message
+                   svn r.1558589 2014-01-15: Proposed fix
+                   * May be linked to RHBZ 1039522 - waiting for needinfo
+                   * May be linked to RHBZ 1039525 - waiting for needinfo
+
+Future:
+=======
+* One journal file lost when queue deleted. All files except for one are recycled back to the EFP.
+* Complete exceptions - several exceptions thrown using jexception have no exception numbers
+* Investigate ability of store to detect missing journal files, especially from logical end of a journal
+* Investigate ability of store to handle file muddle-ups (ie journal files from EFP which are not zeroed or other journals)
+* Look at improving the efficiency of recovery - right now the entire store is read once, and then each recovered record xid and data is read again
 
 Code tidy-up
 ------------

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp Thu Jan 23 11:01:02 2014
@@ -66,7 +66,13 @@ MessageStoreImpl::MessageStoreImpl(qpid:
                                    jrnlLog(qpid::linearstore::journal::JournalLog::LOG_NOTICE),
                                    mgmtObject(),
                                    agent(0)
-{}
+{
+    // Test of values for QLS_RAND_SHIFT1, QLS_RAND_SHIFT2 and QLS_RAND_MASK
+    if((((uint64_t)RAND_MAX << QLS_RAND_SHIFT1) ^ ((uint64_t)RAND_MAX << QLS_RAND_SHIFT2) ^ (RAND_MAX & QLS_RAND_MASK)) != 0xffffffffffffffffULL) {
+        THROW_STORE_EXCEPTION("[linearstore] 64-bit random number generation alignment error");
+    }
+    ::srand(::time(NULL));
+}
 
 uint32_t MessageStoreImpl::chkJrnlWrPageCacheSize(const uint32_t param_, const std::string& paramName_)
 {

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/Checksum.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/Checksum.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/Checksum.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/Checksum.cpp Thu Jan 23 11:01:02 2014
@@ -30,9 +30,11 @@ Checksum::Checksum() : a(1UL), b(0UL), M
 Checksum::~Checksum() {}
 
 void Checksum::addData(const unsigned char* data, const std::size_t len) {
-    for (uint32_t i = 0; i < len; i++) {
-        a = (a + data[i]) % MOD_ADLER;
-        b = (a + b) % MOD_ADLER;
+    if (data) {
+        for (uint32_t i = 0; i < len; i++) {
+            a = (a + data[i]) % MOD_ADLER;
+            b = (a + b) % MOD_ADLER;
+        }
     }
 }
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp Thu Jan 23 11:01:02 2014
@@ -28,9 +28,9 @@
 #include "qpid/linearstore/journal/JournalLog.h"
 #include "qpid/linearstore/journal/slock.h"
 #include "qpid/linearstore/journal/utils/file_hdr.h"
+#include "qpid/sys/uuid.h"
 #include <sys/stat.h>
 #include <unistd.h>
-#include <uuid/uuid.h>
 #include <vector>
 
 //#include <iostream> // DEBUG

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/JournalFile.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/JournalFile.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/JournalFile.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/JournalFile.cpp Thu Jan 23 11:01:02 2014
@@ -279,18 +279,8 @@ const std::string JournalFile::getFileNa
 
 //static
 uint64_t JournalFile::getRandom64() {
-    int randomData = ::open("/dev/random", O_RDONLY);
-    if (randomData < 0) {
-        throw jexception(); // TODO: Complete exception details
-    }
-    uint64_t randomNumber;
-    ::size_t size = sizeof(randomNumber);
-    ::ssize_t result = ::read(randomData, (char*)&randomNumber, size);
-    if (result < 0 || result != ssize_t(size)) {
-        throw jexception(); // TODO: Complete exception details
-    }
-    ::close(randomData);
-    return randomNumber;
+    // TODO: ::rand() is not thread safe, either lock or use rand_r(seed) with a thread-local seed.
+    return ((uint64_t)::rand() << QLS_RAND_SHIFT1) | ((uint64_t)::rand() << QLS_RAND_SHIFT2) | (::rand() & QLS_RAND_MASK);
 }
 
 bool JournalFile::isOpen() const {

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp Thu Jan 23 11:01:02 2014
@@ -221,28 +221,34 @@ bool RecoveryManager::readNextRemainingR
 
     // Check enqueue record checksum
     Checksum checksum;
-    checksum.addData((unsigned char*)&enqueueHeader, sizeof(::enq_hdr_t));
+    checksum.addData((const unsigned char*)&enqueueHeader, sizeof(::enq_hdr_t));
     if (xidSize > 0) {
-        checksum.addData((unsigned char*)*xidPtrPtr, xidSize);
+        checksum.addData((const unsigned char*)*xidPtrPtr, xidSize);
     }
     if (dataSize > 0) {
-        checksum.addData((unsigned char*)*dataPtrPtr, dataSize);
+        checksum.addData((const unsigned char*)*dataPtrPtr, dataSize);
     }
     ::rec_tail_t enqueueTail;
     inFileStream_.read((char*)&enqueueTail, sizeof(::rec_tail_t));
     uint32_t cs = checksum.getChecksum();
 //std::cout << std::hex << "### rid=0x" << enqueueHeader._rhdr._rid << " rtcs=0x" << enqueueTail._checksum << " cs=0x" << cs << std::dec << std::endl; // DEBUG
-    int res = ::rec_tail_check(&enqueueTail, &enqueueHeader._rhdr, cs);
+    uint16_t res = ::rec_tail_check(&enqueueTail, &enqueueHeader._rhdr, cs);
     if (res != 0) {
         std::stringstream oss;
-        switch (res) {
-          case 1: oss << std::hex << "Magic: expected 0x" << ~enqueueHeader._rhdr._magic << "; found 0x" << enqueueTail._xmagic; break;
-          case 2: oss << std::hex << "Serial: expected 0x" << enqueueHeader._rhdr._serial << "; found 0x" << enqueueTail._serial; break;
-          case 3: oss << std::hex << "Record Id: expected 0x" << enqueueHeader._rhdr._rid << "; found 0x" << enqueueTail._rid; break;
-          case 4: oss << std::hex << "Checksum: expected 0x" << cs << "; found 0x" << enqueueTail._checksum; break;
-          default: oss << "Unknown error " << res;
+        oss << "Bad record tail:" << std::hex;
+        if (res & ::REC_TAIL_MAGIC_ERR_MASK) {
+            oss << std::endl << "  Magic: expected 0x" << ~enqueueHeader._rhdr._magic << "; found 0x" << enqueueTail._xmagic;
         }
-        throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "enq_rec", "decode"); // TODO: Don't throw exception, log info
+        if (res & ::REC_TAIL_SERIAL_ERR_MASK) {
+            oss << std::endl << "  Serial: expected 0x" << enqueueHeader._rhdr._serial << "; found 0x" << enqueueTail._serial;
+        }
+        if (res & ::REC_TAIL_RID_ERR_MASK) {
+            oss << std::endl << "  Record Id: expected 0x" << enqueueHeader._rhdr._rid << "; found 0x" << enqueueTail._rid;
+        }
+        if (res & ::REC_TAIL_CHECKSUM_ERR_MASK) {
+            oss << std::endl << "  Checksum: expected 0x" << cs << "; found 0x" << enqueueTail._checksum;
+        }
+        throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "RecoveryManager", "readNextRemainingRecord"); // TODO: Don't throw exception, log info
     }
 
     // Set data token
@@ -472,7 +478,13 @@ bool RecoveryManager::decodeRecord(jrec&
             done = record.decode(headerRecord, &inFileStream_, cumulativeSizeRead);
         }
         catch (const jexception& e) {
-            journalLogRef_.log(JournalLog::LOG_INFO, queueName_, e.what());
+            if (e.err_code() == jerrno::JERR_JREC_BADRECTAIL) {
+                std::ostringstream oss;
+                oss << jerrno::err_msg(e.err_code()) << e.additional_info();
+                journalLogRef_.log(JournalLog::LOG_INFO, queueName_, oss.str());
+            } else {
+                journalLogRef_.log(JournalLog::LOG_INFO, queueName_, e.what());
+            }
             checkJournalAlignment(start_file_offs);
             return false;
         }
@@ -602,7 +614,6 @@ bool RecoveryManager::getNextRecordHeade
                             oss << std::hex << "_tmap.set_aio_compl: txn_enq xid=\"" << xid << "\" rid=0x" << h._rid;
                             throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "RecoveryManager", "getNextRecordHeader");
                         }
-                        std::free(xidp);
                     } else {
                         if (enqueueMapRef_.insert_pfid(h._rid, start_fid, file_pos) < enq_map::EMAP_OK) { // fail
                             // The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID.
@@ -641,7 +652,6 @@ bool RecoveryManager::getNextRecordHeade
                         oss << std::hex << "_tmap.set_aio_compl: txn_deq xid=\"" << xid << "\" rid=0x" << dr.rid();
                         throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "RecoveryManager", "getNextRecordHeader");
                     }
-                    std::free(xidp);
                 } else {
                     uint64_t enq_fid;
                     if (enqueueMapRef_.get_remove_pfid(dr.deq_rid(), enq_fid, true) == enq_map::EMAP_OK) { // ignore not found error
@@ -675,7 +685,6 @@ bool RecoveryManager::getNextRecordHeade
                         enqueueMapRef_.unlock(itr->drid_); // ignore not found error
                     }
                 }
-                std::free(xidp);
             }
             break;
         case QLS_TXC_MAGIC:
@@ -711,7 +720,6 @@ bool RecoveryManager::getNextRecordHeade
                             fileNumberMap_[enq_fid]->decrEnqueuedRecordCount();
                     }
                 }
-                std::free(xidp);
             }
             break;
         case QLS_EMPTY_MAGIC:

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp Thu Jan 23 11:01:02 2014
@@ -32,7 +32,7 @@ namespace journal {
 
 deq_rec::deq_rec():
         _xidp(0),
-        _buff(0)
+        _xid_buff(0)
 {
     ::deq_hdr_init(&_deq_hdr, QLS_DEQ_MAGIC, QLS_JRNL_VERSION, 0, 0, 0, 0, 0);
     ::rec_tail_copy(&_deq_tail, &_deq_hdr._rhdr, 0);
@@ -53,7 +53,7 @@ deq_rec::reset(const uint64_t serial, co
     _deq_hdr._deq_rid = drid;
     _deq_hdr._xidsize = xidlen;
     _xidp = xidp;
-    _buff = 0;
+    _xid_buff = 0;
     _deq_tail._serial = serial;
     _deq_tail._rid = rid;
     _deq_tail._checksum = 0UL;
@@ -192,15 +192,15 @@ deq_rec::decode(::rec_hdr_t& h, std::ifs
         // Read header, allocate (if req'd) for xid
         if (_deq_hdr._xidsize)
         {
-            _buff = std::malloc(_deq_hdr._xidsize);
-            MALLOC_CHK(_buff, "_buff", "enq_rec", "rcv_decode");
+            _xid_buff = std::malloc(_deq_hdr._xidsize);
+            MALLOC_CHK(_xid_buff, "_buff", "enq_rec", "rcv_decode");
         }
     }
     if (rec_offs < sizeof(_deq_hdr) + _deq_hdr._xidsize)
     {
         // Read xid (or continue reading xid)
         std::size_t offs = rec_offs - sizeof(_deq_hdr);
-        ifsp->read((char*)_buff + offs, _deq_hdr._xidsize - offs);
+        ifsp->read((char*)_xid_buff + offs, _deq_hdr._xidsize - offs);
         std::size_t size_read = ifsp->gcount();
         rec_offs += size_read;
         if (size_read < _deq_hdr._xidsize - offs)
@@ -228,39 +228,22 @@ deq_rec::decode(::rec_hdr_t& h, std::ifs
             assert(!ifsp->fail() && !ifsp->bad());
             return false;
         }
+        check_rec_tail();
     }
     ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size());
     assert(!ifsp->fail() && !ifsp->bad());
-    if (_deq_hdr._xidsize) {
-        Checksum checksum;
-        checksum.addData((unsigned char*)&_deq_hdr, sizeof(_deq_hdr));
-        checksum.addData((unsigned char*)_buff, _deq_hdr._xidsize);
-        uint32_t cs = checksum.getChecksum();
-        int res = ::rec_tail_check(&_deq_tail, &_deq_hdr._rhdr, cs);
-        if (res != 0) {
-            std::stringstream oss;
-            switch (res) {
-              case 1: oss << std::hex << "Magic: expected 0x" << ~_deq_hdr._rhdr._magic << "; found 0x" << _deq_tail._xmagic; break;
-              case 2: oss << std::hex << "Serial: expected 0x" << _deq_hdr._rhdr._serial << "; found 0x" << _deq_tail._serial; break;
-              case 3: oss << std::hex << "Record Id: expected 0x" << _deq_hdr._rhdr._rid << "; found 0x" << _deq_tail._rid; break;
-              case 4: oss << std::hex << "Checksum: expected 0x" << cs << "; found 0x" << _deq_tail._checksum; break;
-              default: oss << "Unknown error " << res;
-            }
-            throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "deq_rec", "decode"); // TODO: Don't throw exception, log info
-        }
-    }
     return true;
 }
 
 std::size_t
 deq_rec::get_xid(void** const xidpp)
 {
-    if (!_buff)
+    if (!_xid_buff)
     {
         *xidpp = 0;
         return 0;
     }
-    *xidpp = _buff;
+    *xidpp = _xid_buff;
     return _deq_hdr._xidsize;
 }
 
@@ -291,9 +274,40 @@ deq_rec::rec_size() const
 }
 
 void
+deq_rec::check_rec_tail() const {
+    Checksum checksum;
+    checksum.addData((const unsigned char*)&_deq_hdr, sizeof(::deq_hdr_t));
+    if (_deq_hdr._xidsize > 0) {
+        checksum.addData((const unsigned char*)_xid_buff, _deq_hdr._xidsize);
+    }
+    uint32_t cs = checksum.getChecksum();
+    uint16_t res = ::rec_tail_check(&_deq_tail, &_deq_hdr._rhdr, cs);
+    if (res != 0) {
+        std::stringstream oss;
+        oss << std::hex;
+        if (res & ::REC_TAIL_MAGIC_ERR_MASK) {
+            oss << std::endl << "  Magic: expected 0x" << ~_deq_hdr._rhdr._magic << "; found 0x" << _deq_tail._xmagic;
+        }
+        if (res & ::REC_TAIL_SERIAL_ERR_MASK) {
+            oss << std::endl << "  Serial: expected 0x" << _deq_hdr._rhdr._serial << "; found 0x" << _deq_tail._serial;
+        }
+        if (res & ::REC_TAIL_RID_ERR_MASK) {
+            oss << std::endl << "  Record Id: expected 0x" << _deq_hdr._rhdr._rid << "; found 0x" << _deq_tail._rid;
+        }
+        if (res & ::REC_TAIL_CHECKSUM_ERR_MASK) {
+            oss << std::endl << "  Checksum: expected 0x" << cs << "; found 0x" << _deq_tail._checksum;
+        }
+        throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "deq_rec", "check_rec_tail");
+    }
+}
+
+void
 deq_rec::clean()
 {
-    // clean up allocated memory here
+    if (_xid_buff) {
+        std::free(_xid_buff);
+        _xid_buff = 0;
+    }
 }
 
 }}}

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h Thu Jan 23 11:01:02 2014
@@ -39,7 +39,7 @@ class deq_rec : public jrec
 private:
     ::deq_hdr_t _deq_hdr;   ///< Local instance of dequeue header struct
     const void* _xidp;      ///< xid pointer for encoding (writing to disk)
-    void* _buff;            ///< Pointer to buffer to receive data read from disk
+    void* _xid_buff;        ///< Pointer to buffer to receive xid read from disk
     ::rec_tail_t _deq_tail; ///< Local instance of enqueue tail struct, only encoded if XID is present
 
 public:
@@ -59,6 +59,7 @@ public:
     inline std::size_t data_size() const { return 0; } // This record never carries data
     std::size_t xid_size() const;
     std::size_t rec_size() const;
+    void check_rec_tail() const;
 
 private:
     virtual void clean();

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp Thu Jan 23 11:01:02 2014
@@ -34,7 +34,8 @@ enq_rec::enq_rec():
         jrec(), // superclass
         _xidp(0),
         _data(0),
-        _buff(0)
+        _xid_buff(0),
+        _data_buff(0)
 {
     ::enq_hdr_init(&_enq_hdr, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, 0, 0, 0, 0, false);
     ::rec_tail_copy(&_enq_tail, &_enq_hdr._rhdr, 0);
@@ -57,7 +58,6 @@ enq_rec::reset(const uint64_t serial, co
     _enq_hdr._dsize = dlen;
     _xidp = xidp;
     _data = dbuf;
-    _buff = 0;
     _enq_tail._serial = serial;
     _enq_tail._rid = rid;
 }
@@ -229,15 +229,20 @@ enq_rec::decode(::rec_hdr_t& h, std::ifs
         rec_offs = sizeof(::enq_hdr_t);
         if (_enq_hdr._xidsize > 0)
         {
-            _buff = std::malloc(_enq_hdr._xidsize);
-            MALLOC_CHK(_buff, "_buff", "enq_rec", "rcv_decode");
+            _xid_buff = std::malloc(_enq_hdr._xidsize);
+            MALLOC_CHK(_xid_buff, "_xid_buff", "enq_rec", "decode");
+        }
+        if (_enq_hdr._dsize > 0)
+        {
+            _data_buff = std::malloc(_enq_hdr._dsize);
+            MALLOC_CHK(_data_buff, "_data_buff", "enq_rec", "decode")
         }
     }
     if (rec_offs < sizeof(_enq_hdr) + _enq_hdr._xidsize)
     {
         // Read xid (or continue reading xid)
         std::size_t offs = rec_offs - sizeof(_enq_hdr);
-        ifsp->read((char*)_buff + offs, _enq_hdr._xidsize - offs);
+        ifsp->read((char*)_xid_buff + offs, _enq_hdr._xidsize - offs);
         std::size_t size_read = ifsp->gcount();
         rec_offs += size_read;
         if (size_read < _enq_hdr._xidsize - offs)
@@ -253,9 +258,9 @@ enq_rec::decode(::rec_hdr_t& h, std::ifs
     {
         if (rec_offs < sizeof(_enq_hdr) + _enq_hdr._xidsize +  _enq_hdr._dsize)
         {
-            // Ignore data (or continue ignoring data)
+            // Read data (or continue reading data)
             std::size_t offs = rec_offs - sizeof(_enq_hdr) - _enq_hdr._xidsize;
-            ifsp->ignore(_enq_hdr._dsize - offs);
+            ifsp->read((char*)_data_buff + offs, _enq_hdr._dsize - offs);
             std::size_t size_read = ifsp->gcount();
             rec_offs += size_read;
             if (size_read < _enq_hdr._dsize - offs)
@@ -286,6 +291,7 @@ enq_rec::decode(::rec_hdr_t& h, std::ifs
             assert(!ifsp->fail() && !ifsp->bad());
             return false;
         }
+        check_rec_tail();
     }
     ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size());
     assert(!ifsp->fail() && !ifsp->bad());
@@ -295,27 +301,25 @@ enq_rec::decode(::rec_hdr_t& h, std::ifs
 std::size_t
 enq_rec::get_xid(void** const xidpp)
 {
-    if (!_buff || !_enq_hdr._xidsize)
-    {
+    if (!_xid_buff || !_enq_hdr._xidsize) {
         *xidpp = 0;
         return 0;
     }
-    *xidpp = _buff;
+    *xidpp = _xid_buff;
     return _enq_hdr._xidsize;
 }
 
 std::size_t
 enq_rec::get_data(void** const datapp)
 {
-    if (!_buff)
-    {
+    if (!_data_buff) {
         *datapp = 0;
         return 0;
     }
     if (::is_enq_external(&_enq_hdr))
         *datapp = 0;
     else
-        *datapp = (void*)((char*)_buff + _enq_hdr._xidsize);
+        *datapp = _data_buff;
     return _enq_hdr._dsize;
 }
 
@@ -348,9 +352,46 @@ enq_rec::rec_size(const std::size_t xids
 }
 
 void
-enq_rec::clean()
-{
-    // clean up allocated memory here
+enq_rec::check_rec_tail() const {
+    Checksum checksum;
+    checksum.addData((const unsigned char*)&_enq_hdr, sizeof(::enq_hdr_t));
+    if (_enq_hdr._xidsize > 0) {
+        checksum.addData((const unsigned char*)_xid_buff, _enq_hdr._xidsize);
+    }
+    if (_enq_hdr._dsize > 0) {
+        checksum.addData((const unsigned char*)_data_buff, _enq_hdr._dsize);
+    }
+    uint32_t cs = checksum.getChecksum();
+    uint16_t res = ::rec_tail_check(&_enq_tail, &_enq_hdr._rhdr, cs);
+    if (res != 0) {
+        std::stringstream oss;
+        oss << std::hex;
+        if (res & ::REC_TAIL_MAGIC_ERR_MASK) {
+            oss << std::endl << "  Magic: expected 0x" << ~_enq_hdr._rhdr._magic << "; found 0x" << _enq_tail._xmagic;
+        }
+        if (res & ::REC_TAIL_SERIAL_ERR_MASK) {
+            oss << std::endl << "  Serial: expected 0x" << _enq_hdr._rhdr._serial << "; found 0x" << _enq_tail._serial;
+        }
+        if (res & ::REC_TAIL_RID_ERR_MASK) {
+            oss << std::endl << "  Record Id: expected 0x" << _enq_hdr._rhdr._rid << "; found 0x" << _enq_tail._rid;
+        }
+        if (res & ::REC_TAIL_CHECKSUM_ERR_MASK) {
+            oss << std::endl << "  Checksum: expected 0x" << cs << "; found 0x" << _enq_tail._checksum;
+        }
+        throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "enq_rec", "check_rec_tail");
+    }
+}
+
+void
+enq_rec::clean() {
+    if (_xid_buff) {
+        std::free(_xid_buff);
+        _xid_buff = 0;
+    }
+    if (_data_buff) {
+        std::free(_data_buff);
+        _data_buff = 0;
+    }
 }
 
 }}}

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h Thu Jan 23 11:01:02 2014
@@ -40,7 +40,8 @@ private:
     ::enq_hdr_t _enq_hdr;   ///< Local instance of enqueue header struct
     const void* _xidp;      ///< xid pointer for encoding (for writing to disk)
     const void* _data;      ///< Pointer to data to be written to disk
-    void* _buff;            ///< Pointer to buffer to receive data read from disk
+    void* _xid_buff;
+    void* _data_buff;
     ::rec_tail_t _enq_tail; ///< Local instance of enqueue tail struct
 
 public:
@@ -62,6 +63,7 @@ public:
     std::size_t rec_size() const;
     static std::size_t rec_size(const std::size_t xidsize, const std::size_t dsize, const bool external);
     inline uint64_t rid() const { return _enq_hdr._rhdr._rid; }
+    void check_rec_tail() const;
 
 private:
     virtual void clean();

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jcfg.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jcfg.h?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jcfg.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jcfg.h Thu Jan 23 11:01:02 2014
@@ -19,6 +19,9 @@
  *
  */
 
+#include <cmath>
+#include <cstdlib>
+
 #ifndef QPID_QLS_JRNL_JCFG_H
 #define QPID_QLS_JRNL_JCFG_H
 
@@ -55,4 +58,14 @@
 #define QLS_CLEAN                                   /**< If defined, writes QLS_CLEAN_CHAR to all filled areas on disk */
 #define QLS_CLEAN_CHAR                  0xff        /**< Char used to clear empty space on disk */
 
+namespace qpid {
+namespace linearstore {
+
+    const int QLS_RAND_WIDTH = (int)(::log((RAND_MAX + 1ULL))/::log(2));
+    const int QLS_RAND_SHIFT1 = 64 - QLS_RAND_WIDTH;
+    const int QLS_RAND_SHIFT2 = QLS_RAND_SHIFT1 - QLS_RAND_WIDTH;
+    const int QLS_RAND_MASK = (int)::pow(2, QLS_RAND_SHIFT2) - 1;
+
+}}
+
 #endif /* ifndef QPID_QLS_JRNL_JCFG_H */

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp Thu Jan 23 11:01:02 2014
@@ -32,7 +32,7 @@ namespace journal {
 
 txn_rec::txn_rec():
         _xidp(0),
-        _buff(0)
+        _xid_buff(0)
 {
     ::txn_hdr_init(&_txn_hdr, 0, QLS_JRNL_VERSION, 0, 0, 0, 0);
     ::rec_tail_init(&_txn_tail, 0, 0, 0, 0);
@@ -52,7 +52,7 @@ txn_rec::reset(const bool commitFlag, co
     _txn_hdr._rhdr._rid = rid;
     _txn_hdr._xidsize = xidlen;
     _xidp = xidp;
-    _buff = 0;
+    _xid_buff = 0;
     _txn_tail._xmagic = ~_txn_hdr._rhdr._magic;
     _txn_tail._serial = serial;
     _txn_tail._rid = rid;
@@ -184,14 +184,14 @@ txn_rec::decode(::rec_hdr_t& h, std::ifs
         ::rec_hdr_copy(&_txn_hdr._rhdr, &h);
         ifsp->read((char*)&_txn_hdr._xidsize, sizeof(_txn_hdr._xidsize));
         rec_offs = sizeof(::txn_hdr_t);
-        _buff = std::malloc(_txn_hdr._xidsize);
-        MALLOC_CHK(_buff, "_buff", "txn_rec", "rcv_decode");
+        _xid_buff = std::malloc(_txn_hdr._xidsize);
+        MALLOC_CHK(_xid_buff, "_buff", "txn_rec", "rcv_decode");
     }
     if (rec_offs < sizeof(txn_hdr_t) + _txn_hdr._xidsize)
     {
         // Read xid (or continue reading xid)
         std::size_t offs = rec_offs - sizeof(txn_hdr_t);
-        ifsp->read((char*)_buff + offs, _txn_hdr._xidsize - offs);
+        ifsp->read((char*)_xid_buff + offs, _txn_hdr._xidsize - offs);
         std::size_t size_read = ifsp->gcount();
         rec_offs += size_read;
         if (size_read < _txn_hdr._xidsize - offs)
@@ -218,39 +218,23 @@ txn_rec::decode(::rec_hdr_t& h, std::ifs
             assert(!ifsp->fail() && !ifsp->bad());
             return false;
         }
+        check_rec_tail();
     }
     ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size());
     assert(!ifsp->fail() && !ifsp->bad());
     assert(_txn_hdr._xidsize > 0);
-
-    Checksum checksum;
-    checksum.addData((unsigned char*)&_txn_hdr, sizeof(_txn_hdr));
-    checksum.addData((unsigned char*)_buff, _txn_hdr._xidsize);
-    uint32_t cs = checksum.getChecksum();
-    int res = ::rec_tail_check(&_txn_tail, &_txn_hdr._rhdr, cs);
-    if (res != 0) {
-        std::stringstream oss;
-        switch (res) {
-          case 1: oss << std::hex << "Magic: expected 0x" << ~_txn_hdr._rhdr._magic << "; found 0x" << _txn_tail._xmagic; break;
-          case 2: oss << std::hex << "Serial: expected 0x" << _txn_hdr._rhdr._serial << "; found 0x" << _txn_tail._serial; break;
-          case 3: oss << std::hex << "Record Id: expected 0x" << _txn_hdr._rhdr._rid << "; found 0x" << _txn_tail._rid; break;
-          case 4: oss << std::hex << "Checksum: expected 0x" << cs << "; found 0x" << _txn_tail._checksum; break;
-          default: oss << "Unknown error " << res;
-        }
-        throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "txn_rec", "decode"); // TODO: Don't throw exception, log info
-    }
     return true;
 }
 
 std::size_t
 txn_rec::get_xid(void** const xidpp)
 {
-    if (!_buff)
+    if (!_xid_buff)
     {
         *xidpp = 0;
         return 0;
     }
-    *xidpp = _buff;
+    *xidpp = _xid_buff;
     return _txn_hdr._xidsize;
 }
 
@@ -282,9 +266,40 @@ txn_rec::rec_size() const
 }
 
 void
+txn_rec::check_rec_tail() const {
+    Checksum checksum;
+    checksum.addData((const unsigned char*)&_txn_hdr, sizeof(::txn_hdr_t));
+    if (_txn_hdr._xidsize > 0) {
+        checksum.addData((const unsigned char*)_xid_buff, _txn_hdr._xidsize);
+    }
+    uint32_t cs = checksum.getChecksum();
+    uint16_t res = ::rec_tail_check(&_txn_tail, &_txn_hdr._rhdr, cs);
+    if (res != 0) {
+        std::stringstream oss;
+        oss << std::hex;
+        if (res & ::REC_TAIL_MAGIC_ERR_MASK) {
+            oss << std::endl << "  Magic: expected 0x" << ~_txn_hdr._rhdr._magic << "; found 0x" << _txn_tail._xmagic;
+        }
+        if (res & ::REC_TAIL_SERIAL_ERR_MASK) {
+            oss << std::endl << "  Serial: expected 0x" << _txn_hdr._rhdr._serial << "; found 0x" << _txn_tail._serial;
+        }
+        if (res & ::REC_TAIL_RID_ERR_MASK) {
+            oss << std::endl << "  Record Id: expected 0x" << _txn_hdr._rhdr._rid << "; found 0x" << _txn_tail._rid;
+        }
+        if (res & ::REC_TAIL_CHECKSUM_ERR_MASK) {
+            oss << std::endl << "  Checksum: expected 0x" << cs << "; found 0x" << _txn_tail._checksum;
+        }
+        throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "txn_rec", "check_rec_tail");
+    }
+}
+
+void
 txn_rec::clean()
 {
-    // clean up allocated memory here
+    if (_xid_buff) {
+        std::free(_xid_buff);
+        _xid_buff = 0;
+    }
 }
 
 }}}

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h Thu Jan 23 11:01:02 2014
@@ -39,7 +39,7 @@ class txn_rec : public jrec
 private:
     ::txn_hdr_t _txn_hdr;   ///< Local instance of transaction header struct
     const void* _xidp;      ///< xid pointer for encoding (writing to disk)
-    void* _buff;            ///< Pointer to buffer to receive data read from disk
+    void* _xid_buff;        ///< Pointer to buffer to receive xid read from disk
     ::rec_tail_t _txn_tail; ///< Local instance of enqueue tail struct
 
 public:
@@ -57,6 +57,7 @@ public:
     std::size_t xid_size() const;
     std::size_t rec_size() const;
     inline uint64_t rid() const { return _txn_hdr._rhdr._rid; }
+    void check_rec_tail() const;
 
 private:
     virtual void clean();

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.c
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.c?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.c (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.c Thu Jan 23 11:01:02 2014
@@ -94,23 +94,6 @@ int is_file_hdr_reset(file_hdr_t* target
            target->_queue_name_len == 0;
 }
 
-/*
-uint64_t random_64() {
-    int randomData = open("/dev/random", O_RDONLY);
-    if (randomData < 0) {
-        return 0ULL;
-    }
-    uint64_t randomNumber;
-    size_t size = sizeof(randomNumber);
-    ssize_t result = read(randomData, (char*)&randomNumber, size);
-    if (result != size) {
-        randomNumber = 0ULL;
-    }
-    close(randomData);
-    return randomNumber;
-}
-*/
-
 int set_time_now(file_hdr_t *fh)
 {
     struct timespec ts;



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org