You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/01/23 12:01:08 UTC
svn commit: r1560634 [2/7] - in /qpid/branches/java-broker-bdb-ha: ./ qpid/
qpid/bin/ qpid/cpp/ qpid/cpp/bindings/qmf2/examples/cpp/
qpid/cpp/bindings/qpid/dotnet/src/ qpid/cpp/bindings/qpid/dotnet/src/msvc10/
qpid/cpp/bindings/qpid/dotnet/src/msvc9/ q...
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/linearstore.cmake
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/linearstore.cmake?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/linearstore.cmake (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/linearstore.cmake Thu Jan 23 11:01:02 2014
@@ -30,39 +30,22 @@ else (DEFINED linearstore_force)
#
include (finddb.cmake)
if (DB_FOUND)
- #
- # find libaio
- #
- CHECK_LIBRARY_EXISTS (aio io_queue_init "" HAVE_AIO)
- CHECK_INCLUDE_FILES (libaio.h HAVE_AIO_H)
- if (HAVE_AIO AND HAVE_AIO_H)
- #
- # find libuuid
- #
- CHECK_LIBRARY_EXISTS (uuid uuid_compare "" HAVE_UUID)
- CHECK_INCLUDE_FILES(uuid/uuid.h HAVE_UUID_H)
- if (HAVE_UUID AND HAVE_UUID_H)
- #
- # allow linearstore to be built
- #
- message(STATUS "BerkeleyDB for C++, libaio and uuid found, Linearstore support enabled")
- set (linearstore_default ON)
- else (HAVE_UUID AND HAVE_UUID_H)
- if (NOT HAVE_UUID)
- message(STATUS "Linearstore requires uuid which is absent.")
- endif (NOT HAVE_UUID)
- if (NOT HAVE_UUID_H)
- message(STATUS "Linearstore requires uuid.h which is absent.")
- endif (NOT HAVE_UUID_H)
- endif (HAVE_UUID AND HAVE_UUID_H)
- else (HAVE_AIO AND HAVE_AIO_H)
+ #
+ # find libaio
+ #
+ CHECK_LIBRARY_EXISTS (aio io_queue_init "" HAVE_AIO)
+ CHECK_INCLUDE_FILES (libaio.h HAVE_AIO_H)
+ if (HAVE_AIO AND HAVE_AIO_H)
+ message(STATUS "BerkeleyDB for C++ and libaio found, Linearstore support enabled")
+ set (linearstore_default ON)
+ else (HAVE_AIO AND HAVE_AIO_H)
if (NOT HAVE_AIO)
message(STATUS "Linearstore requires libaio which is absent.")
endif (NOT HAVE_AIO)
if (NOT HAVE_AIO_H)
message(STATUS "Linearstore requires libaio.h which is absent.")
endif (NOT HAVE_AIO_H)
- endif (HAVE_AIO AND HAVE_AIO_H)
+ endif (HAVE_AIO AND HAVE_AIO_H)
else (DB_FOUND)
message(STATUS "Linearstore requires BerkeleyDB for C++ which is absent.")
endif (DB_FOUND)
@@ -84,12 +67,6 @@ if (BUILD_LINEARSTORE)
if (NOT HAVE_AIO_H)
message(FATAL_ERROR "Linearstore requires libaio.h which is absent.")
endif (NOT HAVE_AIO_H)
- if (NOT HAVE_UUID)
- message(FATAL_ERROR "Linearstore requires uuid which is absent.")
- endif (NOT HAVE_UUID)
- if (NOT HAVE_UUID_H)
- message(FATAL_ERROR "Linearstore requires uuid.h which is absent.")
- endif (NOT HAVE_UUID_H)
# Journal source files
set (linear_jrnl_SOURCES
@@ -105,8 +82,8 @@ if (BUILD_LINEARSTORE)
qpid/linearstore/journal/jdir.cpp
qpid/linearstore/journal/jerrno.cpp
qpid/linearstore/journal/jexception.cpp
- qpid/linearstore/journal/JournalFile.cpp
- qpid/linearstore/journal/JournalLog.cpp
+ qpid/linearstore/journal/JournalFile.cpp
+ qpid/linearstore/journal/JournalLog.cpp
qpid/linearstore/journal/LinearFileController.cpp
qpid/linearstore/journal/pmgr.cpp
qpid/linearstore/journal/RecoveryManager.cpp
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/amqp/descriptors.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/amqp/descriptors.h?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/amqp/descriptors.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/amqp/descriptors.h Thu Jan 23 11:01:02 2014
@@ -109,6 +109,7 @@ const std::string NOT_FOUND("amqp:not-fo
const std::string UNAUTHORIZED_ACCESS("amqp:unauthorized-access");
const std::string DECODE_ERROR("amqp:decode-error");
const std::string NOT_ALLOWED("amqp:not-allowed");
+const std::string NOT_IMPLEMENTED("amqp:not-implemented");
const std::string RESOURCE_LIMIT_EXCEEDED("amqp:resource-limit-exceeded");
const std::string RESOURCE_DELETED("amqp:resource-deleted");
const std::string PRECONDITION_FAILED("amqp:precondition-failed");
Propchange: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/cpp/src/qpid/broker:r1558037-1560619
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/Exchange.cpp Thu Jan 23 11:01:02 2014
@@ -476,7 +476,7 @@ void Exchange::destroy()
deletionListeners.swap(copy);
}
for (std::map<std::string, boost::function0<void> >::iterator i = copy.begin(); i != copy.end(); ++i) {
- QPID_LOG(notice, "Exchange::destroy() notifying " << i->first);
+ QPID_LOG(debug, "Exchange::destroy() notifying " << i->first);
if (i->second) i->second();
}
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/PagedQueue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/PagedQueue.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/PagedQueue.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/PagedQueue.cpp Thu Jan 23 11:01:02 2014
@@ -24,13 +24,18 @@
#include "qpid/broker/Message.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/sys/Time.h"
#include <string.h>
namespace qpid {
namespace broker {
namespace {
+using qpid::sys::AbsTime;
+using qpid::sys::Duration;
+using qpid::sys::EPOCH;
+using qpid::sys::FAR_FUTURE;
using qpid::sys::MemoryMappedFile;
-const uint32_t OVERHEAD(4/*content-size*/ + 4/*sequence-number*/ + 8/*persistence-id*/);
+const uint32_t OVERHEAD(4/*content-size*/ + 4/*sequence-number*/ + 8/*persistence-id*/ + 8/*expiration*/);
size_t encodedSize(const Message& msg)
{
@@ -46,35 +51,55 @@ size_t encode(const Message& msg, char*
buffer.putLong(encoded);
buffer.putLong(msg.getSequence());
buffer.putLongLong(msg.getPersistentContext()->getPersistenceId());
+ sys::AbsTime expiration = msg.getExpiration();
+ int64_t t(0);
+ if (expiration < FAR_FUTURE) {
+ t = Duration(EPOCH, expiration);
+ }
+ buffer.putLongLong(t);
msg.getPersistentContext()->encode(buffer);
assert(buffer.getPosition() == required);
return required;
}
-size_t decode(ProtocolRegistry& protocols, Message& msg, const char* data, size_t size)
+size_t decode(ProtocolRegistry& protocols, Message& msg, const char* data, size_t size,
+ boost::intrusive_ptr<ExpiryPolicy> expiryPolicy)
{
qpid::framing::Buffer metadata(const_cast<char*>(data), size);
uint32_t encoded = metadata.getLong();
uint32_t sequence = metadata.getLong();
uint64_t persistenceId = metadata.getLongLong();
+ int64_t t = metadata.getLongLong();
assert(metadata.available() >= encoded);
qpid::framing::Buffer buffer(const_cast<char*>(data) + metadata.getPosition(), encoded);
msg = protocols.decode(buffer);
assert(buffer.getPosition() == encoded);
msg.setSequence(qpid::framing::SequenceNumber(sequence));
msg.getPersistentContext()->setPersistenceId(persistenceId);
+ if (t) {
+ sys::AbsTime expiration(EPOCH, t);
+ msg.setExpiryPolicy(expiryPolicy);
+ msg.setExpiration(expiration);
+ }
return encoded + metadata.getPosition();
}
}
-PagedQueue::PagedQueue(const std::string& name_, const std::string& directory, uint m, uint factor, ProtocolRegistry& p)
- : name(name_), pageSize(file.getPageSize()*factor), maxLoaded(m), protocols(p), offset(0), loaded(0), version(0)
+PagedQueue::PagedQueue(const std::string& name_, const std::string& directory, uint m, uint factor, ProtocolRegistry& p,
+ boost::intrusive_ptr<ExpiryPolicy> e)
+ : name(name_), pageSize(file.getPageSize()*factor), maxLoaded(m), protocols(p), offset(0), loaded(0), version(0),
+ expiryPolicy(e)
{
path = file.open(name, directory);
QPID_LOG(debug, "PagedQueue[" << path << "]");
}
+PagedQueue::~PagedQueue()
+{
+ file.close(path);
+}
+
size_t PagedQueue::size()
{
size_t total(0);
@@ -294,7 +319,7 @@ Message* PagedQueue::Page::find(qpid::fr
//if it is the last in the page, decrement the hint count of the page
}
-void PagedQueue::Page::load(MemoryMappedFile& file, ProtocolRegistry& protocols)
+void PagedQueue::Page::load(MemoryMappedFile& file, ProtocolRegistry& protocols, boost::intrusive_ptr<ExpiryPolicy> expiryPolicy)
{
QPID_LOG(debug, "Page[" << offset << "]::load" << " used=" << used << ", size=" << size);
assert(region == 0);
@@ -308,7 +333,7 @@ void PagedQueue::Page::load(MemoryMapped
//decode messages into Page::messages
for (size_t i = 0; i < count; ++i) {
Message message;
- used += decode(protocols, message, region + used, size - used);
+ used += decode(protocols, message, region + used, size - used, expiryPolicy);
if (!contents.contains(message.getSequence())) {
message.setState(DELETED);
QPID_LOG(debug, "Setting state to deleted for message loaded at " << message.getSequence());
@@ -361,7 +386,7 @@ void PagedQueue::load(Page& page)
assert(i != used.rend());
unload(i->second);
}
- page.load(file, protocols);
+ page.load(file, protocols, expiryPolicy);
++loaded;
QPID_LOG(debug, "PagedQueue[" << path << "] loaded page, " << loaded << " pages now loaded");
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/PagedQueue.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/PagedQueue.h?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/PagedQueue.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/PagedQueue.h Thu Jan 23 11:01:02 2014
@@ -31,13 +31,16 @@
namespace qpid {
namespace broker {
+class ExpiryPolicy;
class ProtocolRegistry;
/**
*
*/
class PagedQueue : public Messages {
public:
- PagedQueue(const std::string& name, const std::string& directory, uint maxLoaded, uint pageFactor, ProtocolRegistry& protocols);
+ PagedQueue(const std::string& name, const std::string& directory, uint maxLoaded, uint pageFactor, ProtocolRegistry& protocols,
+ boost::intrusive_ptr<ExpiryPolicy>);
+ ~PagedQueue();
size_t size();
bool deleted(const QueueCursor&);
void publish(const Message& added);
@@ -59,7 +62,7 @@ class PagedQueue : public Messages {
bool add(const Message&);
Message* next(uint32_t version, QueueCursor&);
Message* find(qpid::framing::SequenceNumber);
- void load(qpid::sys::MemoryMappedFile&,ProtocolRegistry&);
+ void load(qpid::sys::MemoryMappedFile&,ProtocolRegistry&, boost::intrusive_ptr<ExpiryPolicy>);
void unload(qpid::sys::MemoryMappedFile&);
void clear(qpid::sys::MemoryMappedFile&);
size_t available() const;
@@ -86,6 +89,7 @@ class PagedQueue : public Messages {
std::list<Page> free;
uint loaded;
uint32_t version;
+ boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;//needed on reload
void addPages(size_t count);
Page& newPage(qpid::framing::SequenceNumber);
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/QueueFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/QueueFactory.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/QueueFactory.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/QueueFactory.cpp Thu Jan 23 11:01:02 2014
@@ -80,7 +80,7 @@ boost::shared_ptr<Queue> QueueFactory::c
queue->messages = std::auto_ptr<Messages>(new PagedQueue(name, broker->getPagingDirectoryPath(),
settings.maxPages ? settings.maxPages : 4,
settings.pageFactor ? settings.pageFactor : 1,
- broker->getProtocolRegistry()));
+ broker->getProtocolRegistry(), broker->getExpiryPolicy()));
}
} else if (settings.lvqKey.empty()) {//LVQ already handled above
queue->messages = std::auto_ptr<Messages>(new MessageDeque());
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/Selector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/Selector.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/Selector.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/Selector.cpp Thu Jan 23 11:01:02 2014
@@ -30,6 +30,7 @@
#include "qpid/log/Statement.h"
#include "qpid/types/Variant.h"
+#include <stdexcept>
#include <string>
#include <sstream>
#include "qpid/sys/unordered_map.h"
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/System.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/System.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/System.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/System.cpp Thu Jan 23 11:01:02 2014
@@ -64,7 +64,7 @@ System::System (string _dataDir, Broker*
}
}
- mgmtObject = _qmf::System::shared_ptr(new _qmf::System(agent, this, types::Uuid(systemId.c_array())));
+ mgmtObject = _qmf::System::shared_ptr(new _qmf::System(agent, this, systemId));
qpid::sys::SystemInfo::getSystemId (osName,
nodeName,
release,
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Connection.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Connection.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Connection.cpp Thu Jan 23 11:01:02 2014
@@ -70,7 +70,7 @@ Connection::Connection(qpid::sys::Output
: BrokerContext(b), ManagedConnection(getBroker(), i),
connection(pn_connection()),
transport(pn_transport()),
- out(o), id(i), haveOutput(true), closeInitiated(false)
+ out(o), id(i), haveOutput(true), closeInitiated(false), closeRequested(false)
{
if (pn_transport_bind(transport, connection)) {
//error
@@ -169,9 +169,22 @@ size_t Connection::encode(char* buffer,
bool Connection::canEncode()
{
if (!closeInitiated) {
+ if (closeRequested) {
+ close();
+ return true;
+ }
try {
- for (Sessions::iterator i = sessions.begin();i != sessions.end(); ++i) {
- if (i->second->dispatch()) haveOutput = true;
+ for (Sessions::iterator i = sessions.begin();i != sessions.end();) {
+ if (i->second->endedByManagement()) {
+ pn_session_close(i->first);
+ i->second->close();
+ sessions.erase(i++);
+ haveOutput = true;
+ QPID_LOG_CAT(debug, model, id << " session ended by management");
+ } else {
+ if (i->second->dispatch()) haveOutput = true;
+ ++i;
+ }
}
process();
} catch (const Exception& e) {
@@ -372,4 +385,10 @@ void Connection::setUserId(const std::st
throw Exception(qpid::amqp::error_conditions::RESOURCE_LIMIT_EXCEEDED, "User connection denied by configured limit");
}
}
+
+void Connection::closedByManagement()
+{
+ closeRequested = true;
+ out.activateOutput();
+}
}}} // namespace qpid::broker::amqp
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Connection.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Connection.h?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Connection.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Connection.h Thu Jan 23 11:01:02 2014
@@ -69,12 +69,14 @@ class Connection : public BrokerContext,
bool haveOutput;
Sessions sessions;
bool closeInitiated;
+ bool closeRequested;
virtual void process();
std::string getError();
void close();
void open();
void readPeerProperties();
+ void closedByManagement();
};
}}} // namespace qpid::broker::amqp
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp Thu Jan 23 11:01:02 2014
@@ -19,6 +19,8 @@
*
*/
#include "qpid/broker/amqp/ManagedConnection.h"
+#include "qpid/broker/amqp/Exception.h"
+#include "qpid/amqp/descriptors.h"
#include "qpid/broker/Broker.h"
#include "qpid/management/ManagementAgent.h"
#include "qpid/log/Statement.h"
@@ -176,4 +178,34 @@ void ManagedConnection::incomingMessageR
if (connection) connection->inc_msgsFromClient();
}
+void ManagedConnection::closedByManagement()
+{
+ throw Exception(qpid::amqp::error_conditions::NOT_IMPLEMENTED, QPID_MSG(id << "Connection close requested, but not implemented"));
+}
+
+qpid::management::Manageable::status_t ManagedConnection::ManagementMethod(uint32_t methodId, qpid::management::Args&, std::string& error)
+{
+ qpid::management::Manageable::status_t status = qpid::management::Manageable::STATUS_UNKNOWN_METHOD;
+
+ try {
+ switch (methodId)
+ {
+ case _qmf::Connection::METHOD_CLOSE :
+ closedByManagement();
+ if (connection) connection->set_closing(true);
+ status = qpid::management::Manageable::STATUS_OK;
+ break;
+ }
+ } catch (const Exception& e) {
+ if (e.symbol() == qpid::amqp::error_conditions::NOT_IMPLEMENTED) {
+ status = qpid::management::Manageable::STATUS_NOT_IMPLEMENTED;
+ } else {
+ error = e.what();
+ status = qpid::management::Manageable::STATUS_EXCEPTION;
+ }
+ }
+
+ return status;
+}
+
}}} // namespace qpid::broker::amqp
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h Thu Jan 23 11:01:02 2014
@@ -61,6 +61,11 @@ class ManagedConnection : public qpid::m
const std::map<std::string, types::Variant>& getClientProperties() const;
virtual bool isLink() const;
void opened();
+
+ qpid::management::Manageable::status_t ManagementMethod(uint32_t methodId, qpid::management::Args&, std::string&);
+
+ protected:
+ virtual void closedByManagement();
private:
const std::string id;
std::string userid;
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp Thu Jan 23 11:01:02 2014
@@ -20,6 +20,8 @@
*/
#include "qpid/broker/amqp/ManagedSession.h"
#include "qpid/broker/amqp/ManagedConnection.h"
+#include "qpid/broker/amqp/Exception.h"
+#include "qpid/amqp/descriptors.h"
#include "qpid/broker/Broker.h"
#include "qpid/management/ManagementAgent.h"
#include "qpid/log/Statement.h"
@@ -89,4 +91,41 @@ ManagedConnection& ManagedSession::getPa
return parent;
}
+void ManagedSession::detachedByManagement()
+{
+ throw Exception(qpid::amqp::error_conditions::NOT_IMPLEMENTED, QPID_MSG(id << "Session detach requested, but not implemented"));
+}
+
+qpid::management::Manageable::status_t ManagedSession::ManagementMethod (uint32_t methodId,
+ qpid::management::Args& /*args*/,
+ std::string& error)
+{
+ qpid::management::Manageable::status_t status = qpid::management::Manageable::STATUS_UNKNOWN_METHOD;
+
+ try {
+ switch (methodId)
+ {
+ case _qmf::Session::METHOD_DETACH :
+ detachedByManagement();
+ status = qpid::management::Manageable::STATUS_OK;
+ break;
+
+ case _qmf::Session::METHOD_CLOSE :
+ case _qmf::Session::METHOD_SOLICITACK :
+ case _qmf::Session::METHOD_RESETLIFESPAN :
+ status = Manageable::STATUS_NOT_IMPLEMENTED;
+ break;
+ }
+ } catch (const Exception& e) {
+ if (e.symbol() == qpid::amqp::error_conditions::NOT_IMPLEMENTED) {
+ status = qpid::management::Manageable::STATUS_NOT_IMPLEMENTED;
+ } else {
+ error = e.what();
+ status = qpid::management::Manageable::STATUS_EXCEPTION;
+ }
+ }
+
+ return status;
+}
+
}}} // namespace qpid::broker::amqp
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h Thu Jan 23 11:01:02 2014
@@ -48,6 +48,10 @@ class ManagedSession : public qpid::mana
void outgoingMessageAccepted();
void outgoingMessageRejected();
ManagedConnection& getParent();
+
+ qpid::management::Manageable::status_t ManagementMethod (uint32_t, qpid::management::Args&, std::string&);
+ protected:
+ virtual void detachedByManagement();
private:
ManagedConnection& parent;
const std::string id;
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Session.cpp Thu Jan 23 11:01:02 2014
@@ -202,7 +202,8 @@ class IncomingToExchange : public Decodi
Session::Session(pn_session_t* s, Connection& c, qpid::sys::OutputControl& o)
: ManagedSession(c.getBroker(), c, (boost::format("%1%") % s).str()), session(s), connection(c), out(o), deleted(false),
- authorise(connection.getUserId(), connection.getBroker().getAcl()) {}
+ authorise(connection.getUserId(), connection.getBroker().getAcl()),
+ detachRequested() {}
Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* terminus, bool incoming)
@@ -689,6 +690,17 @@ Authorise& Session::getAuthorise()
return authorise;
}
+bool Session::endedByManagement() const
+{
+ return detachRequested;
+}
+
+void Session::detachedByManagement()
+{
+ detachRequested = true;
+ wakeup();
+}
+
void IncomingToQueue::handle(qpid::broker::Message& message)
{
if (queue->isDeleted()) {
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Session.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Session.h?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Session.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Session.h Thu Jan 23 11:01:02 2014
@@ -66,6 +66,7 @@ class Session : public ManagedSession, p
void readable(pn_link_t*, pn_delivery_t*);
void writable(pn_link_t*, pn_delivery_t*);
bool dispatch();
+ bool endedByManagement() const;
void close();
/**
@@ -79,6 +80,8 @@ class Session : public ManagedSession, p
void wakeup();
Authorise& getAuthorise();
+ protected:
+ void detachedByManagement();
private:
typedef std::map<pn_link_t*, boost::shared_ptr<Outgoing> > OutgoingLinks;
typedef std::map<pn_link_t*, boost::shared_ptr<Incoming> > IncomingLinks;
@@ -92,6 +95,7 @@ class Session : public ManagedSession, p
qpid::sys::Mutex lock;
std::set< boost::shared_ptr<Queue> > exclusiveQueues;
Authorise authorise;
+ bool detachRequested;
struct ResolvedNode
{
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Topic.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Topic.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Topic.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Topic.cpp Thu Jan 23 11:01:02 2014
@@ -163,7 +163,7 @@ bool TopicRegistry::add(boost::shared_pt
topics.insert(Topics::value_type(topic->getName(), topic));
return true;
} else {
- return false;
+ throw qpid::types::Exception(QPID_MSG("A topic named " << topic->getName() << " already exists"));
}
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp Thu Jan 23 11:01:02 2014
@@ -79,7 +79,7 @@ amqp::MessageId MessageTransfer::getMess
amqp::MessageId r;
if (mp->hasMessageId()) {
- r.set(amqp::CharSequence::create(&mp->getMessageId()[0],16), types::VAR_UUID);
+ r.set(amqp::CharSequence::create(mp->getMessageId().data(),16), types::VAR_UUID);
}
return r;
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/client/SslConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/client/SslConnector.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/client/SslConnector.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/client/SslConnector.cpp Thu Jan 23 11:01:02 2014
@@ -359,8 +359,10 @@ size_t SslConnector::decode(const char*
throw Exception(QPID_MSG("Unsupported version: " << protocolInit
<< " supported version " << version));
}
+ initiated = true;
+ } else {
+ return size - in.available();
}
- initiated = true;
}
AMQFrame frame;
while(frame.decode(in)){
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/client/TCPConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/client/TCPConnector.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/client/TCPConnector.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/client/TCPConnector.cpp Thu Jan 23 11:01:02 2014
@@ -289,8 +289,10 @@ size_t TCPConnector::decode(const char*
throw Exception(QPID_MSG("Unsupported version: " << protocolInit
<< " supported version " << version));
}
+ initiated = true;
+ } else {
+ return size - in.available();
}
- initiated = true;
}
AMQFrame frame;
while(frame.decode(in)){
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/framing/ProtocolVersion.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/framing/ProtocolVersion.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/framing/ProtocolVersion.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/framing/ProtocolVersion.cpp Thu Jan 23 11:01:02 2014
@@ -26,7 +26,7 @@ using namespace qpid::framing;
const std::string ProtocolVersion::toString() const
{
std::stringstream ss;
- ss << major_ << "-" << minor_;
+ ss << unsigned(major_) << "-" << unsigned(minor_);
if (major_ == 1) {
if (protocol_ == SASL) ss << " (SASL)";
else if (protocol_ == TLS) ss << " (TLS)";
@@ -46,7 +46,7 @@ bool ProtocolVersion::operator==(Protoco
return major_ == p.major_ && minor_ == p.minor_;
}
-uint8_t ProtocolVersion::AMQP(0);
-uint8_t ProtocolVersion::LEGACY_AMQP(1);
-uint8_t ProtocolVersion::TLS(2);
-uint8_t ProtocolVersion::SASL(3);
+const uint8_t ProtocolVersion::AMQP(0);
+const uint8_t ProtocolVersion::LEGACY_AMQP(1);
+const uint8_t ProtocolVersion::TLS(2);
+const uint8_t ProtocolVersion::SASL(3);
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/framing/ProtocolVersion.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/framing/ProtocolVersion.h?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/framing/ProtocolVersion.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/framing/ProtocolVersion.h Thu Jan 23 11:01:02 2014
@@ -54,10 +54,10 @@ public:
QPID_COMMON_EXTERN bool operator==(ProtocolVersion p) const;
QPID_COMMON_INLINE_EXTERN bool operator!=(ProtocolVersion p) const { return ! (*this == p); }
- QPID_COMMON_EXTERN static uint8_t AMQP;
- QPID_COMMON_EXTERN static uint8_t LEGACY_AMQP;
- QPID_COMMON_EXTERN static uint8_t TLS;
- QPID_COMMON_EXTERN static uint8_t SASL;
+ QPID_COMMON_EXTERN static const uint8_t AMQP;
+ QPID_COMMON_EXTERN static const uint8_t LEGACY_AMQP;
+ QPID_COMMON_EXTERN static const uint8_t TLS;
+ QPID_COMMON_EXTERN static const uint8_t SASL;
};
} // namespace framing
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/framing/Uuid.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/framing/Uuid.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/framing/Uuid.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/framing/Uuid.cpp Thu Jan 23 11:01:02 2014
@@ -29,47 +29,13 @@ namespace framing {
using namespace std;
-static const size_t UNPARSED_SIZE=36;
-
-Uuid::Uuid(bool unique) {
- if (unique) {
- generate();
- } else {
- clear();
- }
-}
-
-Uuid::Uuid(const uint8_t* data) {
- assign(data);
-}
-
-Uuid::Uuid(const std::string& s) {
- if (s.size() != UNPARSED_SIZE)
- throw IllegalArgumentException(QPID_MSG("Invalid UUID: " << s));
- if (uuid_parse(const_cast<char *>(&s[0]), c_array()) != 0)
- throw IllegalArgumentException(QPID_MSG("Invalid UUID: " << s));
-}
-
-void Uuid::assign(const uint8_t* data) {
- // This const cast is for Solaris which has a
- // uuid_copy that takes a non const 2nd argument
- uuid_copy(c_array(), const_cast<uint8_t*>(data));
-}
-
-void Uuid::generate() {
- uuid_generate(c_array());
-}
-
-void Uuid::clear() {
- uuid_clear(c_array());
-}
-
-// Force int 0/!0 to false/true; avoids compile warnings.
-bool Uuid::isNull() const {
- // This const cast is for Solaris which has a
- // uuid_is_null that takes a non const argument
- return !!uuid_is_null(const_cast<uint8_t*>(data()));
-}
+Uuid::Uuid(bool unique):
+ qpid::types::Uuid(unique)
+{}
+
+Uuid::Uuid(const uint8_t* data):
+ qpid::types::Uuid(data)
+{}
void Uuid::encode(Buffer& buf) const {
buf.putRawData(data(), size());
@@ -78,29 +44,9 @@ void Uuid::encode(Buffer& buf) const {
void Uuid::decode(Buffer& buf) {
if (buf.available() < size())
throw IllegalArgumentException(QPID_MSG("Not enough data for UUID."));
- buf.getRawData(c_array(), size());
-}
-
-ostream& operator<<(ostream& out, Uuid uuid) {
- char unparsed[UNPARSED_SIZE + 1];
- uuid_unparse(uuid.data(), unparsed);
- return out << unparsed;
-}
-
-istream& operator>>(istream& in, Uuid& uuid) {
- char unparsed[UNPARSED_SIZE + 1] = {0};
- in.get(unparsed, sizeof(unparsed));
- if (!in.fail()) {
- if (uuid_parse(unparsed, uuid.c_array()) != 0)
- in.setstate(ios::failbit);
- }
- return in;
-}
-std::string Uuid::str() const {
- std::ostringstream os;
- os << *this;
- return os.str();
+ // Break qpid::types::Uuid encapsulation - Nasty, but efficient
+ buf.getRawData(const_cast<uint8_t*>(data()), size());
}
}} // namespace qpid::framing
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/framing/Uuid.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/framing/Uuid.h?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/framing/Uuid.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/framing/Uuid.h Thu Jan 23 11:01:02 2014
@@ -22,7 +22,7 @@
#include "qpid/CommonImportExport.h"
#include "qpid/sys/IntegerTypes.h"
-#include <boost/array.hpp>
+#include "qpid/types/Uuid.h"
#include <ostream>
#include <istream>
@@ -33,62 +33,25 @@ namespace framing {
class Buffer;
/**
- * A UUID is represented as a boost::array of 16 bytes.
- *
- * Full value semantics, operators ==, < etc. are provided by
- * boost::array so Uuid can be the key type in a map etc.
- *
- * TODO: change this implementation as it leaks boost into the
- * client API
+ * Framing UUID is now a thine wrapper around qpid::types::Uuid
*/
-struct Uuid : public boost::array<uint8_t, 16> {
+struct Uuid : public qpid::types::Uuid {
/** If unique is true, generate a unique ID else a null ID. */
QPID_COMMON_EXTERN Uuid(bool unique=false);
/** Copy from 16 bytes of data. */
QPID_COMMON_EXTERN Uuid(const uint8_t* data);
- /** Parse format 1b4e28ba-2fa1-11d2-883f-b9a761bde3fb. */
- QPID_COMMON_EXTERN Uuid(const std::string&);
-
- // Default op= and copy ctor are fine.
- // boost::array gives us ==, < etc.
-
- /** Copy from 16 bytes of data. */
- QPID_COMMON_EXTERN void assign(const uint8_t* data);
-
- /** Set to a new unique identifier. */
- QPID_COMMON_EXTERN void generate();
-
- /** Set to all zeros. */
- QPID_COMMON_EXTERN void clear();
-
- /** Test for null (all zeros). */
- QPID_COMMON_EXTERN bool isNull() const;
- QPID_COMMON_INLINE_EXTERN operator bool() const { return !isNull(); }
- QPID_COMMON_INLINE_EXTERN bool operator!() const { return isNull(); }
+ // We get most of our operations directly from qpid::types::Uuid
+ QPID_COMMON_INLINE_EXTERN static size_t size()
+ { return SIZE; }
QPID_COMMON_EXTERN void encode(framing::Buffer& buf) const;
QPID_COMMON_EXTERN void decode(framing::Buffer& buf);
QPID_COMMON_INLINE_EXTERN uint32_t encodedSize() const
- { return static_cast<uint32_t>(size()); }
-
- /** String value in format 1b4e28ba-2fa1-11d2-883f-b9a761bde3fb. */
- QPID_COMMON_EXTERN std::string str() const;
-
- template <class S> void serialize(S& s) {
- s.raw(begin(), size());
- }
+ { return size(); }
};
-/** Print in format 1b4e28ba-2fa1-11d2-883f-b9a761bde3fb. */
-QPID_COMMON_EXTERN std::ostream& operator<<(std::ostream&, Uuid);
-
-/** Read from format 1b4e28ba-2fa1-11d2-883f-b9a761bde3fb. */
-QPID_COMMON_EXTERN std::istream& operator>>(std::istream&, Uuid&);
-
}} // namespace qpid::framing
-
-
#endif /*!QPID_FRAMING_UUID_H*/
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Thu Jan 23 11:01:02 2014
@@ -863,6 +863,23 @@ bool BrokerReplicator::unbind(boost::sha
bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, const framing::FieldTable* const) { return false; }
bool BrokerReplicator::hasBindings() { return false; }
+// ConnectionObserver methods
+void BrokerReplicator::connection(broker::Connection&) {}
+void BrokerReplicator::opened(broker::Connection&) {}
+
+void BrokerReplicator::closed(broker::Connection& c) {
+ if (link && &c == connect) disconnected();
+}
+
+void BrokerReplicator::forced(broker::Connection& c, const std::string& message) {
+ if (link && &c == link->getConnection()) {
+ haBroker.shutdown(
+ QPID_MSG(logPrefix << "Connection forced, cluster may be misconfigured: "
+ << message));
+ }
+ closed(c);
+}
+
string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; }
void BrokerReplicator::disconnectedQueueReplicator(boost::shared_ptr<Exchange> ex) {
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/BrokerReplicator.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/BrokerReplicator.h?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/BrokerReplicator.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/BrokerReplicator.h Thu Jan 23 11:01:02 2014
@@ -90,10 +90,10 @@ class BrokerReplicator : public broker::
bool hasBindings();
// ConnectionObserver methods
- void connection(broker::Connection&) {}
- void opened(broker::Connection&) {}
- void closed(broker::Connection& c) { if (link && &c == connect) disconnected(); }
- void forced(broker::Connection& c, const std::string& /*message*/) { closed(c); }
+ void connection(broker::Connection&);
+ void opened(broker::Connection&);
+ void closed(broker::Connection&);
+ void forced(broker::Connection&, const std::string& /*message*/);
QueueReplicatorPtr findQueueReplicator(const std::string& qname);
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/Membership.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/Membership.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/Membership.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/Membership.cpp Thu Jan 23 11:01:02 2014
@@ -146,7 +146,7 @@ bool checkTransition(BrokerStatus from,
void Membership::update(Mutex::ScopedLock& l) {
QPID_LOG(info, "Membership: " << brokers);
-// Update managment and send update event.
+ // Update managment and send update event.
BrokerStatus newStatus = getStatus(l);
Variant::List brokerList = asList(l);
if (mgmtObject) {
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/Primary.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/Primary.cpp Thu Jan 23 11:01:02 2014
@@ -284,7 +284,7 @@ void Primary::exchangeCreate(const Excha
QPID_LOG(debug, logPrefix << "Created exchange " << ex->getName()
<< " replication: " << printable(level));
// Give each exchange a unique id to avoid confusion of same-named exchanges.
- args.set(QPID_HA_UUID, FieldTable::ValuePtr(new UuidValue(&Uuid(true)[0])));
+ args.set(QPID_HA_UUID, FieldTable::ValuePtr(new UuidValue(Uuid(true).data())));
}
ex->setArgs(args);
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/StatusCheck.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/StatusCheck.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/StatusCheck.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/StatusCheck.cpp Thu Jan 23 11:01:02 2014
@@ -57,20 +57,19 @@ void StatusCheckThread::run() {
try {
// Check for self connections
Variant::Map options, clientProperties;
- clientProperties[ConnectionObserver::ADMIN_TAG] = 1; // Allow connection to backups.
+ clientProperties[ConnectionObserver::ADMIN_TAG] = 1; // Allow connection to backups
clientProperties[ConnectionObserver::ADDRESS_TAG] = url.str();
- clientProperties[ConnectionObserver::BACKUP_TAG] = statusCheck.haBroker.getBrokerInfo().asMap();
+ clientProperties[ConnectionObserver::BACKUP_TAG] = statusCheck.brokerInfo.asMap();
// Set connection options
- Settings settings(statusCheck.haBroker.getSettings());
+ const Settings& settings = statusCheck.settings;
if (settings.username.size()) options["username"] = settings.username;
if (settings.password.size()) options["password"] = settings.password;
if (settings.mechanism.size()) options["sasl_mechanisms"] = settings.mechanism;
options["client-properties"] = clientProperties;
- sys::Duration heartbeat(statusCheck.haBroker.getBroker().getOptions().linkHeartbeatInterval);
- options["heartbeat"] = heartbeat/sys::TIME_SEC;
- c = Connection(url.str(), options);
+ options["heartbeat"] = statusCheck.heartbeat/sys::TIME_SEC;
+ c = Connection(url.str(), options);
c.open();
Session session = c.createSession();
messaging::Address responses("#;{create:always,node:{x-declare:{exclusive:True,auto-delete:True,arguments:{'qpid.replicate':none}}}}");
@@ -88,7 +87,7 @@ void StatusCheckThread::run() {
content["_object_id"] = oid;
encode(content, request);
s.send(request);
- messaging::Duration timeout(heartbeat/sys::TIME_MSEC);
+ messaging::Duration timeout(statusCheck.heartbeat/sys::TIME_MSEC);
Message response = r.fetch(timeout);
session.acknowledge();
Variant::List contentIn;
@@ -103,17 +102,18 @@ void StatusCheckThread::run() {
}
}
else
- QPID_LOG(error, logPrefix << "Invalid response " << response.getContent())
- } catch(const exception& error) {
- // Its not an error to fail to connect to self.
- if (statusCheck.haBroker.getBrokerInfo().getAddress() != url[0])
- QPID_LOG(warning, logPrefix << error.what());
- }
+ QPID_LOG(error, logPrefix << "Invalid response " << response.getContent());
+ } catch(...) {}
try { c.close(); } catch(...) {}
delete this;
}
-StatusCheck::StatusCheck(HaBroker& hb) : promote(true), haBroker(hb)
+// Note: Don't use hb outside of the constructor, it may be deleted.
+StatusCheck::StatusCheck(HaBroker& hb) :
+ promote(true),
+ settings(hb.getSettings()),
+ heartbeat(hb.getBroker().getOptions().linkHeartbeatInterval),
+ brokerInfo(hb.getBrokerInfo())
{}
StatusCheck::~StatusCheck() {
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/StatusCheck.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/StatusCheck.h?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/StatusCheck.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/StatusCheck.h Thu Jan 23 11:01:02 2014
@@ -65,7 +65,9 @@ class StatusCheck
sys::Mutex lock;
std::vector<sys::Thread> threads;
bool promote;
- HaBroker& haBroker;
+ const Settings settings;
+ const sys::Duration heartbeat;
+ const BrokerInfo brokerInfo;
friend class StatusCheckThread;
};
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/ISSUES
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/ISSUES?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/ISSUES (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/ISSUES Thu Jan 23 11:01:02 2014
@@ -17,52 +17,82 @@
# under the License.
#
-LinearStore issues:
+Linear Store issues:
-Store:
-------
-
-1. (FIXED) Overwrite identity: When recovering a previously used file, if the write boundary coincides with old record
- start, no way of discriminating old from new at boundary (used to use OWI).
-
-2. (FIXED) QPID-5357: Recycling files while in use not working, however, files are recovered to EFP during recovery. Must solve
- #1 first.
-
-3. (FIXED) QPID-5358: Checksum not implemented in record tail, not checked during read.
-
-4. QPID-5359: Rework qpid management parameters and controls (QMF).
-
-5. QPID-5360: Consistent logging: rework logging to provide uniform and consistent logging from store (both logging
- level and places where logging occurs).
-
-6. QPID-5361: No tests
- * No existing tests for linearstore:
- ** Basic broker-level tests for txn and non-txn recovery
- ** Store-level tests which check write boundary conditions
- ** Unit tests
- ** Basic performance tests
-
-7: QPID-5362: No tools
- * Store analysis and status
- * Recovery/reading of message content
-
-8. One journal file lost when queue deleted. All files except for one are recycled back to the EFP.
-
-9. Complete exceptions - several exceptions thrown using jexception have no exception numbers
-
-Current bugs and performance issues:
-------------------------------------
-1. BZ 1035843 - Slow performance for producers
-2. (FIXED) QPID-5387 (BZ 1036071) - Crash when deleting queue
-3. (FIXED) QPID-5388 (BZ 1035802) - Segmentation fault when recovering empty queue
-4. (UNABLE TO REPRODUCE) BZ 1036026 - Unable to create durable queue - framing error - possibly caused by running both stores at the same time
-5. (UNABLE TO REPRODUCE) BZ 1038599 - Abort when deleting used queue after restart - may be dup of QPID-5387 (BZ 1036071)
-6. BZ 1039522 - Crash during recovery - JournalFile::getFqFileName() -JERR_JREC_BADRECTAIL
-7. BZ 1039525 - Crash during recovery - journal::jexception - JERR_JREC_BADRECTAIL
-8. (FIXED) QPID-5442 (BZ 1039949) - DTX test failure - missing XIDs
-9. (FIXED) QPID-5460 (BZ 1051097) - Transactional messages lost during recovery
-10. QPID-5464 - Incompletely created journal files accumulate in EFP
-11. QPID-5473 (BZ 1051924) - Recovery where last record in file is truncated (ie spans files), but following file is uninitialized causes crash
+Current/pending:
+================
+ Q-JIRA RHBZ Description / Comments
+ ------ ------- ----------------------
+ 5359 - Linearstore: Implement new management schema and wire into store
+ 5360 - Linearstore: Evaluate and rework logging to produce a consistent log output
+ 5361 - Linearstore: No tests for linearstore functionality currently exist
+ * No existing tests for linearstore:
+ ** Basic broker-level tests for txn and non-txn recovery
+ ** Store-level tests which check write boundary conditions
+ ** EFP tests, including file recovery, error management
+ ** Unit tests
+ ** Basic performance tests
+ 5362 - Linearstore: No store tools exist for examining the journals
+ svn r.1558888 2014-01-09: WIP checkin for linearstore version of qpid_qls_analyze. Needs testing and tidy-up.
+ * Store analysis and status
+ * Recovery/reading of message content
+ * Empty file pool status and management
+ 5464 - [linearstore] Incompletely created journal files accumulate in EFP
+ 5479 1053701 [linearstore] Using recovered store results in "JERR_JNLF_FILEOFFSOVFL: Attempted to increase submitted offset past file size. (JournalFile::submittedDblkCount)" error message
+ * Probablilty: 2 of 600 (0.3%) using tx-test-soak.sh
+ 5480 1053749 [linearstore] Recovery of store failure with "JERR_MAP_NOTFOUND: Key not found in map." error message
+ * Probability: 6 of 600 (1.0%) using tx-test-soak.sh
+ * If broker is started a second time after failure, it starts correctly and test completes ok.
+ 5484 1035843 Slow performance for producers
+ svn r.1558592 fixes an issue with using /dev/random as a source of random numbers for Journal serial numbers.
+ - 1036026 [LinearStore] Qpid linear store unable to create durable queue - framing-error: Queue <q-name>: create() failed: jexception 0x0000
+ UNABLE TO REPRODUCE - but Frantizek has additional info
+ - 1039522 Qpid crashes while recovering from linear store around apid::linearstore::journal::JournalFile::getFqFileName() including enq_rec::decode() threw JERR_JREC_BAD_RECTAIL
+ * Possible dup of 1039525
+ * May be fixed by QPID-5483 - waiting for needinfo
+ - 1039525 Qpid crashes while recovering from linear store around apid::linearstore::journal::jexception::format including enq_rec::decode() threw JERR_JREC_BAD_REC_TAIL
+ * Possible dup of 1039522
+ * May be fixed by QPID-5483 - waiting for needinfo
+ 5487 - [linearstore] Replace use of /dev/urandom with c random generator calls
+
+Fixed/closed:
+=============
+ Q-JIRA RHBZ Description / Comments
+ ------ ------- ----------------------
+ 5357 1052518 Linearstore: Empty file recycling not functional
+ svn r.1545563 2013-11-26: Propsed fix
+ 5358 1052727 Linearstore: Checksums not implemented in record tail
+ svn r.1547601 2013-12-03: Propsed fix
+ 5387 1036071 Linearstore: Segmentation fault when deleting queue
+ svn r.1547641 2013-12-03: Propsed fix
+ 5388 1035802 Linearstore: Segmentation fault when recovering empty queue
+ svn r.1547921 2013-12-04: Propsed fix
+NO-JIRA - Added missing Apache copyright/license text
+ svn r.1551304 2013-12-16: Propsed fix
+ 5425 1052445 Linearstore: Transaction Prepared List (TPL) fails with jexception 0x0402 AtomicCounter::addLimit() threw JERR_JNLF_FILEOFFSOVFL
+ svn r.1551361 2013-12-16: Proposed fix
+ 5442 1039949 Linearstore: Dtx recover test fails
+ svn r.1552772 2013-12-20: Proposed fix
+ 5444 1052775 Linearstore: Recovering from qpid-txtest fails with "Inconsistent TPL 2PC count" error message
+ svn r.1553148 2013-12-23: Proposed fix
+ - 1038599 [LinearStore] Abort when deleting used queue after restart
+ CLOSED-NOTABUG 2014-01-06
+ 5460 1051097 [linearstore] Recovery of store which contains prepared but incomplete transactions results in message loss
+ svn r.1556892 2014-01-09: Proposed fix
+ 5473 1051924 [linearstore] Recovery of journal in which last logical file contains truncated record causes crash
+ svn r.1557620 2014-01-12: Proposed fix
+ 5483 - [linearstore] Recovery of journal with partly written record fails with "JERR_JREC_BADRECTAIL: Invalid data record tail" error message
+ svn r.1558589 2014-01-15: Proposed fix
+ * May be linked to RHBZ 1039522 - waiting for needinfo
+ * May be linked to RHBZ 1039525 - waiting for needinfo
+
+Future:
+=======
+* One journal file lost when queue deleted. All files except for one are recycled back to the EFP.
+* Complete exceptions - several exceptions thrown using jexception have no exception numbers
+* Investigate ability of store to detect missing journal files, especially from logical end of a journal
+* Investigate ability of store to handle file muddle-ups (ie journal files from EFP which are not zeroed or other journals)
+* Look at improving the efficiency of recovery - right now the entire store is read once, and then each recovered record xid and data is read again
Code tidy-up
------------
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp Thu Jan 23 11:01:02 2014
@@ -66,7 +66,13 @@ MessageStoreImpl::MessageStoreImpl(qpid:
jrnlLog(qpid::linearstore::journal::JournalLog::LOG_NOTICE),
mgmtObject(),
agent(0)
-{}
+{
+ // Test of values for QLS_RAND_SHIFT1, QLS_RAND_SHIFT2 and QLS_RAND_MASK
+ if((((uint64_t)RAND_MAX << QLS_RAND_SHIFT1) ^ ((uint64_t)RAND_MAX << QLS_RAND_SHIFT2) ^ (RAND_MAX & QLS_RAND_MASK)) != 0xffffffffffffffffULL) {
+ THROW_STORE_EXCEPTION("[linearstore] 64-bit random number generation alignment error");
+ }
+ ::srand(::time(NULL));
+}
uint32_t MessageStoreImpl::chkJrnlWrPageCacheSize(const uint32_t param_, const std::string& paramName_)
{
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/Checksum.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/Checksum.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/Checksum.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/Checksum.cpp Thu Jan 23 11:01:02 2014
@@ -30,9 +30,11 @@ Checksum::Checksum() : a(1UL), b(0UL), M
Checksum::~Checksum() {}
void Checksum::addData(const unsigned char* data, const std::size_t len) {
- for (uint32_t i = 0; i < len; i++) {
- a = (a + data[i]) % MOD_ADLER;
- b = (a + b) % MOD_ADLER;
+ if (data) {
+ for (uint32_t i = 0; i < len; i++) {
+ a = (a + data[i]) % MOD_ADLER;
+ b = (a + b) % MOD_ADLER;
+ }
}
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp Thu Jan 23 11:01:02 2014
@@ -28,9 +28,9 @@
#include "qpid/linearstore/journal/JournalLog.h"
#include "qpid/linearstore/journal/slock.h"
#include "qpid/linearstore/journal/utils/file_hdr.h"
+#include "qpid/sys/uuid.h"
#include <sys/stat.h>
#include <unistd.h>
-#include <uuid/uuid.h>
#include <vector>
//#include <iostream> // DEBUG
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/JournalFile.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/JournalFile.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/JournalFile.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/JournalFile.cpp Thu Jan 23 11:01:02 2014
@@ -279,18 +279,8 @@ const std::string JournalFile::getFileNa
//static
uint64_t JournalFile::getRandom64() {
- int randomData = ::open("/dev/random", O_RDONLY);
- if (randomData < 0) {
- throw jexception(); // TODO: Complete exception details
- }
- uint64_t randomNumber;
- ::size_t size = sizeof(randomNumber);
- ::ssize_t result = ::read(randomData, (char*)&randomNumber, size);
- if (result < 0 || result != ssize_t(size)) {
- throw jexception(); // TODO: Complete exception details
- }
- ::close(randomData);
- return randomNumber;
+ // TODO: ::rand() is not thread safe, either lock or use rand_r(seed) with a thread-local seed.
+ return ((uint64_t)::rand() << QLS_RAND_SHIFT1) | ((uint64_t)::rand() << QLS_RAND_SHIFT2) | (::rand() & QLS_RAND_MASK);
}
bool JournalFile::isOpen() const {
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp Thu Jan 23 11:01:02 2014
@@ -221,28 +221,34 @@ bool RecoveryManager::readNextRemainingR
// Check enqueue record checksum
Checksum checksum;
- checksum.addData((unsigned char*)&enqueueHeader, sizeof(::enq_hdr_t));
+ checksum.addData((const unsigned char*)&enqueueHeader, sizeof(::enq_hdr_t));
if (xidSize > 0) {
- checksum.addData((unsigned char*)*xidPtrPtr, xidSize);
+ checksum.addData((const unsigned char*)*xidPtrPtr, xidSize);
}
if (dataSize > 0) {
- checksum.addData((unsigned char*)*dataPtrPtr, dataSize);
+ checksum.addData((const unsigned char*)*dataPtrPtr, dataSize);
}
::rec_tail_t enqueueTail;
inFileStream_.read((char*)&enqueueTail, sizeof(::rec_tail_t));
uint32_t cs = checksum.getChecksum();
//std::cout << std::hex << "### rid=0x" << enqueueHeader._rhdr._rid << " rtcs=0x" << enqueueTail._checksum << " cs=0x" << cs << std::dec << std::endl; // DEBUG
- int res = ::rec_tail_check(&enqueueTail, &enqueueHeader._rhdr, cs);
+ uint16_t res = ::rec_tail_check(&enqueueTail, &enqueueHeader._rhdr, cs);
if (res != 0) {
std::stringstream oss;
- switch (res) {
- case 1: oss << std::hex << "Magic: expected 0x" << ~enqueueHeader._rhdr._magic << "; found 0x" << enqueueTail._xmagic; break;
- case 2: oss << std::hex << "Serial: expected 0x" << enqueueHeader._rhdr._serial << "; found 0x" << enqueueTail._serial; break;
- case 3: oss << std::hex << "Record Id: expected 0x" << enqueueHeader._rhdr._rid << "; found 0x" << enqueueTail._rid; break;
- case 4: oss << std::hex << "Checksum: expected 0x" << cs << "; found 0x" << enqueueTail._checksum; break;
- default: oss << "Unknown error " << res;
+ oss << "Bad record tail:" << std::hex;
+ if (res & ::REC_TAIL_MAGIC_ERR_MASK) {
+ oss << std::endl << " Magic: expected 0x" << ~enqueueHeader._rhdr._magic << "; found 0x" << enqueueTail._xmagic;
}
- throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "enq_rec", "decode"); // TODO: Don't throw exception, log info
+ if (res & ::REC_TAIL_SERIAL_ERR_MASK) {
+ oss << std::endl << " Serial: expected 0x" << enqueueHeader._rhdr._serial << "; found 0x" << enqueueTail._serial;
+ }
+ if (res & ::REC_TAIL_RID_ERR_MASK) {
+ oss << std::endl << " Record Id: expected 0x" << enqueueHeader._rhdr._rid << "; found 0x" << enqueueTail._rid;
+ }
+ if (res & ::REC_TAIL_CHECKSUM_ERR_MASK) {
+ oss << std::endl << " Checksum: expected 0x" << cs << "; found 0x" << enqueueTail._checksum;
+ }
+ throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "RecoveryManager", "readNextRemainingRecord"); // TODO: Don't throw exception, log info
}
// Set data token
@@ -472,7 +478,13 @@ bool RecoveryManager::decodeRecord(jrec&
done = record.decode(headerRecord, &inFileStream_, cumulativeSizeRead);
}
catch (const jexception& e) {
- journalLogRef_.log(JournalLog::LOG_INFO, queueName_, e.what());
+ if (e.err_code() == jerrno::JERR_JREC_BADRECTAIL) {
+ std::ostringstream oss;
+ oss << jerrno::err_msg(e.err_code()) << e.additional_info();
+ journalLogRef_.log(JournalLog::LOG_INFO, queueName_, oss.str());
+ } else {
+ journalLogRef_.log(JournalLog::LOG_INFO, queueName_, e.what());
+ }
checkJournalAlignment(start_file_offs);
return false;
}
@@ -602,7 +614,6 @@ bool RecoveryManager::getNextRecordHeade
oss << std::hex << "_tmap.set_aio_compl: txn_enq xid=\"" << xid << "\" rid=0x" << h._rid;
throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "RecoveryManager", "getNextRecordHeader");
}
- std::free(xidp);
} else {
if (enqueueMapRef_.insert_pfid(h._rid, start_fid, file_pos) < enq_map::EMAP_OK) { // fail
// The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID.
@@ -641,7 +652,6 @@ bool RecoveryManager::getNextRecordHeade
oss << std::hex << "_tmap.set_aio_compl: txn_deq xid=\"" << xid << "\" rid=0x" << dr.rid();
throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "RecoveryManager", "getNextRecordHeader");
}
- std::free(xidp);
} else {
uint64_t enq_fid;
if (enqueueMapRef_.get_remove_pfid(dr.deq_rid(), enq_fid, true) == enq_map::EMAP_OK) { // ignore not found error
@@ -675,7 +685,6 @@ bool RecoveryManager::getNextRecordHeade
enqueueMapRef_.unlock(itr->drid_); // ignore not found error
}
}
- std::free(xidp);
}
break;
case QLS_TXC_MAGIC:
@@ -711,7 +720,6 @@ bool RecoveryManager::getNextRecordHeade
fileNumberMap_[enq_fid]->decrEnqueuedRecordCount();
}
}
- std::free(xidp);
}
break;
case QLS_EMPTY_MAGIC:
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp Thu Jan 23 11:01:02 2014
@@ -32,7 +32,7 @@ namespace journal {
deq_rec::deq_rec():
_xidp(0),
- _buff(0)
+ _xid_buff(0)
{
::deq_hdr_init(&_deq_hdr, QLS_DEQ_MAGIC, QLS_JRNL_VERSION, 0, 0, 0, 0, 0);
::rec_tail_copy(&_deq_tail, &_deq_hdr._rhdr, 0);
@@ -53,7 +53,7 @@ deq_rec::reset(const uint64_t serial, co
_deq_hdr._deq_rid = drid;
_deq_hdr._xidsize = xidlen;
_xidp = xidp;
- _buff = 0;
+ _xid_buff = 0;
_deq_tail._serial = serial;
_deq_tail._rid = rid;
_deq_tail._checksum = 0UL;
@@ -192,15 +192,15 @@ deq_rec::decode(::rec_hdr_t& h, std::ifs
// Read header, allocate (if req'd) for xid
if (_deq_hdr._xidsize)
{
- _buff = std::malloc(_deq_hdr._xidsize);
- MALLOC_CHK(_buff, "_buff", "enq_rec", "rcv_decode");
+ _xid_buff = std::malloc(_deq_hdr._xidsize);
+ MALLOC_CHK(_xid_buff, "_buff", "enq_rec", "rcv_decode");
}
}
if (rec_offs < sizeof(_deq_hdr) + _deq_hdr._xidsize)
{
// Read xid (or continue reading xid)
std::size_t offs = rec_offs - sizeof(_deq_hdr);
- ifsp->read((char*)_buff + offs, _deq_hdr._xidsize - offs);
+ ifsp->read((char*)_xid_buff + offs, _deq_hdr._xidsize - offs);
std::size_t size_read = ifsp->gcount();
rec_offs += size_read;
if (size_read < _deq_hdr._xidsize - offs)
@@ -228,39 +228,22 @@ deq_rec::decode(::rec_hdr_t& h, std::ifs
assert(!ifsp->fail() && !ifsp->bad());
return false;
}
+ check_rec_tail();
}
ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size());
assert(!ifsp->fail() && !ifsp->bad());
- if (_deq_hdr._xidsize) {
- Checksum checksum;
- checksum.addData((unsigned char*)&_deq_hdr, sizeof(_deq_hdr));
- checksum.addData((unsigned char*)_buff, _deq_hdr._xidsize);
- uint32_t cs = checksum.getChecksum();
- int res = ::rec_tail_check(&_deq_tail, &_deq_hdr._rhdr, cs);
- if (res != 0) {
- std::stringstream oss;
- switch (res) {
- case 1: oss << std::hex << "Magic: expected 0x" << ~_deq_hdr._rhdr._magic << "; found 0x" << _deq_tail._xmagic; break;
- case 2: oss << std::hex << "Serial: expected 0x" << _deq_hdr._rhdr._serial << "; found 0x" << _deq_tail._serial; break;
- case 3: oss << std::hex << "Record Id: expected 0x" << _deq_hdr._rhdr._rid << "; found 0x" << _deq_tail._rid; break;
- case 4: oss << std::hex << "Checksum: expected 0x" << cs << "; found 0x" << _deq_tail._checksum; break;
- default: oss << "Unknown error " << res;
- }
- throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "deq_rec", "decode"); // TODO: Don't throw exception, log info
- }
- }
return true;
}
std::size_t
deq_rec::get_xid(void** const xidpp)
{
- if (!_buff)
+ if (!_xid_buff)
{
*xidpp = 0;
return 0;
}
- *xidpp = _buff;
+ *xidpp = _xid_buff;
return _deq_hdr._xidsize;
}
@@ -291,9 +274,40 @@ deq_rec::rec_size() const
}
void
+deq_rec::check_rec_tail() const {
+ Checksum checksum;
+ checksum.addData((const unsigned char*)&_deq_hdr, sizeof(::deq_hdr_t));
+ if (_deq_hdr._xidsize > 0) {
+ checksum.addData((const unsigned char*)_xid_buff, _deq_hdr._xidsize);
+ }
+ uint32_t cs = checksum.getChecksum();
+ uint16_t res = ::rec_tail_check(&_deq_tail, &_deq_hdr._rhdr, cs);
+ if (res != 0) {
+ std::stringstream oss;
+ oss << std::hex;
+ if (res & ::REC_TAIL_MAGIC_ERR_MASK) {
+ oss << std::endl << " Magic: expected 0x" << ~_deq_hdr._rhdr._magic << "; found 0x" << _deq_tail._xmagic;
+ }
+ if (res & ::REC_TAIL_SERIAL_ERR_MASK) {
+ oss << std::endl << " Serial: expected 0x" << _deq_hdr._rhdr._serial << "; found 0x" << _deq_tail._serial;
+ }
+ if (res & ::REC_TAIL_RID_ERR_MASK) {
+ oss << std::endl << " Record Id: expected 0x" << _deq_hdr._rhdr._rid << "; found 0x" << _deq_tail._rid;
+ }
+ if (res & ::REC_TAIL_CHECKSUM_ERR_MASK) {
+ oss << std::endl << " Checksum: expected 0x" << cs << "; found 0x" << _deq_tail._checksum;
+ }
+ throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "deq_rec", "check_rec_tail");
+ }
+}
+
+void
deq_rec::clean()
{
- // clean up allocated memory here
+ if (_xid_buff) {
+ std::free(_xid_buff);
+ _xid_buff = 0;
+ }
}
}}}
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h Thu Jan 23 11:01:02 2014
@@ -39,7 +39,7 @@ class deq_rec : public jrec
private:
::deq_hdr_t _deq_hdr; ///< Local instance of dequeue header struct
const void* _xidp; ///< xid pointer for encoding (writing to disk)
- void* _buff; ///< Pointer to buffer to receive data read from disk
+ void* _xid_buff; ///< Pointer to buffer to receive xid read from disk
::rec_tail_t _deq_tail; ///< Local instance of enqueue tail struct, only encoded if XID is present
public:
@@ -59,6 +59,7 @@ public:
inline std::size_t data_size() const { return 0; } // This record never carries data
std::size_t xid_size() const;
std::size_t rec_size() const;
+ void check_rec_tail() const;
private:
virtual void clean();
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp Thu Jan 23 11:01:02 2014
@@ -34,7 +34,8 @@ enq_rec::enq_rec():
jrec(), // superclass
_xidp(0),
_data(0),
- _buff(0)
+ _xid_buff(0),
+ _data_buff(0)
{
::enq_hdr_init(&_enq_hdr, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, 0, 0, 0, 0, false);
::rec_tail_copy(&_enq_tail, &_enq_hdr._rhdr, 0);
@@ -57,7 +58,6 @@ enq_rec::reset(const uint64_t serial, co
_enq_hdr._dsize = dlen;
_xidp = xidp;
_data = dbuf;
- _buff = 0;
_enq_tail._serial = serial;
_enq_tail._rid = rid;
}
@@ -229,15 +229,20 @@ enq_rec::decode(::rec_hdr_t& h, std::ifs
rec_offs = sizeof(::enq_hdr_t);
if (_enq_hdr._xidsize > 0)
{
- _buff = std::malloc(_enq_hdr._xidsize);
- MALLOC_CHK(_buff, "_buff", "enq_rec", "rcv_decode");
+ _xid_buff = std::malloc(_enq_hdr._xidsize);
+ MALLOC_CHK(_xid_buff, "_xid_buff", "enq_rec", "decode");
+ }
+ if (_enq_hdr._dsize > 0)
+ {
+ _data_buff = std::malloc(_enq_hdr._dsize);
+ MALLOC_CHK(_data_buff, "_data_buff", "enq_rec", "decode")
}
}
if (rec_offs < sizeof(_enq_hdr) + _enq_hdr._xidsize)
{
// Read xid (or continue reading xid)
std::size_t offs = rec_offs - sizeof(_enq_hdr);
- ifsp->read((char*)_buff + offs, _enq_hdr._xidsize - offs);
+ ifsp->read((char*)_xid_buff + offs, _enq_hdr._xidsize - offs);
std::size_t size_read = ifsp->gcount();
rec_offs += size_read;
if (size_read < _enq_hdr._xidsize - offs)
@@ -253,9 +258,9 @@ enq_rec::decode(::rec_hdr_t& h, std::ifs
{
if (rec_offs < sizeof(_enq_hdr) + _enq_hdr._xidsize + _enq_hdr._dsize)
{
- // Ignore data (or continue ignoring data)
+ // Read data (or continue reading data)
std::size_t offs = rec_offs - sizeof(_enq_hdr) - _enq_hdr._xidsize;
- ifsp->ignore(_enq_hdr._dsize - offs);
+ ifsp->read((char*)_data_buff + offs, _enq_hdr._dsize - offs);
std::size_t size_read = ifsp->gcount();
rec_offs += size_read;
if (size_read < _enq_hdr._dsize - offs)
@@ -286,6 +291,7 @@ enq_rec::decode(::rec_hdr_t& h, std::ifs
assert(!ifsp->fail() && !ifsp->bad());
return false;
}
+ check_rec_tail();
}
ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size());
assert(!ifsp->fail() && !ifsp->bad());
@@ -295,27 +301,25 @@ enq_rec::decode(::rec_hdr_t& h, std::ifs
std::size_t
enq_rec::get_xid(void** const xidpp)
{
- if (!_buff || !_enq_hdr._xidsize)
- {
+ if (!_xid_buff || !_enq_hdr._xidsize) {
*xidpp = 0;
return 0;
}
- *xidpp = _buff;
+ *xidpp = _xid_buff;
return _enq_hdr._xidsize;
}
std::size_t
enq_rec::get_data(void** const datapp)
{
- if (!_buff)
- {
+ if (!_data_buff) {
*datapp = 0;
return 0;
}
if (::is_enq_external(&_enq_hdr))
*datapp = 0;
else
- *datapp = (void*)((char*)_buff + _enq_hdr._xidsize);
+ *datapp = _data_buff;
return _enq_hdr._dsize;
}
@@ -348,9 +352,46 @@ enq_rec::rec_size(const std::size_t xids
}
void
-enq_rec::clean()
-{
- // clean up allocated memory here
+enq_rec::check_rec_tail() const {
+ Checksum checksum;
+ checksum.addData((const unsigned char*)&_enq_hdr, sizeof(::enq_hdr_t));
+ if (_enq_hdr._xidsize > 0) {
+ checksum.addData((const unsigned char*)_xid_buff, _enq_hdr._xidsize);
+ }
+ if (_enq_hdr._dsize > 0) {
+ checksum.addData((const unsigned char*)_data_buff, _enq_hdr._dsize);
+ }
+ uint32_t cs = checksum.getChecksum();
+ uint16_t res = ::rec_tail_check(&_enq_tail, &_enq_hdr._rhdr, cs);
+ if (res != 0) {
+ std::stringstream oss;
+ oss << std::hex;
+ if (res & ::REC_TAIL_MAGIC_ERR_MASK) {
+ oss << std::endl << " Magic: expected 0x" << ~_enq_hdr._rhdr._magic << "; found 0x" << _enq_tail._xmagic;
+ }
+ if (res & ::REC_TAIL_SERIAL_ERR_MASK) {
+ oss << std::endl << " Serial: expected 0x" << _enq_hdr._rhdr._serial << "; found 0x" << _enq_tail._serial;
+ }
+ if (res & ::REC_TAIL_RID_ERR_MASK) {
+ oss << std::endl << " Record Id: expected 0x" << _enq_hdr._rhdr._rid << "; found 0x" << _enq_tail._rid;
+ }
+ if (res & ::REC_TAIL_CHECKSUM_ERR_MASK) {
+ oss << std::endl << " Checksum: expected 0x" << cs << "; found 0x" << _enq_tail._checksum;
+ }
+ throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "enq_rec", "check_rec_tail");
+ }
+}
+
+void
+enq_rec::clean() {
+ if (_xid_buff) {
+ std::free(_xid_buff);
+ _xid_buff = 0;
+ }
+ if (_data_buff) {
+ std::free(_data_buff);
+ _data_buff = 0;
+ }
}
}}}
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h Thu Jan 23 11:01:02 2014
@@ -40,7 +40,8 @@ private:
::enq_hdr_t _enq_hdr; ///< Local instance of enqueue header struct
const void* _xidp; ///< xid pointer for encoding (for writing to disk)
const void* _data; ///< Pointer to data to be written to disk
- void* _buff; ///< Pointer to buffer to receive data read from disk
+ void* _xid_buff;
+ void* _data_buff;
::rec_tail_t _enq_tail; ///< Local instance of enqueue tail struct
public:
@@ -62,6 +63,7 @@ public:
std::size_t rec_size() const;
static std::size_t rec_size(const std::size_t xidsize, const std::size_t dsize, const bool external);
inline uint64_t rid() const { return _enq_hdr._rhdr._rid; }
+ void check_rec_tail() const;
private:
virtual void clean();
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jcfg.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jcfg.h?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jcfg.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jcfg.h Thu Jan 23 11:01:02 2014
@@ -19,6 +19,9 @@
*
*/
+#include <cmath>
+#include <cstdlib>
+
#ifndef QPID_QLS_JRNL_JCFG_H
#define QPID_QLS_JRNL_JCFG_H
@@ -55,4 +58,14 @@
#define QLS_CLEAN /**< If defined, writes QLS_CLEAN_CHAR to all filled areas on disk */
#define QLS_CLEAN_CHAR 0xff /**< Char used to clear empty space on disk */
+namespace qpid {
+namespace linearstore {
+
+ const int QLS_RAND_WIDTH = (int)(::log((RAND_MAX + 1ULL))/::log(2));
+ const int QLS_RAND_SHIFT1 = 64 - QLS_RAND_WIDTH;
+ const int QLS_RAND_SHIFT2 = QLS_RAND_SHIFT1 - QLS_RAND_WIDTH;
+ const int QLS_RAND_MASK = (int)::pow(2, QLS_RAND_SHIFT2) - 1;
+
+}}
+
#endif /* ifndef QPID_QLS_JRNL_JCFG_H */
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp Thu Jan 23 11:01:02 2014
@@ -32,7 +32,7 @@ namespace journal {
txn_rec::txn_rec():
_xidp(0),
- _buff(0)
+ _xid_buff(0)
{
::txn_hdr_init(&_txn_hdr, 0, QLS_JRNL_VERSION, 0, 0, 0, 0);
::rec_tail_init(&_txn_tail, 0, 0, 0, 0);
@@ -52,7 +52,7 @@ txn_rec::reset(const bool commitFlag, co
_txn_hdr._rhdr._rid = rid;
_txn_hdr._xidsize = xidlen;
_xidp = xidp;
- _buff = 0;
+ _xid_buff = 0;
_txn_tail._xmagic = ~_txn_hdr._rhdr._magic;
_txn_tail._serial = serial;
_txn_tail._rid = rid;
@@ -184,14 +184,14 @@ txn_rec::decode(::rec_hdr_t& h, std::ifs
::rec_hdr_copy(&_txn_hdr._rhdr, &h);
ifsp->read((char*)&_txn_hdr._xidsize, sizeof(_txn_hdr._xidsize));
rec_offs = sizeof(::txn_hdr_t);
- _buff = std::malloc(_txn_hdr._xidsize);
- MALLOC_CHK(_buff, "_buff", "txn_rec", "rcv_decode");
+ _xid_buff = std::malloc(_txn_hdr._xidsize);
+ MALLOC_CHK(_xid_buff, "_buff", "txn_rec", "rcv_decode");
}
if (rec_offs < sizeof(txn_hdr_t) + _txn_hdr._xidsize)
{
// Read xid (or continue reading xid)
std::size_t offs = rec_offs - sizeof(txn_hdr_t);
- ifsp->read((char*)_buff + offs, _txn_hdr._xidsize - offs);
+ ifsp->read((char*)_xid_buff + offs, _txn_hdr._xidsize - offs);
std::size_t size_read = ifsp->gcount();
rec_offs += size_read;
if (size_read < _txn_hdr._xidsize - offs)
@@ -218,39 +218,23 @@ txn_rec::decode(::rec_hdr_t& h, std::ifs
assert(!ifsp->fail() && !ifsp->bad());
return false;
}
+ check_rec_tail();
}
ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size());
assert(!ifsp->fail() && !ifsp->bad());
assert(_txn_hdr._xidsize > 0);
-
- Checksum checksum;
- checksum.addData((unsigned char*)&_txn_hdr, sizeof(_txn_hdr));
- checksum.addData((unsigned char*)_buff, _txn_hdr._xidsize);
- uint32_t cs = checksum.getChecksum();
- int res = ::rec_tail_check(&_txn_tail, &_txn_hdr._rhdr, cs);
- if (res != 0) {
- std::stringstream oss;
- switch (res) {
- case 1: oss << std::hex << "Magic: expected 0x" << ~_txn_hdr._rhdr._magic << "; found 0x" << _txn_tail._xmagic; break;
- case 2: oss << std::hex << "Serial: expected 0x" << _txn_hdr._rhdr._serial << "; found 0x" << _txn_tail._serial; break;
- case 3: oss << std::hex << "Record Id: expected 0x" << _txn_hdr._rhdr._rid << "; found 0x" << _txn_tail._rid; break;
- case 4: oss << std::hex << "Checksum: expected 0x" << cs << "; found 0x" << _txn_tail._checksum; break;
- default: oss << "Unknown error " << res;
- }
- throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "txn_rec", "decode"); // TODO: Don't throw exception, log info
- }
return true;
}
std::size_t
txn_rec::get_xid(void** const xidpp)
{
- if (!_buff)
+ if (!_xid_buff)
{
*xidpp = 0;
return 0;
}
- *xidpp = _buff;
+ *xidpp = _xid_buff;
return _txn_hdr._xidsize;
}
@@ -282,9 +266,40 @@ txn_rec::rec_size() const
}
void
+txn_rec::check_rec_tail() const {
+ Checksum checksum;
+ checksum.addData((const unsigned char*)&_txn_hdr, sizeof(::txn_hdr_t));
+ if (_txn_hdr._xidsize > 0) {
+ checksum.addData((const unsigned char*)_xid_buff, _txn_hdr._xidsize);
+ }
+ uint32_t cs = checksum.getChecksum();
+ uint16_t res = ::rec_tail_check(&_txn_tail, &_txn_hdr._rhdr, cs);
+ if (res != 0) {
+ std::stringstream oss;
+ oss << std::hex;
+ if (res & ::REC_TAIL_MAGIC_ERR_MASK) {
+ oss << std::endl << " Magic: expected 0x" << ~_txn_hdr._rhdr._magic << "; found 0x" << _txn_tail._xmagic;
+ }
+ if (res & ::REC_TAIL_SERIAL_ERR_MASK) {
+ oss << std::endl << " Serial: expected 0x" << _txn_hdr._rhdr._serial << "; found 0x" << _txn_tail._serial;
+ }
+ if (res & ::REC_TAIL_RID_ERR_MASK) {
+ oss << std::endl << " Record Id: expected 0x" << _txn_hdr._rhdr._rid << "; found 0x" << _txn_tail._rid;
+ }
+ if (res & ::REC_TAIL_CHECKSUM_ERR_MASK) {
+ oss << std::endl << " Checksum: expected 0x" << cs << "; found 0x" << _txn_tail._checksum;
+ }
+ throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "txn_rec", "check_rec_tail");
+ }
+}
+
+void
txn_rec::clean()
{
- // clean up allocated memory here
+ if (_xid_buff) {
+ std::free(_xid_buff);
+ _xid_buff = 0;
+ }
}
}}}
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h Thu Jan 23 11:01:02 2014
@@ -39,7 +39,7 @@ class txn_rec : public jrec
private:
::txn_hdr_t _txn_hdr; ///< Local instance of transaction header struct
const void* _xidp; ///< xid pointer for encoding (writing to disk)
- void* _buff; ///< Pointer to buffer to receive data read from disk
+ void* _xid_buff; ///< Pointer to buffer to receive xid read from disk
::rec_tail_t _txn_tail; ///< Local instance of enqueue tail struct
public:
@@ -57,6 +57,7 @@ public:
std::size_t xid_size() const;
std::size_t rec_size() const;
inline uint64_t rid() const { return _txn_hdr._rhdr._rid; }
+ void check_rec_tail() const;
private:
virtual void clean();
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.c
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.c?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.c (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.c Thu Jan 23 11:01:02 2014
@@ -94,23 +94,6 @@ int is_file_hdr_reset(file_hdr_t* target
target->_queue_name_len == 0;
}
-/*
-uint64_t random_64() {
- int randomData = open("/dev/random", O_RDONLY);
- if (randomData < 0) {
- return 0ULL;
- }
- uint64_t randomNumber;
- size_t size = sizeof(randomNumber);
- ssize_t result = read(randomData, (char*)&randomNumber, size);
- if (result != size) {
- randomNumber = 0ULL;
- }
- close(randomData);
- return randomNumber;
-}
-*/
-
int set_time_now(file_hdr_t *fh)
{
struct timespec ts;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org