You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/10/21 23:26:11 UTC
svn commit: r1534383 [2/4] - in /qpid/branches/linearstore/qpid/cpp/src: ./
qpid/broker/ qpid/legacystore/jrnl/ qpid/linearstore/
qpid/linearstore/jrnl/ qpid/linearstore/jrnl/utils/
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp Mon Oct 21 21:26:10 2013
@@ -31,21 +31,21 @@
namespace qpid {
namespace qls_jrnl {
-JournalFile::JournalFile(const std::string& fqFileName_,
- const uint64_t fileSeqNum_,
- const uint32_t fileSize_kib_) :
- fqFileName(fqFileName_),
- fileSeqNum(fileSeqNum_),
- fileHandle(-1),
- fileCloseFlag(false),
- fileHeaderBasePtr (0),
- fileHeaderPtr(0),
- aioControlBlockPtr(0),
- fileSizeDblks(((fileSize_kib_ * 1024) + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_BYTES)) / JRNL_DBLK_SIZE_BYTES),
- enqueuedRecordCount(0),
- submittedDblkCount(0),
- completedDblkCount(0),
- outstandingAioOpsCount(0)
+JournalFile::JournalFile(const std::string& fqFileName,
+ const uint64_t fileSeqNum,
+ const efpDataSize_kib_t efpDataSize_kib) :
+ fqFileName_(fqFileName),
+ fileSeqNum_(fileSeqNum),
+ fileHandle_(-1),
+ fileCloseFlag_(false),
+ fileHeaderBasePtr_ (0),
+ fileHeaderPtr_(0),
+ aioControlBlockPtr_(0),
+ fileSize_dblks_(((efpDataSize_kib * 1024) + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES)) / QLS_DBLK_SIZE_BYTES),
+ enqueuedRecordCount_(0),
+ submittedDblkCount_(0),
+ completedDblkCount_(0),
+ outstandingAioOpsCount_(0)
{}
JournalFile::~JournalFile() {
@@ -53,227 +53,223 @@ JournalFile::~JournalFile() {
}
void
-JournalFile::initialize() {
- if (::posix_memalign(&fileHeaderBasePtr, QLS_AIO_ALIGN_BOUNDARY, QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_KIB * 1024))
+JournalFile::initialize(const uint32_t completedDblkCount) {
+ if (::posix_memalign(&fileHeaderBasePtr_, QLS_AIO_ALIGN_BOUNDARY_BYTES, QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024))
{
std::ostringstream oss;
- oss << "posix_memalign(): blksize=" << QLS_AIO_ALIGN_BOUNDARY << " size=" << (QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_KIB * 1024);
+ oss << "posix_memalign(): blksize=" << QLS_AIO_ALIGN_BOUNDARY_BYTES << " size=" << (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024);
oss << FORMAT_SYSERR(errno);
throw jexception(jerrno::JERR__MALLOC, oss.str(), "JournalFile", "initialize");
}
- fileHeaderPtr = (::file_hdr_t*)fileHeaderBasePtr;
- aioControlBlockPtr = new aio_cb;
+ fileHeaderPtr_ = (::file_hdr_t*)fileHeaderBasePtr_;
+ aioControlBlockPtr_ = new aio_cb;
+ if (completedDblkCount > 0UL) {
+ submittedDblkCount_.add(completedDblkCount);
+ completedDblkCount_.add(completedDblkCount);
+ }
}
void
JournalFile::finalize() {
- if (fileHeaderBasePtr != 0) {
- std::free(fileHeaderBasePtr);
- fileHeaderBasePtr = 0;
- fileHeaderPtr = 0;
+ if (fileHeaderBasePtr_ != 0) {
+ std::free(fileHeaderBasePtr_);
+ fileHeaderBasePtr_ = 0;
+ fileHeaderPtr_ = 0;
}
- if (aioControlBlockPtr != 0) {
- std::free(aioControlBlockPtr);
- aioControlBlockPtr = 0;
+ if (aioControlBlockPtr_ != 0) {
+ delete(aioControlBlockPtr_);
+ aioControlBlockPtr_ = 0;
}
}
-const std::string
-JournalFile::getDirectory() const {
- return fqFileName.substr(0, fqFileName.rfind('/'));
+const std::string JournalFile::getDirectory() const {
+ return fqFileName_.substr(0, fqFileName_.rfind('/'));
+}
+
+const std::string JournalFile::getFileName() const {
+ return fqFileName_.substr(fqFileName_.rfind('/')+1);
}
-const std::string
-JournalFile::getFileName() const {
- return fqFileName.substr(fqFileName.rfind('/')+1);
+const std::string JournalFile::getFqFileName() const {
+ return fqFileName_;
}
-const std::string
-JournalFile::getFqFileName() const {
- return fqFileName;
+uint64_t JournalFile::getFileSeqNum() const {
+ return fileSeqNum_;
}
-uint64_t
-JournalFile::getFileSeqNum() const {
- return fileSeqNum;
+bool JournalFile::isOpen() const {
+ return fileHandle_ >= 0;
}
-int
-JournalFile::open() {
- fileHandle = ::open(fqFileName.c_str(), O_WRONLY | O_DIRECT, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); // 0644 -rw-r--r--
- if (fileHandle < 0) {
+int JournalFile::open() {
+ fileHandle_ = ::open(fqFileName_.c_str(), O_WRONLY | O_DIRECT, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); // 0644 -rw-r--r--
+ if (fileHandle_ < 0) {
std::ostringstream oss;
- oss << "file=\"" << fqFileName << "\"" << FORMAT_SYSERR(errno);
+ oss << "file=\"" << fqFileName_ << "\"" << FORMAT_SYSERR(errno);
throw jexception(jerrno::JERR_JNLF_OPEN, oss.str(), "JournalFile", "open");
}
- return fileHandle;
+ return fileHandle_;
}
-bool
-JournalFile::isOpen() const {
- return fileHandle >= 0;
-}
-
-void
-JournalFile::close() {
- if (fileHandle >= 0) {
+void JournalFile::close() {
+ if (fileHandle_ >= 0) {
if (getOutstandingAioDblks()) {
- fileCloseFlag = true; // Close later when all outstanding AIOs have returned
+ fileCloseFlag_ = true; // Close later when all outstanding AIOs have returned
} else {
- int res = ::close(fileHandle);
- fileHandle = -1;
+ int res = ::close(fileHandle_);
+ fileHandle_ = -1;
if (res != 0) {
std::ostringstream oss;
- oss << "file=\"" << fqFileName << "\"" << FORMAT_SYSERR(errno);
+ oss << "file=\"" << fqFileName_ << "\"" << FORMAT_SYSERR(errno);
throw jexception(jerrno::JERR_JNLF_CLOSE, oss.str(), "JournalFile", "open");
}
}
}
}
-void
-JournalFile::asyncFileHeaderWrite(io_context_t ioContextPtr_,
- const efpPartitionNumber_t efpPartitionNumber_,
- const efpDataSize_kib_t efpDataSize_kib_,
- const uint16_t userFlags_,
- const uint64_t recordId_,
- const uint64_t firstRecordOffset_,
- const std::string queueName_) {
- ::file_hdr_create(fileHeaderPtr, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDR_RES_SIZE_SBLKS, efpPartitionNumber_, efpDataSize_kib_);
- ::file_hdr_init(fileHeaderBasePtr, QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_KIB * 1024, userFlags_, recordId_, firstRecordOffset_, fileSeqNum, queueName_.size(), queueName_.data());
- aio::prep_pwrite(aioControlBlockPtr, fileHandle, (void*)fileHeaderBasePtr, QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_KIB * 1024, 0UL);
- if (aio::submit(ioContextPtr_, 1, &aioControlBlockPtr) < 0)
+void JournalFile::asyncFileHeaderWrite(io_context_t ioContextPtr,
+ const efpPartitionNumber_t efpPartitionNumber,
+ const efpDataSize_kib_t efpDataSize_kib,
+ const uint16_t userFlags,
+ const uint64_t recordId,
+ const uint64_t firstRecordOffset,
+ const std::string queueName) {
+ ::file_hdr_create(fileHeaderPtr_, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDR_RES_SIZE_SBLKS, efpPartitionNumber, efpDataSize_kib);
+ ::file_hdr_init(fileHeaderBasePtr_,
+ QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024,
+ userFlags,
+ recordId,
+ firstRecordOffset,
+ fileSeqNum_,
+ queueName.size(),
+ queueName.data());
+ aio::prep_pwrite(aioControlBlockPtr_,
+ fileHandle_,
+ (void*)fileHeaderBasePtr_,
+ QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024,
+ 0UL);
+ if (aio::submit(ioContextPtr, 1, &aioControlBlockPtr_) < 0)
throw jexception(jerrno::JERR__AIO, "JournalFile", "asyncPageWrite");
- addSubmittedDblkCount(QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_DBLKS);
+ addSubmittedDblkCount(QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_DBLKS);
incrOutstandingAioOperationCount();
}
-void
-JournalFile::asyncPageWrite(io_context_t ioContextPtr_,
- aio_cb* aioControlBlockPtr_,
- void* data_,
- uint32_t dataSize_dblks_) {
- aio::prep_pwrite_2(aioControlBlockPtr_, fileHandle, data_, dataSize_dblks_ * JRNL_DBLK_SIZE_BYTES, submittedDblkCount.get() * JRNL_DBLK_SIZE_BYTES);
- pmgr::page_cb* pcbp = (pmgr::page_cb*)(aioControlBlockPtr_->data); // This page's control block (pcb)
- pcbp->_wdblks = dataSize_dblks_;
+void JournalFile::asyncPageWrite(io_context_t ioContextPtr,
+ aio_cb* aioControlBlockPtr,
+ void* data,
+ uint32_t dataSize_dblks) {
+ aio::prep_pwrite_2(aioControlBlockPtr,
+ fileHandle_,
+ data,
+ dataSize_dblks * QLS_DBLK_SIZE_BYTES,
+ submittedDblkCount_.get() * QLS_DBLK_SIZE_BYTES);
+ pmgr::page_cb* pcbp = (pmgr::page_cb*)(aioControlBlockPtr->data); // This page's control block (pcb)
+ pcbp->_wdblks = dataSize_dblks;
pcbp->_jfp = this;
- if (aio::submit(ioContextPtr_, 1, &aioControlBlockPtr_) < 0)
- throw jexception(jerrno::JERR__AIO, "JournalFile", "asyncPageWrite");
- addSubmittedDblkCount(dataSize_dblks_);
+ if (aio::submit(ioContextPtr, 1, &aioControlBlockPtr) < 0) {
+ throw jexception(jerrno::JERR__AIO, "JournalFile", "asyncPageWrite"); // TODO: complete exception details
+ }
+ addSubmittedDblkCount(dataSize_dblks);
incrOutstandingAioOperationCount();
}
-uint32_t
-JournalFile::getEnqueuedRecordCount() const {
- return enqueuedRecordCount.get();
+uint32_t JournalFile::getEnqueuedRecordCount() const {
+ return enqueuedRecordCount_.get();
}
-uint32_t
-JournalFile::incrEnqueuedRecordCount() {
- return enqueuedRecordCount.increment();
+uint32_t JournalFile::incrEnqueuedRecordCount() {
+ return enqueuedRecordCount_.increment();
}
-uint32_t
-JournalFile::addEnqueuedRecordCount(const uint32_t a) {
- return enqueuedRecordCount.add(a);
+uint32_t JournalFile::addEnqueuedRecordCount(const uint32_t a) {
+ return enqueuedRecordCount_.add(a);
}
-uint32_t
-JournalFile::decrEnqueuedRecordCount() {
- return enqueuedRecordCount.decrementLimit();
+uint32_t JournalFile::decrEnqueuedRecordCount() {
+ return enqueuedRecordCount_.decrementLimit();
}
-uint32_t
-JournalFile::subtrEnqueuedRecordCount(const uint32_t s) {
- return enqueuedRecordCount.subtractLimit(s);
+uint32_t JournalFile::subtrEnqueuedRecordCount(const uint32_t s) {
+ return enqueuedRecordCount_.subtractLimit(s);
}
-uint32_t
-JournalFile::getSubmittedDblkCount() const {
- return submittedDblkCount.get();
+uint32_t JournalFile::getSubmittedDblkCount() const {
+ return submittedDblkCount_.get();
}
-uint32_t
-JournalFile::addSubmittedDblkCount(const uint32_t a) {
- return submittedDblkCount.addLimit(a, fileSizeDblks, jerrno::JERR_JNLF_FILEOFFSOVFL);
+uint32_t JournalFile::addSubmittedDblkCount(const uint32_t a) {
+ return submittedDblkCount_.addLimit(a, fileSize_dblks_, jerrno::JERR_JNLF_FILEOFFSOVFL);
}
-uint32_t
-JournalFile::getCompletedDblkCount() const {
- return completedDblkCount.get();
+uint32_t JournalFile::getCompletedDblkCount() const {
+ return completedDblkCount_.get();
}
-uint32_t
-JournalFile::addCompletedDblkCount(const uint32_t a) {
- return completedDblkCount.addLimit(a, submittedDblkCount.get(), jerrno::JERR_JNLF_CMPLOFFSOVFL);
+uint32_t JournalFile::addCompletedDblkCount(const uint32_t a) {
+ return completedDblkCount_.addLimit(a, submittedDblkCount_.get(), jerrno::JERR_JNLF_CMPLOFFSOVFL);
}
uint16_t JournalFile::getOutstandingAioOperationCount() const {
- return outstandingAioOpsCount.get();
+ return outstandingAioOpsCount_.get();
}
uint16_t JournalFile::incrOutstandingAioOperationCount() {
- return outstandingAioOpsCount.increment();
+ return outstandingAioOpsCount_.increment();
}
uint16_t JournalFile::decrOutstandingAioOperationCount() {
- uint16_t r = outstandingAioOpsCount.decrementLimit();
- if (fileCloseFlag && outstandingAioOpsCount == 0) { // Delayed close
+ uint16_t r = outstandingAioOpsCount_.decrementLimit();
+ if (fileCloseFlag_ && outstandingAioOpsCount_ == 0) { // Delayed close
close();
}
return r;
}
-bool
-JournalFile::isEmpty() const {
- return submittedDblkCount == 0;
+// --- Status helper functions ---
+
+bool JournalFile::isEmpty() const {
+ return submittedDblkCount_ == 0;
}
-bool
-JournalFile::isDataEmpty() const {
- return submittedDblkCount <= QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_DBLKS;
+bool JournalFile::isDataEmpty() const {
+ return submittedDblkCount_ <= QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_DBLKS;
}
-u_int32_t
-JournalFile::dblksRemaining() const {
- return fileSizeDblks - submittedDblkCount;
+u_int32_t JournalFile::dblksRemaining() const {
+ return fileSize_dblks_ - submittedDblkCount_;
}
-bool
-JournalFile::isFull() const {
- return submittedDblkCount == fileSizeDblks;
+bool JournalFile::isFull() const {
+ return submittedDblkCount_ == fileSize_dblks_;
}
-bool
-JournalFile::isFullAndComplete() const {
- return completedDblkCount == fileSizeDblks;
+bool JournalFile::isFullAndComplete() const {
+ return completedDblkCount_ == fileSize_dblks_;
}
-u_int32_t
-JournalFile::getOutstandingAioDblks() const {
- return submittedDblkCount - completedDblkCount;
+u_int32_t JournalFile::getOutstandingAioDblks() const {
+ return submittedDblkCount_ - completedDblkCount_;
}
-bool
-JournalFile::getNextFile() const {
+bool JournalFile::getNextFile() const {
return isFull();
}
-bool
-JournalFile::isNoEnqueuedRecordsRemaining() const {
+bool JournalFile::isNoEnqueuedRecordsRemaining() const {
return !isDataEmpty() && // Must be written to, not empty
- enqueuedRecordCount == 0; // No remaining enqueued records
+ enqueuedRecordCount_ == 0; // No remaining enqueued records
}
-const std::string
-JournalFile::status_str(const uint8_t indentDepth_) const {
- std::string indent((size_t)indentDepth_, '.');
+// debug aid
+const std::string JournalFile::status_str(const uint8_t indentDepth) const {
+ std::string indent((size_t)indentDepth, '.');
std::ostringstream oss;
oss << indent << "JournalFile: fileName=" << getFileName() << std::endl;
oss << indent << " directory=" << getDirectory() << std::endl;
- oss << indent << " fileSizeDblks=" << fileSizeDblks << std::endl;
+ oss << indent << " fileSizeDblks=" << fileSize_dblks_ << std::endl;
oss << indent << " open=" << (isOpen() ? "T" : "F") << std::endl;
- oss << indent << " fileHandle=" << fileHandle << std::endl;
+ oss << indent << " fileHandle=" << fileHandle_ << std::endl;
oss << indent << " enqueuedRecordCount=" << getEnqueuedRecordCount() << std::endl;
oss << indent << " submittedDblkCount=" << getSubmittedDblkCount() << std::endl;
oss << indent << " completedDblkCount=" << getCompletedDblkCount() << std::endl;
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h Mon Oct 21 21:26:10 2013
@@ -36,26 +36,27 @@ namespace qls_jrnl {
class JournalFile
{
protected:
- const std::string fqFileName;
- const uint64_t fileSeqNum;
- int fileHandle;
- bool fileCloseFlag;
- void* fileHeaderBasePtr;
- ::file_hdr_t* fileHeaderPtr;
- aio_cb* aioControlBlockPtr;
- uint32_t fileSizeDblks; ///< File size in data blocks, including file header
- AtomicCounter<uint32_t> enqueuedRecordCount; ///< Count of enqueued records
- AtomicCounter<uint32_t> submittedDblkCount; ///< Write file count (data blocks) for submitted AIO
- AtomicCounter<uint32_t> completedDblkCount; ///< Write file count (data blocks) for completed AIO
- AtomicCounter<uint16_t> outstandingAioOpsCount; ///< Outstanding AIO operations on this file
+ const std::string fqFileName_;
+ const uint64_t fileSeqNum_;
+ int fileHandle_;
+ bool fileCloseFlag_;
+ void* fileHeaderBasePtr_;
+ ::file_hdr_t* fileHeaderPtr_;
+ aio_cb* aioControlBlockPtr_;
+ uint32_t fileSize_dblks_; ///< File size in data blocks, including file header
+
+ AtomicCounter<uint32_t> enqueuedRecordCount_; ///< Count of enqueued records
+ AtomicCounter<uint32_t> submittedDblkCount_; ///< Write file count (data blocks) for submitted AIO
+ AtomicCounter<uint32_t> completedDblkCount_; ///< Write file count (data blocks) for completed AIO
+ AtomicCounter<uint16_t> outstandingAioOpsCount_; ///< Outstanding AIO operations on this file
public:
JournalFile(const std::string& fqFileName,
const uint64_t fileSeqNum,
- const uint32_t fileSize_kib);
+ const efpDataSize_kib_t efpDataSize_kib);
virtual ~JournalFile();
- void initialize();
+ void initialize(const uint32_t completedDblkCount);
void finalize();
const std::string getDirectory() const;
@@ -63,8 +64,8 @@ public:
const std::string getFqFileName() const;
uint64_t getFileSeqNum() const;
- int open();
bool isOpen() const;
+ int open();
void close();
void asyncFileHeaderWrite(io_context_t ioContextPtr,
const efpPartitionNumber_t efpPartitionNumber,
@@ -104,7 +105,7 @@ public:
bool getNextFile() const; ///< True when next file is needed
bool isNoEnqueuedRecordsRemaining() const; ///< True when all enqueued records (or parts) have been dequeued
- // Debug aid
+ // debug aid
const std::string status_str(const uint8_t indentDepth) const;
};
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.cpp?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.cpp Mon Oct 21 21:26:10 2013
@@ -25,28 +25,27 @@
namespace qpid {
namespace qls_jrnl {
-JournalLog::JournalLog() {}
+JournalLog::JournalLog(log_level_t logLevelThreshold) : logLevelThreshold_(logLevelThreshold) {}
JournalLog::~JournalLog() {}
-void
-JournalLog::log(log_level_t ll, const std::string& jid, const std::string& log_stmt) const {
- log(ll, jid.c_str(), log_stmt.c_str());
+void JournalLog::log(const log_level_t logLevel,
+ const std::string& logStatement) const {
+ if (logLevel >= logLevelThreshold_) {
+ std::cerr << log_level_str(logLevel) << ": " << logStatement << std::endl;
+ }
}
-void
-JournalLog::log(log_level_t ll, const char* jid, const char* const log_stmt) const {
- if (ll > LOG_ERROR) {
- std::cerr << log_level_str(ll) << ": Journal \"" << jid << "\": " << log_stmt << std::endl;
- } else if (ll >= LOG_INFO) {
- std::cout << log_level_str(ll) << ": Journal \"" << jid << "\": " << log_stmt << std::endl;
+void JournalLog::log(log_level_t logLevel,
+ const std::string& journalId,
+ const std::string& logStatement) const {
+ if (logLevel >= logLevelThreshold_) {
+ std::cerr << log_level_str(logLevel) << ": Journal \"" << journalId << "\": " << logStatement << std::endl;
}
-
}
-const char*
-JournalLog::log_level_str(log_level_t ll) {
- switch (ll)
+const char* JournalLog::log_level_str(log_level_t logLevel) {
+ switch (logLevel)
{
case LOG_TRACE: return "TRACE";
case LOG_DEBUG: return "DEBUG";
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.h?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.h Mon Oct 21 21:26:10 2013
@@ -30,8 +30,7 @@ namespace qls_jrnl {
class JournalLog
{
public:
- typedef enum _log_level
- {
+ typedef enum _log_level {
LOG_TRACE = 0,
LOG_DEBUG,
LOG_INFO,
@@ -42,13 +41,17 @@ public:
} log_level_t;
protected:
- JournalLog();
- virtual ~JournalLog();
+ const log_level_t logLevelThreshold_;
public:
- virtual void log(log_level_t level, const std::string& jid, const std::string& log_stmt) const;
- virtual void log(log_level_t level, const char* jid, const char* const log_stmt) const;
- static const char* log_level_str(log_level_t ll);
+ JournalLog(log_level_t logLevelThreshold);
+ virtual ~JournalLog();
+ virtual void log(const log_level_t logLevel,
+ const std::string& logStatement) const;
+ virtual void log(const log_level_t logLevel,
+ const std::string& journalId,
+ const std::string& logStatement) const;
+ static const char* log_level_str(const log_level_t logLevel);
};
}} // namespace qpid::qls_jrnl
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp Mon Oct 21 21:26:10 2013
@@ -29,284 +29,265 @@
#include "qpid/linearstore/jrnl/slock.h"
#include "qpid/linearstore/jrnl/utils/file_hdr.h"
-#include <iostream> // DEBUG
-
namespace qpid {
namespace qls_jrnl {
-LinearFileController::LinearFileController(jcntl& jcntlRef_) :
- jcntlRef(jcntlRef_),
- emptyFilePoolPtr(0),
- currentJournalFilePtr(0),
- fileSeqCounter(0),
- recordIdCounter(0)
+LinearFileController::LinearFileController(jcntl& jcntlRef) :
+ jcntlRef_(jcntlRef),
+ emptyFilePoolPtr_(0),
+ currentJournalFilePtr_(0),
+ fileSeqCounter_(0),
+ recordIdCounter_(0)
{}
LinearFileController::~LinearFileController() {}
-void
-LinearFileController::initialize(const std::string& journalDirectory_,
- EmptyFilePool* emptyFilePoolPtr_) {
- journalDirectory.assign(journalDirectory_);
- emptyFilePoolPtr = emptyFilePoolPtr_;
+void LinearFileController::initialize(const std::string& journalDirectory,
+ EmptyFilePool* emptyFilePoolPtr,
+ uint64_t initialFileNumberVal) {
+ journalDirectory_.assign(journalDirectory);
+ emptyFilePoolPtr_ = emptyFilePoolPtr;
+ fileSeqCounter_ = initialFileNumberVal;
}
-void
-LinearFileController::finalize() {
- while (!journalFileList.empty()) {
- delete journalFileList.front();
- journalFileList.pop_front();
+void LinearFileController::finalize() {
+ while (!journalFileList_.empty()) {
+ delete journalFileList_.front();
+ journalFileList_.pop_front();
}
}
-void
-LinearFileController::pullEmptyFileFromEfp() {
- if (currentJournalFilePtr)
- currentJournalFilePtr->close();
- std::string ef = emptyFilePoolPtr->takeEmptyFile(journalDirectory); // Moves file from EFP only, returns new file name
- std::cout << "*** LinearFileController::pullEmptyFileFromEfp() qn=" << jcntlRef.id() << " ef=" << ef << std::endl; // DEBUG
- currentJournalFilePtr = new JournalFile(ef, getNextFileSeqNum(), emptyFilePoolPtr->dataSize_kib());
- currentJournalFilePtr->initialize();
+void LinearFileController::addJournalFile(const std::string& fileName,
+ const uint64_t fileNumber,
+ const uint32_t fileSize_kib,
+ const uint32_t completedDblkCount) {
+ if (currentJournalFilePtr_)
+ currentJournalFilePtr_->close();
+ currentJournalFilePtr_ = new JournalFile(fileName, fileNumber, fileSize_kib);
+ currentJournalFilePtr_->initialize(completedDblkCount);
{
- slock l(journalFileListMutex);
- journalFileList.push_back(currentJournalFilePtr);
+ slock l(journalFileListMutex_);
+ journalFileList_.push_back(currentJournalFilePtr_);
}
- currentJournalFilePtr->open();
+ currentJournalFilePtr_->open();
}
-void
-LinearFileController::purgeFilesToEfp() {
- slock l(journalFileListMutex);
- while (journalFileList.front()->isNoEnqueuedRecordsRemaining()) {
- emptyFilePoolPtr->returnEmptyFile(journalFileList.front());
- delete journalFileList.front();
- journalFileList.pop_front();
- }
+efpDataSize_kib_t LinearFileController::dataSize_kib() const {
+ return emptyFilePoolPtr_->dataSize_kib();
+}
+
+efpDataSize_sblks_t LinearFileController::dataSize_sblks() const {
+ return emptyFilePoolPtr_->dataSize_sblks();
+}
+
+efpFileSize_kib_t LinearFileController::fileSize_kib() const {
+ return emptyFilePoolPtr_->fileSize_kib();
}
-efpDataSize_kib_t
-LinearFileController::dataSize_kib() const {
- return emptyFilePoolPtr->dataSize_kib();
+efpFileSize_sblks_t LinearFileController::fileSize_sblks() const {
+ return emptyFilePoolPtr_->fileSize_sblks();
}
-efpFileSize_kib_t
-LinearFileController::fileSize_kib() const {
- return emptyFilePoolPtr->fileSize_kib();
+uint64_t LinearFileController::getNextRecordId() {
+ return recordIdCounter_.increment();
}
-efpDataSize_sblks_t
-LinearFileController::dataSize_sblks() const {
- return emptyFilePoolPtr->dataSize_sblks();
+void LinearFileController::pullEmptyFileFromEfp() {
+ if (currentJournalFilePtr_)
+ currentJournalFilePtr_->close();
+ std::string ef = emptyFilePoolPtr_->takeEmptyFile(journalDirectory_); // Moves file from EFP only, returns new file name
+//std::cout << "*** LinearFileController::pullEmptyFileFromEfp() qn=" << jcntlRef.id() << " ef=" << ef << std::endl; // DEBUG
+ addJournalFile(ef, getNextFileSeqNum(), emptyFilePoolPtr_->dataSize_kib(), 0);
}
-efpFileSize_sblks_t
-LinearFileController::fileSize_sblks() const {
- return emptyFilePoolPtr->fileSize_sblks();
+void LinearFileController::purgeFilesToEfp() {
+ slock l(journalFileListMutex_);
+ while (journalFileList_.front()->isNoEnqueuedRecordsRemaining()) {
+ emptyFilePoolPtr_->returnEmptyFile(journalFileList_.front()->getFqFileName());
+ delete journalFileList_.front();
+ journalFileList_.pop_front();
+ }
}
-uint64_t
-LinearFileController::getNextRecordId() {
- return recordIdCounter.increment();
+uint32_t LinearFileController::getEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) {
+ slock l(journalFileListMutex_);
+ return find(fileSeqNumber)->getEnqueuedRecordCount();
+}
+
+uint32_t LinearFileController::incrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) {
+ assertCurrentJournalFileValid("incrEnqueuedRecordCount");
+ return find(fileSeqNumber)->incrEnqueuedRecordCount();
}
-uint32_t
-LinearFileController::decrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) {
- slock l(journalFileListMutex);
+uint32_t LinearFileController::decrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) {
+ slock l(journalFileListMutex_);
return find(fileSeqNumber)->decrEnqueuedRecordCount();
}
-uint32_t
-LinearFileController::addWriteCompletedDblkCount(const efpFileCount_t fileSeqNumber, const uint32_t a) {
- slock l(journalFileListMutex);
+uint32_t LinearFileController::addWriteCompletedDblkCount(const efpFileCount_t fileSeqNumber, const uint32_t a) {
+ slock l(journalFileListMutex_);
return find(fileSeqNumber)->addCompletedDblkCount(a);
}
-uint16_t
-LinearFileController::decrOutstandingAioOperationCount(const efpFileCount_t fileSeqNumber) {
- slock l(journalFileListMutex);
+uint16_t LinearFileController::decrOutstandingAioOperationCount(const efpFileCount_t fileSeqNumber) {
+ slock l(journalFileListMutex_);
return find(fileSeqNumber)->decrOutstandingAioOperationCount();
}
-void
-LinearFileController::asyncFileHeaderWrite(io_context_t ioContextPtr,
- const uint16_t userFlags,
- const uint64_t recordId,
- const uint64_t firstRecordOffset) {
- currentJournalFilePtr->asyncFileHeaderWrite(ioContextPtr,
- emptyFilePoolPtr->getPartitionNumber(),
- emptyFilePoolPtr->dataSize_kib(),
- userFlags,
- recordId,
- firstRecordOffset,
- jcntlRef.id());
-}
-
-void
-LinearFileController::asyncPageWrite(io_context_t ioContextPtr,
- aio_cb* aioControlBlockPtr,
- void* data,
- uint32_t dataSize_dblks) {
+void LinearFileController::asyncFileHeaderWrite(io_context_t ioContextPtr,
+ const uint16_t userFlags,
+ const uint64_t recordId,
+ const uint64_t firstRecordOffset) {
+ currentJournalFilePtr_->asyncFileHeaderWrite(ioContextPtr,
+ emptyFilePoolPtr_->getPartitionNumber(),
+ emptyFilePoolPtr_->dataSize_kib(),
+ userFlags,
+ recordId,
+ firstRecordOffset,
+ jcntlRef_.id());
+}
+
+void LinearFileController::asyncPageWrite(io_context_t ioContextPtr,
+ aio_cb* aioControlBlockPtr,
+ void* data,
+ uint32_t dataSize_dblks) {
assertCurrentJournalFileValid("asyncPageWrite");
- currentJournalFilePtr->asyncPageWrite(ioContextPtr, aioControlBlockPtr, data, dataSize_dblks);
+ currentJournalFilePtr_->asyncPageWrite(ioContextPtr, aioControlBlockPtr, data, dataSize_dblks);
}
-uint64_t
-LinearFileController::getCurrentFileSeqNum() const {
+uint64_t LinearFileController::getCurrentFileSeqNum() const {
assertCurrentJournalFileValid("getCurrentFileSeqNum");
- return currentJournalFilePtr->getFileSeqNum();
+ return currentJournalFilePtr_->getFileSeqNum();
}
-uint32_t
-LinearFileController::getEnqueuedRecordCount() const {
+uint32_t LinearFileController::getEnqueuedRecordCount() const {
assertCurrentJournalFileValid("getEnqueuedRecordCount");
- return currentJournalFilePtr->getEnqueuedRecordCount();
+ return currentJournalFilePtr_->getEnqueuedRecordCount();
}
-uint32_t
-LinearFileController::incrEnqueuedRecordCount() {
+uint32_t LinearFileController::incrEnqueuedRecordCount() {
assertCurrentJournalFileValid("incrEnqueuedRecordCount");
- return currentJournalFilePtr->incrEnqueuedRecordCount();
+ return currentJournalFilePtr_->incrEnqueuedRecordCount();
}
-uint32_t
-LinearFileController::addEnqueuedRecordCount(const uint32_t a) {
+uint32_t LinearFileController::addEnqueuedRecordCount(const uint32_t a) {
assertCurrentJournalFileValid("addEnqueuedRecordCount");
- return currentJournalFilePtr->addEnqueuedRecordCount(a);
+ return currentJournalFilePtr_->addEnqueuedRecordCount(a);
}
-uint32_t
-LinearFileController::decrEnqueuedRecordCount() {
+uint32_t LinearFileController::decrEnqueuedRecordCount() {
assertCurrentJournalFileValid("decrEnqueuedRecordCount");
- return currentJournalFilePtr->decrEnqueuedRecordCount();
+ return currentJournalFilePtr_->decrEnqueuedRecordCount();
}
-uint32_t
-LinearFileController::subtrEnqueuedRecordCount(const uint32_t s) {
+uint32_t LinearFileController::subtrEnqueuedRecordCount(const uint32_t s) {
assertCurrentJournalFileValid("subtrEnqueuedRecordCount");
- return currentJournalFilePtr->subtrEnqueuedRecordCount(s);
+ return currentJournalFilePtr_->subtrEnqueuedRecordCount(s);
}
-uint32_t
-LinearFileController::getWriteSubmittedDblkCount() const {
+uint32_t LinearFileController::getWriteSubmittedDblkCount() const {
assertCurrentJournalFileValid("getWriteSubmittedDblkCount");
- return currentJournalFilePtr->getSubmittedDblkCount();
+ return currentJournalFilePtr_->getSubmittedDblkCount();
}
-uint32_t
-LinearFileController::addWriteSubmittedDblkCount(const uint32_t a) {
+uint32_t LinearFileController::addWriteSubmittedDblkCount(const uint32_t a) {
assertCurrentJournalFileValid("addWriteSubmittedDblkCount");
- return currentJournalFilePtr->addSubmittedDblkCount(a);
+ return currentJournalFilePtr_->addSubmittedDblkCount(a);
}
-uint32_t
-LinearFileController::getWriteCompletedDblkCount() const {
+uint32_t LinearFileController::getWriteCompletedDblkCount() const {
assertCurrentJournalFileValid("getWriteCompletedDblkCount");
- return currentJournalFilePtr->getCompletedDblkCount();
+ return currentJournalFilePtr_->getCompletedDblkCount();
}
-uint32_t
-LinearFileController::addWriteCompletedDblkCount(const uint32_t a) {
+uint32_t LinearFileController::addWriteCompletedDblkCount(const uint32_t a) {
assertCurrentJournalFileValid("addWriteCompletedDblkCount");
- return currentJournalFilePtr->addCompletedDblkCount(a);
+ return currentJournalFilePtr_->addCompletedDblkCount(a);
}
-uint16_t
-LinearFileController::getOutstandingAioOperationCount() const {
+uint16_t LinearFileController::getOutstandingAioOperationCount() const {
assertCurrentJournalFileValid("getOutstandingAioOperationCount");
- return currentJournalFilePtr->getOutstandingAioOperationCount();
+ return currentJournalFilePtr_->getOutstandingAioOperationCount();
}
-uint16_t
-LinearFileController::incrOutstandingAioOperationCount() {
+uint16_t LinearFileController::incrOutstandingAioOperationCount() {
assertCurrentJournalFileValid("incrOutstandingAioOperationCount");
- return currentJournalFilePtr->incrOutstandingAioOperationCount();
+ return currentJournalFilePtr_->incrOutstandingAioOperationCount();
}
-uint16_t
-LinearFileController::decrOutstandingAioOperationCount() {
+uint16_t LinearFileController::decrOutstandingAioOperationCount() {
assertCurrentJournalFileValid("decrOutstandingAioOperationCount");
- return currentJournalFilePtr->decrOutstandingAioOperationCount();
+ return currentJournalFilePtr_->decrOutstandingAioOperationCount();
}
-bool
-LinearFileController::isEmpty() const {
+bool LinearFileController::isEmpty() const {
assertCurrentJournalFileValid("isEmpty");
- return currentJournalFilePtr->isEmpty();
+ return currentJournalFilePtr_->isEmpty();
}
-bool
-LinearFileController::isDataEmpty() const {
+bool LinearFileController::isDataEmpty() const {
assertCurrentJournalFileValid("isDataEmpty");
- return currentJournalFilePtr->isDataEmpty();
+ return currentJournalFilePtr_->isDataEmpty();
}
-u_int32_t
-LinearFileController::dblksRemaining() const {
+u_int32_t LinearFileController::dblksRemaining() const {
assertCurrentJournalFileValid("dblksRemaining");
- return currentJournalFilePtr->dblksRemaining();
+ return currentJournalFilePtr_->dblksRemaining();
}
-bool
-LinearFileController::isFull() const {
+bool LinearFileController::isFull() const {
assertCurrentJournalFileValid("isFull");
- return currentJournalFilePtr->isFull();
+ return currentJournalFilePtr_->isFull();
}
-bool
-LinearFileController::isFullAndComplete() const {
+bool LinearFileController::isFullAndComplete() const {
assertCurrentJournalFileValid("isFullAndComplete");
- return currentJournalFilePtr->isFullAndComplete();
+ return currentJournalFilePtr_->isFullAndComplete();
}
-u_int32_t
-LinearFileController::getOutstandingAioDblks() const {
+u_int32_t LinearFileController::getOutstandingAioDblks() const {
assertCurrentJournalFileValid("getOutstandingAioDblks");
- return currentJournalFilePtr->getOutstandingAioDblks();
+ return currentJournalFilePtr_->getOutstandingAioDblks();
}
-bool
-LinearFileController::getNextFile() const {
+bool LinearFileController::needNextFile() const {
assertCurrentJournalFileValid("getNextFile");
- return currentJournalFilePtr->getNextFile();
+ return currentJournalFilePtr_->getNextFile();
}
-const std::string
-LinearFileController::status(const uint8_t indentDepth) const {
+const std::string LinearFileController::status(const uint8_t indentDepth) const {
std::string indent((size_t)indentDepth, '.');
std::ostringstream oss;
- oss << indent << "LinearFileController: queue=" << jcntlRef.id() << std::endl;
- oss << indent << " journalDirectory=" << journalDirectory << std::endl;
- oss << indent << " fileSeqCounter=" << fileSeqCounter.get() << std::endl;
- oss << indent << " recordIdCounter=" << recordIdCounter.get() << std::endl;
- oss << indent << " journalFileList.size=" << journalFileList.size() << std::endl;
+ oss << indent << "LinearFileController: queue=" << jcntlRef_.id() << std::endl;
+ oss << indent << " journalDirectory=" << journalDirectory_ << std::endl;
+ oss << indent << " fileSeqCounter=" << fileSeqCounter_.get() << std::endl;
+ oss << indent << " recordIdCounter=" << recordIdCounter_.get() << std::endl;
+ oss << indent << " journalFileList.size=" << journalFileList_.size() << std::endl;
if (checkCurrentJournalFileValid()) {
- oss << currentJournalFilePtr->status_str(indentDepth+2);
+ oss << currentJournalFilePtr_->status_str(indentDepth+2);
} else {
oss << indent << " <No current journal file>" << std::endl;
}
return oss.str();
}
-// protected
+// --- protected functions ---
-bool
-LinearFileController::checkCurrentJournalFileValid() const {
- return currentJournalFilePtr != 0;
+bool LinearFileController::checkCurrentJournalFileValid() const {
+ return currentJournalFilePtr_ != 0;
}
-void
-LinearFileController::assertCurrentJournalFileValid(const char* const functionName) const {
+void LinearFileController::assertCurrentJournalFileValid(const char* const functionName) const {
if (!checkCurrentJournalFileValid()) {
throw jexception(jerrno::JERR__NULL, "LinearFileController", functionName);
}
}
// NOTE: NOT THREAD SAFE - journalFileList is accessed by multiple threads - use under external lock
-JournalFile*
-LinearFileController::find(const efpFileCount_t fileSeqNumber) {
- if (currentJournalFilePtr != 0 && currentJournalFilePtr->getFileSeqNum() == fileSeqNumber)
- return currentJournalFilePtr;
- for (JournalFileListItr_t i=journalFileList.begin(); i!=journalFileList.end(); ++i) {
+JournalFile* LinearFileController::find(const efpFileCount_t fileSeqNumber) {
+ if (currentJournalFilePtr_ != 0 && currentJournalFilePtr_->getFileSeqNum() == fileSeqNumber)
+ return currentJournalFilePtr_;
+ for (JournalFileListItr_t i=journalFileList_.begin(); i!=journalFileList_.end(); ++i) {
if ((*i)->getFileSeqNum() == fileSeqNumber) {
return *i;
}
@@ -316,9 +297,8 @@ LinearFileController::find(const efpFile
throw jexception(jerrno::JERR_LFCR_SEQNUMNOTFOUND, oss.str(), "LinearFileController", "find");
}
-uint64_t
-LinearFileController::getNextFileSeqNum() {
- return fileSeqCounter.increment();
+uint64_t LinearFileController::getNextFileSeqNum() {
+ return fileSeqCounter_.increment();
}
}} // namespace qpid::qls_jrnl
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h Mon Oct 21 21:26:10 2013
@@ -23,14 +23,16 @@
#define QPID_LINEARSTORE_LINEARFILECONTROLLER_H_
#include <deque>
-#include "qpid/linearstore/jrnl/aio.h"
#include "qpid/linearstore/jrnl/AtomicCounter.h"
#include "qpid/linearstore/jrnl/EmptyFilePoolTypes.h"
-#include "qpid/linearstore/jrnl/smutex.h"
-struct file_hdr_t;
+// libaio forward declares
+typedef struct io_context* io_context_t;
+typedef struct iocb aio_cb;
+
namespace qpid {
namespace qls_jrnl {
+
class EmptyFilePool;
class jcntl;
class JournalFile;
@@ -41,35 +43,44 @@ protected:
typedef std::deque<JournalFile*> JournalFileList_t;
typedef JournalFileList_t::iterator JournalFileListItr_t;
- jcntl& jcntlRef;
- std::string journalDirectory;
- EmptyFilePool* emptyFilePoolPtr;
- JournalFile* currentJournalFilePtr;
- AtomicCounter<uint64_t> fileSeqCounter;
- AtomicCounter<uint64_t> recordIdCounter;
+ jcntl& jcntlRef_;
+ std::string journalDirectory_;
+ EmptyFilePool* emptyFilePoolPtr_;
+ JournalFile* currentJournalFilePtr_;
+ AtomicCounter<uint64_t> fileSeqCounter_;
+ AtomicCounter<uint64_t> recordIdCounter_;
- JournalFileList_t journalFileList;
- smutex journalFileListMutex;
+ JournalFileList_t journalFileList_;
+ smutex journalFileListMutex_;
public:
- LinearFileController(jcntl& jcntlRef_);
+ LinearFileController(jcntl& jcntlRef);
virtual ~LinearFileController();
- void initialize(const std::string& journalDirectory_, EmptyFilePool* emptyFilePoolPtr_);
+ void initialize(const std::string& journalDirectory,
+ EmptyFilePool* emptyFilePoolPtr,
+ uint64_t initialFileNumberVal);
void finalize();
- void pullEmptyFileFromEfp();
- void purgeFilesToEfp();
+ void addJournalFile(const std::string& fileName,
+ const uint64_t fileNumber,
+ const uint32_t fileSize_kib,
+ const uint32_t completedDblkCount);
+
efpDataSize_kib_t dataSize_kib() const;
- efpFileSize_kib_t fileSize_kib() const;
efpDataSize_sblks_t dataSize_sblks() const;
+ efpFileSize_kib_t fileSize_kib() const;
efpFileSize_sblks_t fileSize_sblks() const;
-
uint64_t getNextRecordId();
+ void pullEmptyFileFromEfp();
+ void purgeFilesToEfp();
- // Functions for manipulating counts of non-current JournalFile instances in journalFileList
+ // Functions for manipulating counts of non-current JournalFile instances in journalFileList_
+ uint32_t getEnqueuedRecordCount(const efpFileCount_t fileSeqNumber);
+ uint32_t incrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber);
uint32_t decrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber);
- uint32_t addWriteCompletedDblkCount(const efpFileCount_t fileSeqNumber, const uint32_t a);
+ uint32_t addWriteCompletedDblkCount(const efpFileCount_t fileSeqNumber,
+ const uint32_t a);
uint16_t decrOutstandingAioOperationCount(const efpFileCount_t fileSeqNumber);
// Pass-through functions for JournalFile class
@@ -106,18 +117,20 @@ public:
bool isFull() const; // True if all possible dblks have been submitted (but may not yet have returned from AIO)
bool isFullAndComplete() const; // True if all submitted dblks have returned from AIO
u_int32_t getOutstandingAioDblks() const; // Dblks still to be written
- bool getNextFile() const; // True when next file is needed
+ bool needNextFile() const; // True when next file is needed
// Debug aid
const std::string status(const uint8_t indentDepth) const;
protected:
- bool checkCurrentJournalFileValid() const;
void assertCurrentJournalFileValid(const char* const functionName) const;
- JournalFile* find(const efpFileCount_t fileSeqNumber); // NOT THREAD SAFE - use under external lock
+ bool checkCurrentJournalFileValid() const;
+ JournalFile* find(const efpFileCount_t fileSeqNumber);
uint64_t getNextFileSeqNum();
};
+typedef void (LinearFileController::*lfcAddJournalFileFn)(const std::string&, const uint64_t, const uint32_t, const uint32_t);
+
}} // namespace qpid::qls_jrnl
#endif // QPID_LINEARSTORE_LINEARFILECONTROLLER_H_
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp Mon Oct 21 21:26:10 2013
@@ -21,79 +21,625 @@
#include "qpid/linearstore/jrnl/RecoveryManager.h"
+#include <algorithm>
+#include <cstdlib>
#include <iomanip>
+#include "qpid/linearstore/jrnl/data_tok.h"
+#include "qpid/linearstore/jrnl/deq_rec.h"
+#include "qpid/linearstore/jrnl/EmptyFilePoolManager.h"
+#include "qpid/linearstore/jrnl/enq_map.h"
+#include "qpid/linearstore/jrnl/enq_rec.h"
#include "qpid/linearstore/jrnl/jcfg.h"
+#include "qpid/linearstore/jrnl/jdir.h"
+#include "qpid/linearstore/jrnl/JournalLog.h"
+#include "qpid/linearstore/jrnl/jrec.h"
+#include "qpid/linearstore/jrnl/LinearFileController.h"
+#include "qpid/linearstore/jrnl/txn_map.h"
+#include "qpid/linearstore/jrnl/txn_rec.h"
+#include "qpid/linearstore/jrnl/utils/enq_hdr.h"
+#include "qpid/linearstore/jrnl/utils/file_hdr.h"
#include <sstream>
+#include <string>
+#include <vector>
namespace qpid {
namespace qls_jrnl
{
-RecoveryManager::RecoveryManager() : _journalFileList(),
- _fileNumberNameMap(),
- _enqueueCountList(),
- _journalEmptyFlag(false),
- _firstRecordOffset(0),
- _endOffset(0),
- _highestRecordId(0ULL),
- _lastFileFullFlag(false),
- _currentRid(0ULL),
- _currentFileNumber(0ULL),
- _currentFileName(),
- _fileSize(0),
- _recordStart(0),
- _inFileStream(),
- _readComplete(false)
+RecoveryManager::RecoveryManager(const std::string& journalDirectory,
+ const std::string& queuename,
+ enq_map& enqueueMapRef,
+ txn_map& transactionMapRef,
+ JournalLog& journalLogRef) :
+ journalDirectory_(journalDirectory),
+ queueName_(queuename),
+ enqueueMapRef_(enqueueMapRef),
+ transactionMapRef_(transactionMapRef),
+ journalLogRef_(journalLogRef),
+ journalEmptyFlag_(false),
+ firstRecordOffset_(0),
+ endOffset_(0),
+ highestRecordId_(0ULL),
+ highestFileNumber_(0ULL),
+ lastFileFullFlag_(false),
+ fileSize_kib_(0)
{}
RecoveryManager::~RecoveryManager() {}
-std::string
-RecoveryManager::toString(const std::string& jid,
- bool compact) {
+void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTransactionListPtr,
+ EmptyFilePoolManager* emptyFilePoolManager,
+ EmptyFilePool** emptyFilePoolPtrPtr) {
+ // Analyze file headers of existing journal files
+ efpIdentity_t efpIdentity;
+ analyzeJournalFileHeaders(efpIdentity);
+ *emptyFilePoolPtrPtr = emptyFilePoolManager->getEmptyFilePool(efpIdentity);
+ fileSize_kib_ = (*emptyFilePoolPtrPtr)->fileSize_kib();
+ // Check for file full condition
+ lastFileFullFlag_ = endOffset_ == (std::streamoff)(*emptyFilePoolPtrPtr)->fileSize_kib() * 1024;
+
+ // Restore all read and write pointers and transactions
+ if (!journalEmptyFlag_) {
+ while (getNextRecordHeader()) {
+
+ }
+ if (inFileStream_.is_open()) {
+ inFileStream_.close();
+ }
+ // Remove leading files which have no enqueued records
+ removeEmptyFiles(*emptyFilePoolPtrPtr);
+
+ // Remove all txns from tmap that are not in the prepared list
+ if (preparedTransactionListPtr) {
+ std::vector<std::string> xidList;
+ transactionMapRef_.xid_list(xidList);
+ for (std::vector<std::string>::iterator itr = xidList.begin(); itr != xidList.end(); itr++) {
+ std::vector<std::string>::const_iterator pitr =
+ std::find(preparedTransactionListPtr->begin(), preparedTransactionListPtr->end(), *itr);
+ if (pitr == preparedTransactionListPtr->end()) { // not found in prepared list
+ txn_data_list tdl = transactionMapRef_.get_remove_tdata_list(*itr); // tdl will be empty if xid not found
+ // Unlock any affected enqueues in emap
+ for (tdl_itr i=tdl.begin(); i<tdl.end(); i++) {
+ if (i->_enq_flag) { // enq op - decrement enqueue count
+ enqueueCountList_[i->_pfid]--;
+ } else if (enqueueMapRef_.is_enqueued(i->_drid, true)) { // deq op - unlock enq record
+ int16_t ret = enqueueMapRef_.unlock(i->_drid);
+ if (ret < enq_map::EMAP_OK) { // fail
+ // enq_map::unlock()'s only error is enq_map::EMAP_RID_NOT_FOUND
+ std::ostringstream oss;
+ oss << std::hex << "_emap.unlock(): drid=0x\"" << i->_drid;
+ throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "RecoveryManager", "analyzeJournals");
+ }
+ }
+ }
+ }
+ }
+ }
+ enqueueMapRef_.rid_list(recordIdList_);
+ recordIdListConstItr_ = recordIdList_.begin();
+ }
+}
+
+std::streamoff RecoveryManager::getEndOffset() const {
+ return endOffset_;
+}
+
+uint64_t RecoveryManager::getHighestFileNumber() const {
+ return highestFileNumber_;
+}
+
+uint64_t RecoveryManager::getHighestRecordId() const {
+ return highestRecordId_;
+}
+
+bool RecoveryManager::isLastFileFull() const {
+ return lastFileFullFlag_;
+}
+
+bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr,
+ std::size_t& dataSize,
+ void** const xidPtrPtr,
+ std::size_t& xidSize,
+ bool& transient,
+ bool& external,
+ data_tok* const dtokp,
+ bool /*ignore_pending_txns*/) {
+ if (!dtokp->is_readable()) {
+ std::ostringstream oss;
+ oss << std::hex << std::setfill('0') << "dtok_id=0x" << std::setw(8) << dtokp->id();
+ oss << "; dtok_rid=0x" << std::setw(16) << dtokp->rid() << "; dtok_wstate=" << dtokp->wstate_str();
+ throw jexception(jerrno::JERR_JCNTL_ENQSTATE, oss.str(), "RecoveryManager", "readNextRemainingRecord");
+ }
+ if (recordIdListConstItr_ == recordIdList_.end()) {
+ return false;
+ }
+ enq_map::emap_data_struct_t eds;
+ enqueueMapRef_.get_data(*recordIdListConstItr_, eds);
+ uint64_t fileNumber = eds._pfid;
+ currentJournalFileConstItr_ = fileNumberNameMap_.find(fileNumber);
+ getNextFile(false);
+
+ inFileStream_.seekg(eds._file_posn, std::ifstream::beg);
+ if (!inFileStream_.good()) {
+ std::ostringstream oss;
+ oss << "Could not find offset 0x" << std::hex << eds._file_posn << " in file " << getCurrentFileName();
+ throw jexception(jerrno::JERR__FILEIO, oss.str(), "RecoveryManager", "readNextRemainingRecord");
+ }
+ ::enq_hdr_t enqueueHeader;
+ inFileStream_.read((char*)&enqueueHeader, sizeof(::enq_hdr_t));
+ if (inFileStream_.gcount() != sizeof(::enq_hdr_t)) {
+ std::ostringstream oss;
+ oss << "Could not read enqueue header from file " << getCurrentFileName() << " at offset 0x" << std::hex << eds._file_posn;
+ throw jexception(jerrno::JERR__FILEIO, oss.str(), "RecoveryManager", "readNextRemainingRecord");
+ }
+ // check flags
+ transient = ::is_enq_transient(&enqueueHeader);
+ external = ::is_enq_external(&enqueueHeader);
+
+ // read xid
+ xidSize = enqueueHeader._xidsize;
+ *xidPtrPtr = ::malloc(xidSize);
+ if (*xidPtrPtr == 0) {
+ std::ostringstream oss;
+ oss << "xidPtr, size=0x" << std::hex << xidSize;
+ throw jexception(jerrno::JERR__MALLOC, oss.str(), "RecoveryManager", "readNextRemainingRecord");
+ }
+ readJournalData((char*)*xidPtrPtr, xidSize);
+
+ // read data
+ dataSize = enqueueHeader._dsize;
+ *dataPtrPtr = ::malloc(dataSize);
+ if (*xidPtrPtr == 0) {
+ std::ostringstream oss;
+ oss << "dataPtr, size=0x" << std::hex << dataSize;
+ throw jexception(jerrno::JERR__MALLOC, oss.str(), "RecoveryManager", "readNextRemainingRecord");
+ }
+ readJournalData((char*)*dataPtrPtr, dataSize);
+ return true;
+}
+
+void RecoveryManager::setLinearFileControllerJournals(lfcAddJournalFileFn fnPtr,
+ LinearFileController* lfcPtr) {
+//std::cout << "****** RecoveryManager::setLinearFileControllerJournals():" << std::endl; // DEBUG
+ for (fileNumberNameMapConstItr_t i = fileNumberNameMap_.begin(); i != fileNumberNameMap_.end(); ++i) {
+ uint32_t fileDblkCount = i->first == highestFileNumber_ ? // Is this this last file?
+ endOffset_ / QLS_DBLK_SIZE_BYTES : // Last file uses _endOffset
+ fileSize_kib_ * 1024 / QLS_DBLK_SIZE_BYTES; // All others use file size to make them full
+ (lfcPtr->*fnPtr)(i->second, i->first, fileSize_kib_, fileDblkCount);
+//std::cout << " ** f=" << i->second.substr(i->second.rfind('/')+1) << ",fn=" << i->first << ",s=" << _fileSize_kib << ",eo=" << fileDblkCount << "(" << (fileDblkCount * QLS_DBLK_SIZE_BYTES / 1024) << "kiB)" << std::endl; // DEBUG
+ }
+}
+
+std::string RecoveryManager::toString(const std::string& jid,
+ bool compact) {
std::ostringstream oss;
if (compact) {
oss << "Recovery journal analysis (jid=\"" << jid << "\"):";
oss << " jfl=[";
- for (std::map<uint64_t, std::string>::const_iterator i=_fileNumberNameMap.begin(); i!=_fileNumberNameMap.end(); ++i) {
- if (i!=_fileNumberNameMap.begin()) oss << " ";
+ for (fileNumberNameMapConstItr_t i=fileNumberNameMap_.begin(); i!=fileNumberNameMap_.end(); ++i) {
+ if (i!=fileNumberNameMap_.begin()) oss << " ";
oss << i->first << ":" << i->second.substr(i->second.rfind('/')+1);
}
oss << "] ecl=[ ";
- for (std::vector<uint32_t>::const_iterator j = _enqueueCountList.begin(); j!=_enqueueCountList.end(); ++j) {
- if (j != _enqueueCountList.begin()) oss << " ";
+ for (enqueueCountListConstItr_t j = enqueueCountList_.begin(); j!=enqueueCountList_.end(); ++j) {
+ if (j != enqueueCountList_.begin()) oss << " ";
oss << *j;
}
- oss << " ] empty=" << (_journalEmptyFlag ? "T" : "F");
- oss << " fro=0x" << std::hex << _firstRecordOffset << std::dec << " (" << (_firstRecordOffset/JRNL_DBLK_SIZE_BYTES) << " dblks)";
- oss << " eo=0x" << std::hex << _endOffset << std::dec << " (" << (_endOffset/JRNL_DBLK_SIZE_BYTES) << " dblks)";
- oss << " hrid=0x" << std::hex << _highestRecordId << std::dec;
- oss << " lffull=" << (_lastFileFullFlag ? "T" : "F");
+ oss << " ] empty=" << (journalEmptyFlag_ ? "T" : "F");
+ oss << " fro=0x" << std::hex << firstRecordOffset_ << std::dec << " (" << (firstRecordOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)";
+ oss << " eo=0x" << std::hex << endOffset_ << std::dec << " (" << (endOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)";
+ oss << " hrid=0x" << std::hex << highestRecordId_ << std::dec;
+ oss << " hfnum=0x" << std::hex << highestFileNumber_ << std::dec;
+ oss << " lffull=" << (lastFileFullFlag_ ? "T" : "F");
} else {
oss << "Recovery journal analysis (jid=\"" << jid << "\"):" << std::endl;
- oss << " Number of journal files = " << _fileNumberNameMap.size() << std::endl;
+ oss << " Number of journal files = " << fileNumberNameMap_.size() << std::endl;
oss << " Journal File List:" << std::endl;
- for (std::map<uint64_t, std::string>::const_iterator i=_fileNumberNameMap.begin(); i!=_fileNumberNameMap.end(); ++i) {
+ for (fileNumberNameMapConstItr_t i=fileNumberNameMap_.begin(); i!=fileNumberNameMap_.end(); ++i) {
oss << " " << i->first << ": " << i->second.substr(i->second.rfind('/')+1) << std::endl;
}
oss << " Enqueue Counts: [ " << std::endl;
- for (std::vector<uint32_t>::const_iterator j = _enqueueCountList.begin(); j!=_enqueueCountList.end(); ++j) {
- if (j != _enqueueCountList.begin()) oss << ", ";
+ for (enqueueCountListConstItr_t j = enqueueCountList_.begin(); j!=enqueueCountList_.end(); ++j) {
+ if (j != enqueueCountList_.begin()) oss << ", ";
oss << *j;
}
oss << " ]" << std::endl;
- for (unsigned i=0; i<_enqueueCountList.size(); i++)
- oss << " File " << std::setw(2) << i << ": " << _enqueueCountList[i] << std::endl;
- oss << " Journal empty (_jempty) = " << (_journalEmptyFlag ? "TRUE" : "FALSE") << std::endl;
- oss << " First record offset in first fid (_fro) = 0x" << std::hex << _firstRecordOffset <<
- std::dec << " (" << (_firstRecordOffset/JRNL_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
- oss << " End offset (_eo) = 0x" << std::hex << _endOffset << std::dec << " (" <<
- (_endOffset/JRNL_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
- oss << " Highest rid (_h_rid) = 0x" << std::hex << _highestRecordId << std::dec << std::endl;
- oss << " Last file full (_lffull) = " << (_lastFileFullFlag ? "TRUE" : "FALSE") << std::endl;
+ oss << " Journal empty = " << (journalEmptyFlag_ ? "TRUE" : "FALSE") << std::endl;
+ oss << " First record offset in first file = 0x" << std::hex << firstRecordOffset_ <<
+ std::dec << " (" << (firstRecordOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
+ oss << " End offset = 0x" << std::hex << endOffset_ << std::dec << " (" <<
+ (endOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
+ oss << " Highest rid = 0x" << std::hex << highestRecordId_ << std::dec << std::endl;
+ oss << " Highest file number = 0x" << std::hex << highestFileNumber_ << std::dec << std::endl;
+ oss << " Last file full = " << (lastFileFullFlag_ ? "TRUE" : "FALSE") << std::endl;
oss << " Enqueued records (txn & non-txn):" << std::endl;
}
return oss.str();
}
+// --- protected functions ---
+
+void RecoveryManager::analyzeJournalFileHeaders(efpIdentity_t& efpIdentity) {
+ std::string headerQueueName;
+ ::file_hdr_t fileHeader;
+ directoryList_t directoryList;
+ jdir::read_dir(journalDirectory_, directoryList, false, true, false, true);
+ for (directoryListConstItr_t i = directoryList.begin(); i != directoryList.end(); ++i) {
+ readJournalFileHeader(*i, fileHeader, headerQueueName);
+ if (headerQueueName.compare(queueName_) != 0) {
+ std::ostringstream oss;
+ oss << "Journal file " << (*i) << " belongs to queue \"" << headerQueueName << "\": ignoring";
+ journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss.str());
+ } else {
+ fileNumberNameMap_[fileHeader._file_number] = *i;
+ if (fileHeader._file_number > highestFileNumber_) {
+ highestFileNumber_ = fileHeader._file_number;
+ }
+ }
+ }
+ efpIdentity.first = fileHeader._efp_partition;
+ efpIdentity.second = fileHeader._file_size_kib;
+ enqueueCountList_.resize(fileNumberNameMap_.size(), 0);
+ currentJournalFileConstItr_ = fileNumberNameMap_.begin();
+}
+
+void RecoveryManager::checkFileStreamOk(bool checkEof) {
+ if (inFileStream_.fail() || inFileStream_.bad() || checkEof ? inFileStream_.eof() : false) {
+ throw jexception("read failure"); // TODO complete exception
+ }
+}
+
+void RecoveryManager::checkJournalAlignment(const std::streampos recordPosition) {
+ std::streampos currentPosn = recordPosition;
+ unsigned sblkOffset = currentPosn % QLS_SBLK_SIZE_BYTES;
+ if (sblkOffset)
+ {
+ std::ostringstream oss1;
+ oss1 << std::hex << "Bad record alignment found at fid=0x" << getCurrentFileNumber();
+ oss1 << " offs=0x" << currentPosn << " (likely journal overwrite boundary); " << std::dec;
+ oss1 << (QLS_SBLK_SIZE_DBLKS - (sblkOffset/QLS_DBLK_SIZE_BYTES)) << " filler record(s) required.";
+ journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss1.str());
+
+ std::ofstream outFileStream(getCurrentFileName().c_str(), std::ios_base::in | std::ios_base::out | std::ios_base::binary);
+ if (!outFileStream.good()) {
+ throw jexception(jerrno::JERR__FILEIO, getCurrentFileName(), "RecoveryManager", "checkJournalAlignment");
+ }
+ outFileStream.seekp(currentPosn);
+
+ // Prepare write buffer containing a single empty record (1 dblk)
+ void* writeBuffer = std::malloc(QLS_DBLK_SIZE_BYTES);
+ if (writeBuffer == 0) {
+ throw jexception(jerrno::JERR__MALLOC, "RecoveryManager", "checkJournalAlignment");
+ }
+ const uint32_t xmagic = QLS_EMPTY_MAGIC;
+ ::memcpy(writeBuffer, (const void*)&xmagic, sizeof(xmagic));
+ ::memset((char*)writeBuffer + sizeof(xmagic), QLS_CLEAN_CHAR, QLS_DBLK_SIZE_BYTES - sizeof(xmagic));
+
+ // Write as many empty records as are needed to get to sblk boundary
+ while (currentPosn % QLS_SBLK_SIZE_BYTES) {
+ outFileStream.write((const char*)writeBuffer, QLS_DBLK_SIZE_BYTES);
+ if (outFileStream.fail()) {
+ throw jexception(jerrno::JERR_RCVM_WRITE, "RecoveryManager", "checkJournalAlignment");
+ }
+ std::ostringstream oss2;
+ oss2 << std::hex << "Recover phase write: Wrote filler record: fid=0x" << getCurrentFileNumber();
+ oss2 << " offs=0x" << currentPosn;
+ journalLogRef_.log(JournalLog::LOG_NOTICE, queueName_, oss2.str());
+ currentPosn = outFileStream.tellp();
+ }
+ outFileStream.close();
+ std::free(writeBuffer);
+ journalLogRef_.log(JournalLog::LOG_INFO, queueName_, "Bad record alignment fixed.");
+ }
+ endOffset_ = currentPosn;
+}
+
+bool RecoveryManager::decodeRecord(jrec& record,
+ std::size_t& cumulativeSizeRead,
+ ::rec_hdr_t& headerRecord,
+ std::streampos& fileOffset)
+{
+// uint16_t start_fid = getCurrentFileNumber();
+ std::streampos start_file_offs = fileOffset;
+
+ if (highestRecordId_ == 0) {
+ highestRecordId_ = headerRecord._rid;
+ } else if (headerRecord._rid - highestRecordId_ < 0x8000000000000000ULL) { // RFC 1982 comparison for unsigned 64-bit
+ highestRecordId_ = headerRecord._rid;
+ }
+
+ bool done = false;
+ while (!done) {
+ try {
+ done = record.rcv_decode(headerRecord, &inFileStream_, cumulativeSizeRead);
+ }
+ catch (const jexception& e) {
+// TODO - review this logic and tidy up how rd._lfid is assigned. See new jinf.get_end_file() fn.
+// Original
+// if (e.err_code() != jerrno::JERR_JREC_BADRECTAIL ||
+// fid != (rd._ffid ? rd._ffid - 1 : _num_jfiles - 1)) throw;
+// Tried this, but did not work
+// if (e.err_code() != jerrno::JERR_JREC_BADRECTAIL || h._magic != 0) throw;
+ checkJournalAlignment(start_file_offs);
+// rd._lfid = start_fid;
+ return false;
+ }
+ if (!done && !getNextFile(false)) {
+ checkJournalAlignment(start_file_offs);
+ return false;
+ }
+ }
+ return true;
+}
+
+std::string RecoveryManager::getCurrentFileName() const {
+ return currentJournalFileConstItr_->second;
+}
+
+uint64_t RecoveryManager::getCurrentFileNumber() const {
+ return currentJournalFileConstItr_->first;
+}
+
+bool RecoveryManager::getNextFile(bool jumpToFirstRecordOffsetFlag) {
+ if (inFileStream_.is_open()) {
+ if (inFileStream_.eof() || !inFileStream_.good())
+ {
+ inFileStream_.clear();
+ endOffset_ = inFileStream_.tellg(); // remember file offset before closing
+ if (endOffset_ == -1) { throw jexception("tellg() failure"); } // Check for error code -1 TODO: compelete exception
+ inFileStream_.close();
+ if (++currentJournalFileConstItr_ == fileNumberNameMap_.end()) {
+ return false;
+ }
+ }
+ }
+ if (!inFileStream_.is_open())
+ {
+ inFileStream_.clear(); // clear eof flag, req'd for older versions of c++
+ inFileStream_.open(getCurrentFileName().c_str(), std::ios_base::in | std::ios_base::binary);
+ if (!inFileStream_.good()) {
+ throw jexception(jerrno::JERR__FILEIO, getCurrentFileName(), "RecoveryManager", "getNextFile");
+ }
+
+ // Read file header
+//std::cout << " F" << getCurrentFileNumber() << std::flush; // DEBUG
+ file_hdr_t fhdr;
+ inFileStream_.read((char*)&fhdr, sizeof(fhdr));
+ checkFileStreamOk(true);
+ if (fhdr._rhdr._magic == QLS_FILE_MAGIC) {
+ firstRecordOffset_ = fhdr._fro;
+ std::streamoff foffs = jumpToFirstRecordOffsetFlag ? firstRecordOffset_ : QLS_SBLK_SIZE_BYTES;
+ inFileStream_.seekg(foffs);
+ } else {
+ inFileStream_.close();
+ if (currentJournalFileConstItr_ == fileNumberNameMap_.begin()) {
+ journalEmptyFlag_ = true;
+ }
+ return false;
+ }
+ }
+ return true;
+}
+
+bool RecoveryManager::getNextRecordHeader()
+{
+ std::size_t cum_size_read = 0;
+ void* xidp = 0;
+ rec_hdr_t h;
+
+ bool hdr_ok = false;
+ std::streampos file_pos;
+ while (!hdr_ok) {
+ if (!inFileStream_.is_open()) {
+ if (!getNextFile(true)) {
+ return false;
+ }
+ }
+ file_pos = inFileStream_.tellg();
+//std::cout << " 0x" << std::hex << file_pos << std::dec; // DEBUG
+ inFileStream_.read((char*)&h, sizeof(rec_hdr_t));
+ if (inFileStream_.gcount() == sizeof(rec_hdr_t)) {
+ hdr_ok = true;
+ } else {
+ if (!getNextFile(true)) {
+ return false;
+ }
+ }
+ }
+
+ switch(h._magic) {
+ case QLS_ENQ_MAGIC:
+ {
+//std::cout << ".e" << std::flush; // DEBUG
+ enq_rec er;
+ uint64_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary
+ if (!decodeRecord(er, cum_size_read, h, file_pos)) {
+ return false;
+ }
+ if (!er.is_transient()) { // Ignore transient msgs
+ enqueueCountList_[start_fid]++;
+ if (er.xid_size()) {
+ er.get_xid(&xidp);
+ if (xidp != 0) { throw jexception("Null xid with non-null xid_size"); } // TODO complete exception
+ std::string xid((char*)xidp, er.xid_size());
+ transactionMapRef_.insert_txn_data(xid, txn_data(h._rid, 0, start_fid, true));
+ if (transactionMapRef_.set_aio_compl(xid, h._rid) < txn_map::TMAP_OK) { // fail - xid or rid not found
+ std::ostringstream oss;
+ oss << std::hex << "_tmap.set_aio_compl: txn_enq xid=\"" << xid << "\" rid=0x" << h._rid;
+ throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "RecoveryManager", "getNextRecordHeader");
+ }
+ std::free(xidp);
+ } else {
+ if (enqueueMapRef_.insert_pfid(h._rid, start_fid, file_pos) < enq_map::EMAP_OK) { // fail
+ // The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID.
+ std::ostringstream oss;
+ oss << std::hex << "rid=0x" << h._rid << " _pfid=0x" << start_fid;
+ throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "RecoveryManager", "getNextRecordHeader");
+ }
+ }
+ }
+ }
+ break;
+ case QLS_DEQ_MAGIC:
+ {
+//std::cout << ".d" << std::flush; // DEBUG
+ deq_rec dr;
+ uint16_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary
+ if (!decodeRecord(dr, cum_size_read, h, file_pos)) {
+ return false;
+ }
+ if (dr.xid_size()) {
+ // If the enqueue is part of a pending txn, it will not yet be in emap
+ enqueueMapRef_.lock(dr.deq_rid()); // ignore not found error
+ dr.get_xid(&xidp);
+ if (xidp != 0) { throw jexception("Null xid with non-null xid_size"); } // TODO complete exception
+ std::string xid((char*)xidp, dr.xid_size());
+ transactionMapRef_.insert_txn_data(xid, txn_data(dr.rid(), dr.deq_rid(), start_fid, false,
+ dr.is_txn_coml_commit()));
+ if (transactionMapRef_.set_aio_compl(xid, dr.rid()) < txn_map::TMAP_OK) { // fail - xid or rid not found
+ std::ostringstream oss;
+ oss << std::hex << "_tmap.set_aio_compl: txn_deq xid=\"" << xid << "\" rid=0x" << dr.rid();
+ throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "RecoveryManager", "getNextRecordHeader");
+ }
+ std::free(xidp);
+ } else {
+ uint64_t enq_fid;
+ if (enqueueMapRef_.get_remove_pfid(dr.deq_rid(), enq_fid, true) == enq_map::EMAP_OK) { // ignore not found error
+ enqueueCountList_[enq_fid]--;
+ }
+ }
+ }
+ break;
+ case QLS_TXA_MAGIC:
+ {
+//std::cout << ".a" << std::flush; // DEBUG
+ txn_rec ar;
+ if (!decodeRecord(ar, cum_size_read, h, file_pos)) {
+ return false;
+ }
+ // Delete this txn from tmap, unlock any locked records in emap
+ ar.get_xid(&xidp);
+ if (xidp != 0) {
+ throw jexception("Null xid with non-null xid_size"); // TODO complete exception
+ }
+ std::string xid((char*)xidp, ar.xid_size());
+ txn_data_list tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found
+ for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++) {
+ if (itr->_enq_flag) {
+ enqueueCountList_[itr->_pfid]--;
+ } else {
+ enqueueMapRef_.unlock(itr->_drid); // ignore not found error
+ }
+ }
+ std::free(xidp);
+ }
+ break;
+ case QLS_TXC_MAGIC:
+ {
+//std::cout << ".t" << std::flush; // DEBUG
+ txn_rec cr;
+ if (!decodeRecord(cr, cum_size_read, h, file_pos)) {
+ return false;
+ }
+ // Delete this txn from tmap, process records into emap
+ cr.get_xid(&xidp);
+ if (xidp != 0) {
+ throw jexception("Null xid with non-null xid_size"); // TODO complete exception
+ }
+ std::string xid((char*)xidp, cr.xid_size());
+ txn_data_list tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found
+ for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++) {
+ if (itr->_enq_flag) { // txn enqueue
+ if (enqueueMapRef_.insert_pfid(itr->_rid, itr->_pfid, file_pos) < enq_map::EMAP_OK) { // fail
+ // The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID.
+ std::ostringstream oss;
+ oss << std::hex << "rid=0x" << itr->_rid << " _pfid=0x" << itr->_pfid;
+ throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "RecoveryManager", "getNextRecordHeader");
+ }
+ } else { // txn dequeue
+ uint64_t enq_fid;
+ if (enqueueMapRef_.get_remove_pfid(itr->_drid, enq_fid, true) == enq_map::EMAP_OK) // ignore not found error
+ enqueueCountList_[enq_fid]--;
+ }
+ }
+ std::free(xidp);
+ }
+ break;
+ case QLS_EMPTY_MAGIC:
+ {
+//std::cout << ".x" << std::flush; // DEBUG
+ uint32_t rec_dblks = jrec::size_dblks(sizeof(::rec_hdr_t));
+ inFileStream_.ignore(rec_dblks * QLS_DBLK_SIZE_BYTES - sizeof(::rec_hdr_t));
+ checkFileStreamOk(false);
+ if (!getNextFile(false)) {
+ return false;
+ }
+ }
+ break;
+ case 0:
+//std::cout << ".0" << std::endl << std::flush; // DEBUG
+ checkJournalAlignment(file_pos);
+ return false;
+ default:
+//std::cout << ".?" << std::endl << std::flush; // DEBUG
+ // Stop as this is the overwrite boundary.
+ checkJournalAlignment(file_pos);
+ return false;
+ }
+ return true;
+}
+
+void RecoveryManager::readJournalData(char* target,
+ const std::streamsize readSize) {
+ std::streamoff bytesRead = 0;
+ while (bytesRead < readSize) {
+ if (inFileStream_.eof()) {
+ getNextFile(false);
+ }
+ bool readFitsInFile = inFileStream_.tellg() + readSize <= fileSize_kib_ * 1024;
+ std::streamoff readSize = readFitsInFile ? readSize : (fileSize_kib_ * 1024) - inFileStream_.tellg();
+ inFileStream_.read(target + bytesRead, readSize);
+ if (inFileStream_.gcount() != readSize) {
+ throw jexception(); // TODO - proper exception
+ }
+ bytesRead += readSize;
+ }
+}
+
+// static private
+void RecoveryManager::readJournalFileHeader(const std::string& journalFileName,
+ ::file_hdr_t& fileHeaderRef,
+ std::string& queueName) {
+ const std::size_t headerBlockSize = QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024;
+ char buffer[headerBlockSize];
+ std::ifstream ifs(journalFileName.c_str(), std::ifstream::in | std::ifstream::binary);
+ if (!ifs.good()) {
+ std::ostringstream oss;
+ oss << "File=" << journalFileName;
+ throw jexception(jerrno::JERR_RCVM_OPENRD, oss.str(), "RecoveryManager", "readJournalFileHeader");
+ }
+ ifs.read(buffer, headerBlockSize);
+ if (!ifs) {
+ std::streamsize s = ifs.gcount();
+ ifs.close();
+ std::ostringstream oss;
+ oss << "File=" << journalFileName << "; attempted_read_size=" << headerBlockSize << "; actual_read_size=" << s;
+ throw jexception(jerrno::JERR_RCVM_READ, oss.str(), "RecoveryManager", "readJournalFileHeader");
+ }
+ ifs.close();
+ ::memcpy(&fileHeaderRef, buffer, sizeof(::file_hdr_t));
+ queueName.assign(buffer + sizeof(::file_hdr_t), fileHeaderRef._queue_name_len);
+
+}
+
+void RecoveryManager::removeEmptyFiles(EmptyFilePool* emptyFilePoolPtr) {
+ while (enqueueCountList_.front() == 0 && enqueueCountList_.size() > 1) {
+ fileNumberNameMapItr_t i = fileNumberNameMap_.begin();
+//std::cout << "*** File " << i->first << ": " << i->second << " is empty." << std::endl;
+ emptyFilePoolPtr->returnEmptyFile(i->second);
+ fileNumberNameMap_.erase(i);
+ enqueueCountList_.pop_front();
+ }
+}
+
}} // namespace qpid::qls_jrnl
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h Mon Oct 21 21:26:10 2013
@@ -22,42 +22,111 @@
#ifndef QPID_LINEARSTORE_RECOVERYSTATE_H_
#define QPID_LINEARSTORE_RECOVERYSTATE_H_
+#include <deque>
#include <fstream>
#include <map>
+#include "qpid/linearstore/jrnl/LinearFileController.h"
#include <stdint.h>
#include <vector>
+struct file_hdr_t;
+struct rec_hdr_t;
+
namespace qpid {
namespace qls_jrnl {
+class data_tok;
+class enq_map;
+class EmptyFilePool;
+class EmptyFilePoolManager;
+class JournalLog;
+class jrec;
+class txn_map;
+
class RecoveryManager
{
-private:
+protected:
+ // Types
+ typedef std::vector<std::string> directoryList_t;
+ typedef directoryList_t::const_iterator directoryListConstItr_t;
+ typedef std::map<uint64_t, std::string> fileNumberNameMap_t;
+ typedef fileNumberNameMap_t::iterator fileNumberNameMapItr_t;
+ typedef fileNumberNameMap_t::const_iterator fileNumberNameMapConstItr_t;
+ typedef std::deque<uint32_t> enqueueCountList_t;
+ typedef enqueueCountList_t::const_iterator enqueueCountListConstItr_t;
+ typedef std::vector<uint64_t> recordIdList_t;
+ typedef recordIdList_t::const_iterator recordIdListConstItr_t;
+
+ // Location and identity
+ const std::string journalDirectory_;
+ const std::string queueName_;
+ enq_map& enqueueMapRef_;
+ txn_map& transactionMapRef_;
+ JournalLog& journalLogRef_;
+
// Initial journal analysis data
- std::vector<std::string> _journalFileList; ///< Journal file list
- std::map<uint64_t, std::string> _fileNumberNameMap; ///< File number - name map
- std::vector<uint32_t> _enqueueCountList; ///< Number enqueued records found for each file
- bool _journalEmptyFlag; ///< Journal data files empty
- std::streamoff _firstRecordOffset; ///< First record offset in ffid
- std::streamoff _endOffset; ///< End offset (first byte past last record)
- uint64_t _highestRecordId; ///< Highest rid found
- bool _lastFileFullFlag; ///< Last file is full
+ fileNumberNameMap_t fileNumberNameMap_; ///< File number - name map
+ enqueueCountList_t enqueueCountList_; ///< Number enqueued records found for each file
+ bool journalEmptyFlag_; ///< Journal data files empty
+ std::streamoff firstRecordOffset_; ///< First record offset in ffid
+ std::streamoff endOffset_; ///< End offset (first byte past last record)
+ uint64_t highestRecordId_; ///< Highest rid found
+ uint64_t highestFileNumber_; ///< Highest file number found
+ bool lastFileFullFlag_; ///< Last file is full
// State for recovery of individual enqueued records
- uint64_t _currentRid;
- uint64_t _currentFileNumber;
- std::string _currentFileName;
- std::streamoff _fileSize;
- std::streamoff _recordStart;
- std::ifstream _inFileStream;
- bool _readComplete;
+ uint32_t fileSize_kib_;
+ fileNumberNameMapConstItr_t currentJournalFileConstItr_;
+ std::string currentFileName_;
+ std::ifstream inFileStream_;
+ recordIdList_t recordIdList_;
+ recordIdListConstItr_t recordIdListConstItr_;
public:
- RecoveryManager();
+ RecoveryManager(const std::string& journalDirectory,
+ const std::string& queuename,
+ enq_map& enqueueMapRef,
+ txn_map& transactionMapRef,
+ JournalLog& journalLogRef);
virtual ~RecoveryManager();
+ void analyzeJournals(const std::vector<std::string>* preparedTransactionListPtr,
+ EmptyFilePoolManager* emptyFilePoolManager,
+ EmptyFilePool** emptyFilePoolPtrPtr);
+ std::streamoff getEndOffset() const;
+ uint64_t getHighestFileNumber() const;
+ uint64_t getHighestRecordId() const;
+ bool isLastFileFull() const;
+ bool readNextRemainingRecord(void** const dataPtrPtr,
+ std::size_t& dataSize,
+ void** const xidPtrPtr,
+ std::size_t& xidSize,
+ bool& transient,
+ bool& external,
+ data_tok* const dtokp,
+ bool ignore_pending_txns);
+ void setLinearFileControllerJournals(lfcAddJournalFileFn fnPtr,
+ LinearFileController* lfcPtr);
std::string toString(const std::string& jid,
bool compact = true);
+protected:
+ void analyzeJournalFileHeaders(efpIdentity_t& efpIdentity);
+ void checkFileStreamOk(bool checkEof);
+ void checkJournalAlignment(const std::streampos recordPosition);
+ bool decodeRecord(jrec& record,
+ std::size_t& cumulativeSizeRead,
+ ::rec_hdr_t& recordHeader,
+ std::streampos& fileOffset);
+ std::string getCurrentFileName() const;
+ uint64_t getCurrentFileNumber() const;
+ bool getNextFile(bool jumpToFirstRecordOffsetFlag);
+ bool getNextRecordHeader();
+ void readJournalData(char* target, const std::streamsize size);
+ void removeEmptyFiles(EmptyFilePool* emptyFilePoolPtr);
+
+ static void readJournalFileHeader(const std::string& journalFileName,
+ ::file_hdr_t& fileHeaderRef,
+ std::string& queueName);
};
}} // namespace qpid::qls_jrnl
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org