You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/10/21 23:26:11 UTC

svn commit: r1534383 [2/4] - in /qpid/branches/linearstore/qpid/cpp/src: ./ qpid/broker/ qpid/legacystore/jrnl/ qpid/linearstore/ qpid/linearstore/jrnl/ qpid/linearstore/jrnl/utils/

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp Mon Oct 21 21:26:10 2013
@@ -31,21 +31,21 @@
 namespace qpid {
 namespace qls_jrnl {
 
-JournalFile::JournalFile(const std::string& fqFileName_,
-                         const uint64_t fileSeqNum_,
-                         const uint32_t fileSize_kib_) :
-            fqFileName(fqFileName_),
-            fileSeqNum(fileSeqNum_),
-            fileHandle(-1),
-            fileCloseFlag(false),
-            fileHeaderBasePtr (0),
-            fileHeaderPtr(0),
-            aioControlBlockPtr(0),
-            fileSizeDblks(((fileSize_kib_ * 1024) + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_BYTES)) / JRNL_DBLK_SIZE_BYTES),
-            enqueuedRecordCount(0),
-            submittedDblkCount(0),
-            completedDblkCount(0),
-            outstandingAioOpsCount(0)
+JournalFile::JournalFile(const std::string& fqFileName,
+                         const uint64_t fileSeqNum,
+                         const efpDataSize_kib_t efpDataSize_kib) :
+            fqFileName_(fqFileName),
+            fileSeqNum_(fileSeqNum),
+            fileHandle_(-1),
+            fileCloseFlag_(false),
+            fileHeaderBasePtr_ (0),
+            fileHeaderPtr_(0),
+            aioControlBlockPtr_(0),
+            fileSize_dblks_(((efpDataSize_kib * 1024) + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES)) / QLS_DBLK_SIZE_BYTES),
+            enqueuedRecordCount_(0),
+            submittedDblkCount_(0),
+            completedDblkCount_(0),
+            outstandingAioOpsCount_(0)
 {}
 
 JournalFile::~JournalFile() {
@@ -53,227 +53,223 @@ JournalFile::~JournalFile() {
 }
 
 void
-JournalFile::initialize() {
-    if (::posix_memalign(&fileHeaderBasePtr, QLS_AIO_ALIGN_BOUNDARY, QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_KIB * 1024))
+JournalFile::initialize(const uint32_t completedDblkCount) {
+    if (::posix_memalign(&fileHeaderBasePtr_, QLS_AIO_ALIGN_BOUNDARY_BYTES, QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024))
     {
         std::ostringstream oss;
-        oss << "posix_memalign(): blksize=" << QLS_AIO_ALIGN_BOUNDARY << " size=" << (QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_KIB * 1024);
+        oss << "posix_memalign(): blksize=" << QLS_AIO_ALIGN_BOUNDARY_BYTES << " size=" << (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024);
         oss << FORMAT_SYSERR(errno);
         throw jexception(jerrno::JERR__MALLOC, oss.str(), "JournalFile", "initialize");
     }
-    fileHeaderPtr = (::file_hdr_t*)fileHeaderBasePtr;
-    aioControlBlockPtr = new aio_cb;
+    fileHeaderPtr_ = (::file_hdr_t*)fileHeaderBasePtr_;
+    aioControlBlockPtr_ = new aio_cb;
+    if (completedDblkCount > 0UL) {
+        submittedDblkCount_.add(completedDblkCount);
+        completedDblkCount_.add(completedDblkCount);
+    }
 }
 
 void
 JournalFile::finalize() {
-    if (fileHeaderBasePtr != 0) {
-        std::free(fileHeaderBasePtr);
-        fileHeaderBasePtr = 0;
-        fileHeaderPtr = 0;
+    if (fileHeaderBasePtr_ != 0) {
+        std::free(fileHeaderBasePtr_);
+        fileHeaderBasePtr_ = 0;
+        fileHeaderPtr_ = 0;
     }
-    if (aioControlBlockPtr != 0) {
-        std::free(aioControlBlockPtr);
-        aioControlBlockPtr = 0;
+    if (aioControlBlockPtr_ != 0) {
+        delete(aioControlBlockPtr_);
+        aioControlBlockPtr_ = 0;
     }
 }
 
-const std::string
-JournalFile::getDirectory() const {
-    return fqFileName.substr(0, fqFileName.rfind('/'));
+const std::string JournalFile::getDirectory() const {
+    return fqFileName_.substr(0, fqFileName_.rfind('/'));
+}
+
+const std::string JournalFile::getFileName() const {
+    return fqFileName_.substr(fqFileName_.rfind('/')+1);
 }
 
-const std::string
-JournalFile::getFileName() const {
-    return fqFileName.substr(fqFileName.rfind('/')+1);
+const std::string JournalFile::getFqFileName() const {
+    return fqFileName_;
 }
 
-const std::string
-JournalFile::getFqFileName() const {
-    return fqFileName;
+uint64_t JournalFile::getFileSeqNum() const {
+    return fileSeqNum_;
 }
 
-uint64_t
-JournalFile::getFileSeqNum() const {
-    return fileSeqNum;
+bool JournalFile::isOpen() const {
+    return fileHandle_ >= 0;
 }
 
-int
-JournalFile::open() {
-    fileHandle = ::open(fqFileName.c_str(), O_WRONLY | O_DIRECT, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); // 0644 -rw-r--r--
-    if (fileHandle < 0) {
+int JournalFile::open() {
+    fileHandle_ = ::open(fqFileName_.c_str(), O_WRONLY | O_DIRECT, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); // 0644 -rw-r--r--
+    if (fileHandle_ < 0) {
         std::ostringstream oss;
-        oss << "file=\"" << fqFileName << "\"" << FORMAT_SYSERR(errno);
+        oss << "file=\"" << fqFileName_ << "\"" << FORMAT_SYSERR(errno);
         throw jexception(jerrno::JERR_JNLF_OPEN, oss.str(), "JournalFile", "open");
     }
-    return fileHandle;
+    return fileHandle_;
 }
 
-bool
-JournalFile::isOpen() const {
-    return fileHandle >= 0;
-}
-
-void
-JournalFile::close() {
-    if (fileHandle >= 0) {
+void JournalFile::close() {
+    if (fileHandle_ >= 0) {
         if (getOutstandingAioDblks()) {
-            fileCloseFlag = true; // Close later when all outstanding AIOs have returned
+            fileCloseFlag_ = true; // Close later when all outstanding AIOs have returned
         } else {
-            int res = ::close(fileHandle);
-            fileHandle = -1;
+            int res = ::close(fileHandle_);
+            fileHandle_ = -1;
             if (res != 0) {
                 std::ostringstream oss;
-                oss << "file=\"" << fqFileName << "\"" << FORMAT_SYSERR(errno);
+                oss << "file=\"" << fqFileName_ << "\"" << FORMAT_SYSERR(errno);
                 throw jexception(jerrno::JERR_JNLF_CLOSE, oss.str(), "JournalFile", "open");
             }
         }
     }
 }
 
-void
-JournalFile::asyncFileHeaderWrite(io_context_t ioContextPtr_,
-                                  const efpPartitionNumber_t efpPartitionNumber_,
-                                  const efpDataSize_kib_t efpDataSize_kib_,
-                                  const uint16_t userFlags_,
-                                  const uint64_t recordId_,
-                                  const uint64_t firstRecordOffset_,
-                                  const std::string queueName_) {
-    ::file_hdr_create(fileHeaderPtr, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDR_RES_SIZE_SBLKS, efpPartitionNumber_, efpDataSize_kib_);
-    ::file_hdr_init(fileHeaderBasePtr, QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_KIB * 1024, userFlags_, recordId_, firstRecordOffset_, fileSeqNum, queueName_.size(), queueName_.data());
-    aio::prep_pwrite(aioControlBlockPtr, fileHandle, (void*)fileHeaderBasePtr, QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_KIB * 1024, 0UL);
-    if (aio::submit(ioContextPtr_, 1, &aioControlBlockPtr) < 0)
+void JournalFile::asyncFileHeaderWrite(io_context_t ioContextPtr,
+                                       const efpPartitionNumber_t efpPartitionNumber,
+                                       const efpDataSize_kib_t efpDataSize_kib,
+                                       const uint16_t userFlags,
+                                       const uint64_t recordId,
+                                       const uint64_t firstRecordOffset,
+                                       const std::string queueName) {
+    ::file_hdr_create(fileHeaderPtr_, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDR_RES_SIZE_SBLKS, efpPartitionNumber, efpDataSize_kib);
+    ::file_hdr_init(fileHeaderBasePtr_,
+                    QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024,
+                    userFlags,
+                    recordId,
+                    firstRecordOffset,
+                    fileSeqNum_,
+                    queueName.size(),
+                    queueName.data());
+    aio::prep_pwrite(aioControlBlockPtr_,
+                     fileHandle_,
+                     (void*)fileHeaderBasePtr_,
+                     QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024,
+                     0UL);
+    if (aio::submit(ioContextPtr, 1, &aioControlBlockPtr_) < 0)
         throw jexception(jerrno::JERR__AIO, "JournalFile", "asyncPageWrite");
-    addSubmittedDblkCount(QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_DBLKS);
+    addSubmittedDblkCount(QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_DBLKS);
     incrOutstandingAioOperationCount();
 }
 
-void
-JournalFile::asyncPageWrite(io_context_t ioContextPtr_,
-                            aio_cb* aioControlBlockPtr_,
-                            void* data_,
-                            uint32_t dataSize_dblks_) {
-    aio::prep_pwrite_2(aioControlBlockPtr_, fileHandle, data_, dataSize_dblks_ * JRNL_DBLK_SIZE_BYTES, submittedDblkCount.get() * JRNL_DBLK_SIZE_BYTES);
-    pmgr::page_cb* pcbp = (pmgr::page_cb*)(aioControlBlockPtr_->data); // This page's control block (pcb)
-    pcbp->_wdblks = dataSize_dblks_;
+void JournalFile::asyncPageWrite(io_context_t ioContextPtr,
+                                 aio_cb* aioControlBlockPtr,
+                                 void* data,
+                                 uint32_t dataSize_dblks) {
+    aio::prep_pwrite_2(aioControlBlockPtr,
+                       fileHandle_,
+                       data,
+                       dataSize_dblks * QLS_DBLK_SIZE_BYTES,
+                       submittedDblkCount_.get() * QLS_DBLK_SIZE_BYTES);
+    pmgr::page_cb* pcbp = (pmgr::page_cb*)(aioControlBlockPtr->data); // This page's control block (pcb)
+    pcbp->_wdblks = dataSize_dblks;
     pcbp->_jfp = this;
-    if (aio::submit(ioContextPtr_, 1, &aioControlBlockPtr_) < 0)
-        throw jexception(jerrno::JERR__AIO, "JournalFile", "asyncPageWrite");
-    addSubmittedDblkCount(dataSize_dblks_);
+    if (aio::submit(ioContextPtr, 1, &aioControlBlockPtr) < 0) {
+        throw jexception(jerrno::JERR__AIO, "JournalFile", "asyncPageWrite"); // TODO: complete exception details
+    }
+    addSubmittedDblkCount(dataSize_dblks);
     incrOutstandingAioOperationCount();
 }
 
-uint32_t
-JournalFile::getEnqueuedRecordCount() const {
-    return enqueuedRecordCount.get();
+uint32_t JournalFile::getEnqueuedRecordCount() const {
+    return enqueuedRecordCount_.get();
 }
 
-uint32_t
-JournalFile::incrEnqueuedRecordCount() {
-    return enqueuedRecordCount.increment();
+uint32_t JournalFile::incrEnqueuedRecordCount() {
+    return enqueuedRecordCount_.increment();
 }
 
-uint32_t
-JournalFile::addEnqueuedRecordCount(const uint32_t a) {
-    return enqueuedRecordCount.add(a);
+uint32_t JournalFile::addEnqueuedRecordCount(const uint32_t a) {
+    return enqueuedRecordCount_.add(a);
 }
 
-uint32_t
-JournalFile::decrEnqueuedRecordCount() {
-    return enqueuedRecordCount.decrementLimit();
+uint32_t JournalFile::decrEnqueuedRecordCount() {
+    return enqueuedRecordCount_.decrementLimit();
 }
 
-uint32_t
-JournalFile::subtrEnqueuedRecordCount(const uint32_t s) {
-    return enqueuedRecordCount.subtractLimit(s);
+uint32_t JournalFile::subtrEnqueuedRecordCount(const uint32_t s) {
+    return enqueuedRecordCount_.subtractLimit(s);
 }
 
-uint32_t
-JournalFile::getSubmittedDblkCount() const {
-    return submittedDblkCount.get();
+uint32_t JournalFile::getSubmittedDblkCount() const {
+    return submittedDblkCount_.get();
 }
 
-uint32_t
-JournalFile::addSubmittedDblkCount(const uint32_t a) {
-    return submittedDblkCount.addLimit(a, fileSizeDblks, jerrno::JERR_JNLF_FILEOFFSOVFL);
+uint32_t JournalFile::addSubmittedDblkCount(const uint32_t a) {
+    return submittedDblkCount_.addLimit(a, fileSize_dblks_, jerrno::JERR_JNLF_FILEOFFSOVFL);
 }
 
-uint32_t
-JournalFile::getCompletedDblkCount() const {
-    return completedDblkCount.get();
+uint32_t JournalFile::getCompletedDblkCount() const {
+    return completedDblkCount_.get();
 }
 
-uint32_t
-JournalFile::addCompletedDblkCount(const uint32_t a) {
-    return completedDblkCount.addLimit(a, submittedDblkCount.get(), jerrno::JERR_JNLF_CMPLOFFSOVFL);
+uint32_t JournalFile::addCompletedDblkCount(const uint32_t a) {
+    return completedDblkCount_.addLimit(a, submittedDblkCount_.get(), jerrno::JERR_JNLF_CMPLOFFSOVFL);
 }
 
 uint16_t JournalFile::getOutstandingAioOperationCount() const {
-    return outstandingAioOpsCount.get();
+    return outstandingAioOpsCount_.get();
 }
 
 uint16_t JournalFile::incrOutstandingAioOperationCount() {
-    return outstandingAioOpsCount.increment();
+    return outstandingAioOpsCount_.increment();
 }
 
 uint16_t JournalFile::decrOutstandingAioOperationCount() {
-    uint16_t r = outstandingAioOpsCount.decrementLimit();
-    if (fileCloseFlag && outstandingAioOpsCount == 0) { // Delayed close
+    uint16_t r = outstandingAioOpsCount_.decrementLimit();
+    if (fileCloseFlag_ && outstandingAioOpsCount_ == 0) { // Delayed close
         close();
     }
     return r;
 }
 
-bool
-JournalFile::isEmpty() const {
-    return submittedDblkCount == 0;
+// --- Status helper functions ---
+
+bool JournalFile::isEmpty() const {
+    return submittedDblkCount_ == 0;
 }
 
-bool
-JournalFile::isDataEmpty() const {
-    return submittedDblkCount <= QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_DBLKS;
+bool JournalFile::isDataEmpty() const {
+    return submittedDblkCount_ <= QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_DBLKS;
 }
 
-u_int32_t
-JournalFile::dblksRemaining() const {
-    return fileSizeDblks - submittedDblkCount;
+u_int32_t JournalFile::dblksRemaining() const {
+    return fileSize_dblks_ - submittedDblkCount_;
 }
 
-bool
-JournalFile::isFull() const {
-    return submittedDblkCount == fileSizeDblks;
+bool JournalFile::isFull() const {
+    return submittedDblkCount_ == fileSize_dblks_;
 }
 
-bool
-JournalFile::isFullAndComplete() const {
-    return completedDblkCount == fileSizeDblks;
+bool JournalFile::isFullAndComplete() const {
+    return completedDblkCount_ == fileSize_dblks_;
 }
 
-u_int32_t
-JournalFile::getOutstandingAioDblks() const {
-    return submittedDblkCount - completedDblkCount;
+u_int32_t JournalFile::getOutstandingAioDblks() const {
+    return submittedDblkCount_ - completedDblkCount_;
 }
 
-bool
-JournalFile::getNextFile() const {
+bool JournalFile::getNextFile() const {
     return isFull();
 }
 
-bool
-JournalFile::isNoEnqueuedRecordsRemaining() const {
+bool JournalFile::isNoEnqueuedRecordsRemaining() const {
     return !isDataEmpty() &&          // Must be written to, not empty
-           enqueuedRecordCount == 0;  // No remaining enqueued records
+           enqueuedRecordCount_ == 0;  // No remaining enqueued records
 }
 
-const std::string
-JournalFile::status_str(const uint8_t indentDepth_) const {
-    std::string indent((size_t)indentDepth_, '.');
+// debug aid
+const std::string JournalFile::status_str(const uint8_t indentDepth) const {
+    std::string indent((size_t)indentDepth, '.');
     std::ostringstream oss;
     oss << indent << "JournalFile: fileName=" << getFileName() << std::endl;
     oss << indent << "  directory=" << getDirectory() << std::endl;
-    oss << indent << "  fileSizeDblks=" << fileSizeDblks << std::endl;
+    oss << indent << "  fileSizeDblks=" << fileSize_dblks_ << std::endl;
     oss << indent << "  open=" << (isOpen() ? "T" : "F") << std::endl;
-    oss << indent << "  fileHandle=" << fileHandle << std::endl;
+    oss << indent << "  fileHandle=" << fileHandle_ << std::endl;
     oss << indent << "  enqueuedRecordCount=" << getEnqueuedRecordCount() << std::endl;
     oss << indent << "  submittedDblkCount=" << getSubmittedDblkCount() << std::endl;
     oss << indent << "  completedDblkCount=" << getCompletedDblkCount() << std::endl;

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h Mon Oct 21 21:26:10 2013
@@ -36,26 +36,27 @@ namespace qls_jrnl {
 class JournalFile
 {
 protected:
-    const std::string fqFileName;
-    const uint64_t fileSeqNum;
-    int fileHandle;
-    bool fileCloseFlag;
-    void* fileHeaderBasePtr;
-    ::file_hdr_t* fileHeaderPtr;
-    aio_cb* aioControlBlockPtr;
-    uint32_t fileSizeDblks;                            ///< File size in data blocks, including file header
-    AtomicCounter<uint32_t> enqueuedRecordCount;       ///< Count of enqueued records
-    AtomicCounter<uint32_t> submittedDblkCount;        ///< Write file count (data blocks) for submitted AIO
-    AtomicCounter<uint32_t> completedDblkCount;        ///< Write file count (data blocks) for completed AIO
-    AtomicCounter<uint16_t> outstandingAioOpsCount;    ///< Outstanding AIO operations on this file
+    const std::string fqFileName_;
+    const uint64_t fileSeqNum_;
+    int fileHandle_;
+    bool fileCloseFlag_;
+    void* fileHeaderBasePtr_;
+    ::file_hdr_t* fileHeaderPtr_;
+    aio_cb* aioControlBlockPtr_;
+    uint32_t fileSize_dblks_;                           ///< File size in data blocks, including file header
+
+    AtomicCounter<uint32_t> enqueuedRecordCount_;       ///< Count of enqueued records
+    AtomicCounter<uint32_t> submittedDblkCount_;        ///< Write file count (data blocks) for submitted AIO
+    AtomicCounter<uint32_t> completedDblkCount_;        ///< Write file count (data blocks) for completed AIO
+    AtomicCounter<uint16_t> outstandingAioOpsCount_;    ///< Outstanding AIO operations on this file
 
 public:
     JournalFile(const std::string& fqFileName,
                 const uint64_t fileSeqNum,
-                const uint32_t fileSize_kib);
+                const efpDataSize_kib_t efpDataSize_kib);
     virtual ~JournalFile();
 
-    void initialize();
+    void initialize(const uint32_t completedDblkCount);
     void finalize();
 
     const std::string getDirectory() const;
@@ -63,8 +64,8 @@ public:
     const std::string getFqFileName() const;
     uint64_t getFileSeqNum() const;
 
-    int open();
     bool isOpen() const;
+    int open();
     void close();
     void asyncFileHeaderWrite(io_context_t ioContextPtr,
                               const efpPartitionNumber_t efpPartitionNumber,
@@ -104,7 +105,7 @@ public:
     bool getNextFile() const;                  ///< True when next file is needed
     bool isNoEnqueuedRecordsRemaining() const; ///< True when all enqueued records (or parts) have been dequeued
 
-    // Debug aid
+    // debug aid
     const std::string status_str(const uint8_t indentDepth) const;
 };
 

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.cpp?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.cpp Mon Oct 21 21:26:10 2013
@@ -25,28 +25,27 @@
 namespace qpid {
 namespace qls_jrnl {
 
-JournalLog::JournalLog() {}
+JournalLog::JournalLog(log_level_t logLevelThreshold) : logLevelThreshold_(logLevelThreshold) {}
 
 JournalLog::~JournalLog() {}
 
-void
-JournalLog::log(log_level_t ll, const std::string& jid, const std::string& log_stmt) const {
-    log(ll, jid.c_str(), log_stmt.c_str());
+void JournalLog::log(const log_level_t logLevel,
+                     const std::string& logStatement) const {
+    if (logLevel >= logLevelThreshold_) {
+        std::cerr << log_level_str(logLevel) << ": " << logStatement << std::endl;
+    }
 }
 
-void
-JournalLog::log(log_level_t ll, const char* jid, const char* const log_stmt) const {
-    if (ll > LOG_ERROR) {
-        std::cerr << log_level_str(ll) << ": Journal \"" << jid << "\": " << log_stmt << std::endl;
-    } else if (ll >= LOG_INFO) {
-        std::cout << log_level_str(ll) << ": Journal \"" << jid << "\": " << log_stmt << std::endl;
+void JournalLog::log(log_level_t logLevel,
+                     const std::string& journalId,
+                     const std::string& logStatement) const {
+    if (logLevel >= logLevelThreshold_) {
+        std::cerr << log_level_str(logLevel) << ": Journal \"" << journalId << "\": " << logStatement << std::endl;
     }
-
 }
 
-const char*
-JournalLog::log_level_str(log_level_t ll) {
-    switch (ll)
+const char* JournalLog::log_level_str(log_level_t logLevel) {
+    switch (logLevel)
     {
         case LOG_TRACE: return "TRACE";
         case LOG_DEBUG: return "DEBUG";

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.h?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.h Mon Oct 21 21:26:10 2013
@@ -30,8 +30,7 @@ namespace qls_jrnl {
 class JournalLog
 {
 public:
-    typedef enum _log_level
-    {
+    typedef enum _log_level {
         LOG_TRACE = 0,
         LOG_DEBUG,
         LOG_INFO,
@@ -42,13 +41,17 @@ public:
     } log_level_t;
 
 protected:
-    JournalLog();
-    virtual ~JournalLog();
+    const log_level_t logLevelThreshold_;
 
 public:
-    virtual void log(log_level_t level, const std::string& jid, const std::string& log_stmt) const;
-    virtual void log(log_level_t level, const char* jid, const char* const log_stmt) const;
-    static const char* log_level_str(log_level_t ll);
+    JournalLog(log_level_t logLevelThreshold);
+    virtual ~JournalLog();
+    virtual void log(const log_level_t logLevel,
+                     const std::string& logStatement) const;
+    virtual void log(const log_level_t logLevel,
+                     const std::string& journalId,
+                     const std::string& logStatement) const;
+    static const char* log_level_str(const log_level_t logLevel);
 };
 
 }} // namespace qpid::qls_jrnl

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp Mon Oct 21 21:26:10 2013
@@ -29,284 +29,265 @@
 #include "qpid/linearstore/jrnl/slock.h"
 #include "qpid/linearstore/jrnl/utils/file_hdr.h"
 
-#include <iostream> // DEBUG
-
 namespace qpid {
 namespace qls_jrnl {
 
-LinearFileController::LinearFileController(jcntl& jcntlRef_) :
-            jcntlRef(jcntlRef_),
-            emptyFilePoolPtr(0),
-            currentJournalFilePtr(0),
-            fileSeqCounter(0),
-            recordIdCounter(0)
+LinearFileController::LinearFileController(jcntl& jcntlRef) :
+            jcntlRef_(jcntlRef),
+            emptyFilePoolPtr_(0),
+            currentJournalFilePtr_(0),
+            fileSeqCounter_(0),
+            recordIdCounter_(0)
 {}
 
 LinearFileController::~LinearFileController() {}
 
-void
-LinearFileController::initialize(const std::string& journalDirectory_,
-                                 EmptyFilePool* emptyFilePoolPtr_) {
-    journalDirectory.assign(journalDirectory_);
-    emptyFilePoolPtr = emptyFilePoolPtr_;
+void LinearFileController::initialize(const std::string& journalDirectory,
+                                      EmptyFilePool* emptyFilePoolPtr,
+                                      uint64_t initialFileNumberVal) {
+    journalDirectory_.assign(journalDirectory);
+    emptyFilePoolPtr_ = emptyFilePoolPtr;
+    fileSeqCounter_ = initialFileNumberVal;
 }
 
-void
-LinearFileController::finalize() {
-    while (!journalFileList.empty()) {
-        delete journalFileList.front();
-        journalFileList.pop_front();
+void LinearFileController::finalize() {
+    while (!journalFileList_.empty()) {
+        delete journalFileList_.front();
+        journalFileList_.pop_front();
     }
 }
 
-void
-LinearFileController::pullEmptyFileFromEfp() {
-    if (currentJournalFilePtr)
-        currentJournalFilePtr->close();
-    std::string ef = emptyFilePoolPtr->takeEmptyFile(journalDirectory); // Moves file from EFP only, returns new file name
-    std::cout << "*** LinearFileController::pullEmptyFileFromEfp() qn=" << jcntlRef.id() << " ef=" << ef << std::endl; // DEBUG
-    currentJournalFilePtr = new JournalFile(ef, getNextFileSeqNum(), emptyFilePoolPtr->dataSize_kib());
-    currentJournalFilePtr->initialize();
+void LinearFileController::addJournalFile(const std::string& fileName,
+                                          const uint64_t fileNumber,
+                                          const uint32_t fileSize_kib,
+                                          const uint32_t completedDblkCount) {
+    if (currentJournalFilePtr_)
+        currentJournalFilePtr_->close();
+    currentJournalFilePtr_ = new JournalFile(fileName, fileNumber, fileSize_kib);
+    currentJournalFilePtr_->initialize(completedDblkCount);
     {
-        slock l(journalFileListMutex);
-        journalFileList.push_back(currentJournalFilePtr);
+        slock l(journalFileListMutex_);
+        journalFileList_.push_back(currentJournalFilePtr_);
     }
-    currentJournalFilePtr->open();
+    currentJournalFilePtr_->open();
 }
 
-void
-LinearFileController::purgeFilesToEfp() {
-    slock l(journalFileListMutex);
-    while (journalFileList.front()->isNoEnqueuedRecordsRemaining()) {
-        emptyFilePoolPtr->returnEmptyFile(journalFileList.front());
-        delete journalFileList.front();
-        journalFileList.pop_front();
-    }
+efpDataSize_kib_t LinearFileController::dataSize_kib() const {
+    return emptyFilePoolPtr_->dataSize_kib();
+}
+
+efpDataSize_sblks_t LinearFileController::dataSize_sblks() const {
+    return emptyFilePoolPtr_->dataSize_sblks();
+}
+
+efpFileSize_kib_t LinearFileController::fileSize_kib() const {
+    return emptyFilePoolPtr_->fileSize_kib();
 }
 
-efpDataSize_kib_t
-LinearFileController::dataSize_kib() const {
-    return emptyFilePoolPtr->dataSize_kib();
+efpFileSize_sblks_t LinearFileController::fileSize_sblks() const {
+    return emptyFilePoolPtr_->fileSize_sblks();
 }
 
-efpFileSize_kib_t
-LinearFileController::fileSize_kib() const {
-    return emptyFilePoolPtr->fileSize_kib();
+uint64_t LinearFileController::getNextRecordId() {
+    return recordIdCounter_.increment();
 }
 
-efpDataSize_sblks_t
-LinearFileController::dataSize_sblks() const {
-    return emptyFilePoolPtr->dataSize_sblks();
+void LinearFileController::pullEmptyFileFromEfp() {
+    if (currentJournalFilePtr_)
+        currentJournalFilePtr_->close();
+    std::string ef = emptyFilePoolPtr_->takeEmptyFile(journalDirectory_); // Moves file from EFP only, returns new file name
+//std::cout << "*** LinearFileController::pullEmptyFileFromEfp() qn=" << jcntlRef.id() << " ef=" << ef << std::endl; // DEBUG
+    addJournalFile(ef, getNextFileSeqNum(), emptyFilePoolPtr_->dataSize_kib(), 0);
 }
 
-efpFileSize_sblks_t
-LinearFileController::fileSize_sblks() const {
-    return emptyFilePoolPtr->fileSize_sblks();
+void LinearFileController::purgeFilesToEfp() {
+    slock l(journalFileListMutex_);
+    while (journalFileList_.front()->isNoEnqueuedRecordsRemaining()) {
+        emptyFilePoolPtr_->returnEmptyFile(journalFileList_.front()->getFqFileName());
+        delete journalFileList_.front();
+        journalFileList_.pop_front();
+    }
 }
 
-uint64_t
-LinearFileController::getNextRecordId() {
-    return recordIdCounter.increment();
+uint32_t LinearFileController::getEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) {
+    slock l(journalFileListMutex_);
+    return find(fileSeqNumber)->getEnqueuedRecordCount();
+}
+
+uint32_t LinearFileController::incrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) {
+    assertCurrentJournalFileValid("incrEnqueuedRecordCount");
+    return find(fileSeqNumber)->incrEnqueuedRecordCount();
 }
 
-uint32_t
-LinearFileController::decrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) {
-    slock l(journalFileListMutex);
+uint32_t LinearFileController::decrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) {
+    slock l(journalFileListMutex_);
     return find(fileSeqNumber)->decrEnqueuedRecordCount();
 }
 
-uint32_t
-LinearFileController::addWriteCompletedDblkCount(const efpFileCount_t fileSeqNumber, const uint32_t a) {
-    slock l(journalFileListMutex);
+uint32_t LinearFileController::addWriteCompletedDblkCount(const efpFileCount_t fileSeqNumber, const uint32_t a) {
+    slock l(journalFileListMutex_);
     return find(fileSeqNumber)->addCompletedDblkCount(a);
 }
 
-uint16_t
-LinearFileController::decrOutstandingAioOperationCount(const efpFileCount_t fileSeqNumber) {
-    slock l(journalFileListMutex);
+uint16_t LinearFileController::decrOutstandingAioOperationCount(const efpFileCount_t fileSeqNumber) {
+    slock l(journalFileListMutex_);
     return find(fileSeqNumber)->decrOutstandingAioOperationCount();
 }
 
-void
-LinearFileController::asyncFileHeaderWrite(io_context_t ioContextPtr,
-                                           const uint16_t userFlags,
-                                           const uint64_t recordId,
-                                           const uint64_t firstRecordOffset) {
-    currentJournalFilePtr->asyncFileHeaderWrite(ioContextPtr,
-                                                emptyFilePoolPtr->getPartitionNumber(),
-                                                emptyFilePoolPtr->dataSize_kib(),
-                                                userFlags,
-                                                recordId,
-                                                firstRecordOffset,
-                                                jcntlRef.id());
-}
-
-void
-LinearFileController::asyncPageWrite(io_context_t ioContextPtr,
-                                     aio_cb* aioControlBlockPtr,
-                                     void* data,
-                                     uint32_t dataSize_dblks) {
+void LinearFileController::asyncFileHeaderWrite(io_context_t ioContextPtr,
+                                                const uint16_t userFlags,
+                                                const uint64_t recordId,
+                                                const uint64_t firstRecordOffset) {
+    currentJournalFilePtr_->asyncFileHeaderWrite(ioContextPtr,
+                                                 emptyFilePoolPtr_->getPartitionNumber(),
+                                                 emptyFilePoolPtr_->dataSize_kib(),
+                                                 userFlags,
+                                                 recordId,
+                                                 firstRecordOffset,
+                                                 jcntlRef_.id());
+}
+
+void LinearFileController::asyncPageWrite(io_context_t ioContextPtr,
+                                          aio_cb* aioControlBlockPtr,
+                                          void* data,
+                                          uint32_t dataSize_dblks) {
     assertCurrentJournalFileValid("asyncPageWrite");
-    currentJournalFilePtr->asyncPageWrite(ioContextPtr, aioControlBlockPtr, data, dataSize_dblks);
+    currentJournalFilePtr_->asyncPageWrite(ioContextPtr, aioControlBlockPtr, data, dataSize_dblks);
 }
 
-uint64_t
-LinearFileController::getCurrentFileSeqNum() const {
+uint64_t LinearFileController::getCurrentFileSeqNum() const {
     assertCurrentJournalFileValid("getCurrentFileSeqNum");
-    return currentJournalFilePtr->getFileSeqNum();
+    return currentJournalFilePtr_->getFileSeqNum();
 }
 
-uint32_t
-LinearFileController::getEnqueuedRecordCount() const {
+uint32_t LinearFileController::getEnqueuedRecordCount() const {
     assertCurrentJournalFileValid("getEnqueuedRecordCount");
-    return currentJournalFilePtr->getEnqueuedRecordCount();
+    return currentJournalFilePtr_->getEnqueuedRecordCount();
 }
 
-uint32_t
-LinearFileController::incrEnqueuedRecordCount() {
+uint32_t LinearFileController::incrEnqueuedRecordCount() {
     assertCurrentJournalFileValid("incrEnqueuedRecordCount");
-    return currentJournalFilePtr->incrEnqueuedRecordCount();
+    return currentJournalFilePtr_->incrEnqueuedRecordCount();
 }
 
-uint32_t
-LinearFileController::addEnqueuedRecordCount(const uint32_t a) {
+uint32_t LinearFileController::addEnqueuedRecordCount(const uint32_t a) {
     assertCurrentJournalFileValid("addEnqueuedRecordCount");
-    return currentJournalFilePtr->addEnqueuedRecordCount(a);
+    return currentJournalFilePtr_->addEnqueuedRecordCount(a);
 }
 
-uint32_t
-LinearFileController::decrEnqueuedRecordCount() {
+uint32_t LinearFileController::decrEnqueuedRecordCount() {
     assertCurrentJournalFileValid("decrEnqueuedRecordCount");
-    return currentJournalFilePtr->decrEnqueuedRecordCount();
+    return currentJournalFilePtr_->decrEnqueuedRecordCount();
 }
 
-uint32_t
-LinearFileController::subtrEnqueuedRecordCount(const uint32_t s) {
+uint32_t LinearFileController::subtrEnqueuedRecordCount(const uint32_t s) {
     assertCurrentJournalFileValid("subtrEnqueuedRecordCount");
-    return currentJournalFilePtr->subtrEnqueuedRecordCount(s);
+    return currentJournalFilePtr_->subtrEnqueuedRecordCount(s);
 }
 
-uint32_t
-LinearFileController::getWriteSubmittedDblkCount() const {
+uint32_t LinearFileController::getWriteSubmittedDblkCount() const {
     assertCurrentJournalFileValid("getWriteSubmittedDblkCount");
-    return currentJournalFilePtr->getSubmittedDblkCount();
+    return currentJournalFilePtr_->getSubmittedDblkCount();
 }
 
-uint32_t
-LinearFileController::addWriteSubmittedDblkCount(const uint32_t a) {
+uint32_t LinearFileController::addWriteSubmittedDblkCount(const uint32_t a) {
     assertCurrentJournalFileValid("addWriteSubmittedDblkCount");
-    return currentJournalFilePtr->addSubmittedDblkCount(a);
+    return currentJournalFilePtr_->addSubmittedDblkCount(a);
 }
 
-uint32_t
-LinearFileController::getWriteCompletedDblkCount() const {
+uint32_t LinearFileController::getWriteCompletedDblkCount() const {
     assertCurrentJournalFileValid("getWriteCompletedDblkCount");
-    return currentJournalFilePtr->getCompletedDblkCount();
+    return currentJournalFilePtr_->getCompletedDblkCount();
 }
 
-uint32_t
-LinearFileController::addWriteCompletedDblkCount(const uint32_t a) {
+uint32_t LinearFileController::addWriteCompletedDblkCount(const uint32_t a) {
     assertCurrentJournalFileValid("addWriteCompletedDblkCount");
-    return currentJournalFilePtr->addCompletedDblkCount(a);
+    return currentJournalFilePtr_->addCompletedDblkCount(a);
 }
 
-uint16_t
-LinearFileController::getOutstandingAioOperationCount() const {
+uint16_t LinearFileController::getOutstandingAioOperationCount() const {
     assertCurrentJournalFileValid("getOutstandingAioOperationCount");
-    return currentJournalFilePtr->getOutstandingAioOperationCount();
+    return currentJournalFilePtr_->getOutstandingAioOperationCount();
 }
 
-uint16_t
-LinearFileController::incrOutstandingAioOperationCount() {
+uint16_t LinearFileController::incrOutstandingAioOperationCount() {
     assertCurrentJournalFileValid("incrOutstandingAioOperationCount");
-    return currentJournalFilePtr->incrOutstandingAioOperationCount();
+    return currentJournalFilePtr_->incrOutstandingAioOperationCount();
 }
 
-uint16_t
-LinearFileController::decrOutstandingAioOperationCount() {
+uint16_t LinearFileController::decrOutstandingAioOperationCount() {
     assertCurrentJournalFileValid("decrOutstandingAioOperationCount");
-    return currentJournalFilePtr->decrOutstandingAioOperationCount();
+    return currentJournalFilePtr_->decrOutstandingAioOperationCount();
 }
 
-bool
-LinearFileController::isEmpty() const {
+bool LinearFileController::isEmpty() const {
     assertCurrentJournalFileValid("isEmpty");
-    return currentJournalFilePtr->isEmpty();
+    return currentJournalFilePtr_->isEmpty();
 }
 
-bool
-LinearFileController::isDataEmpty() const {
+bool LinearFileController::isDataEmpty() const {
     assertCurrentJournalFileValid("isDataEmpty");
-    return currentJournalFilePtr->isDataEmpty();
+    return currentJournalFilePtr_->isDataEmpty();
 }
 
-u_int32_t
-LinearFileController::dblksRemaining() const {
+u_int32_t LinearFileController::dblksRemaining() const {
     assertCurrentJournalFileValid("dblksRemaining");
-    return currentJournalFilePtr->dblksRemaining();
+    return currentJournalFilePtr_->dblksRemaining();
 }
 
-bool
-LinearFileController::isFull() const {
+bool LinearFileController::isFull() const {
     assertCurrentJournalFileValid("isFull");
-    return currentJournalFilePtr->isFull();
+    return currentJournalFilePtr_->isFull();
 }
 
-bool
-LinearFileController::isFullAndComplete() const {
+bool LinearFileController::isFullAndComplete() const {
     assertCurrentJournalFileValid("isFullAndComplete");
-    return currentJournalFilePtr->isFullAndComplete();
+    return currentJournalFilePtr_->isFullAndComplete();
 }
 
-u_int32_t
-LinearFileController::getOutstandingAioDblks() const {
+u_int32_t LinearFileController::getOutstandingAioDblks() const {
     assertCurrentJournalFileValid("getOutstandingAioDblks");
-    return currentJournalFilePtr->getOutstandingAioDblks();
+    return currentJournalFilePtr_->getOutstandingAioDblks();
 }
 
-bool
-LinearFileController::getNextFile() const {
-    assertCurrentJournalFileValid("getNextFile");
-    return currentJournalFilePtr->getNextFile();
+bool LinearFileController::needNextFile() const {
+    assertCurrentJournalFileValid("needNextFile"); // label must name this (renamed) function; it is passed to the jexception
+    return currentJournalFilePtr_->getNextFile();   // pass-through to the current JournalFile
 }
 
-const std::string
-LinearFileController::status(const uint8_t indentDepth) const {
+const std::string LinearFileController::status(const uint8_t indentDepth) const {
     std::string indent((size_t)indentDepth, '.');
     std::ostringstream oss;
-    oss << indent << "LinearFileController: queue=" << jcntlRef.id() << std::endl;
-    oss << indent << "  journalDirectory=" << journalDirectory << std::endl;
-    oss << indent << "  fileSeqCounter=" << fileSeqCounter.get() << std::endl;
-    oss << indent << "  recordIdCounter=" << recordIdCounter.get() << std::endl;
-    oss << indent << "  journalFileList.size=" << journalFileList.size() << std::endl;
+    oss << indent << "LinearFileController: queue=" << jcntlRef_.id() << std::endl;
+    oss << indent << "  journalDirectory=" << journalDirectory_ << std::endl;
+    oss << indent << "  fileSeqCounter=" << fileSeqCounter_.get() << std::endl;
+    oss << indent << "  recordIdCounter=" << recordIdCounter_.get() << std::endl;
+    oss << indent << "  journalFileList.size=" << journalFileList_.size() << std::endl;
     if (checkCurrentJournalFileValid()) {
-        oss << currentJournalFilePtr->status_str(indentDepth+2);
+        oss << currentJournalFilePtr_->status_str(indentDepth+2);
     } else {
         oss << indent << "  <No current journal file>" << std::endl;
     }
     return oss.str();
 }
 
-// protected
+// --- protected functions ---
 
-bool
-LinearFileController::checkCurrentJournalFileValid() const {
-    return currentJournalFilePtr != 0;
+bool LinearFileController::checkCurrentJournalFileValid() const {
+    return currentJournalFilePtr_ != 0;
 }
 
-void
-LinearFileController::assertCurrentJournalFileValid(const char* const functionName) const {
+void LinearFileController::assertCurrentJournalFileValid(const char* const functionName) const {
     if (!checkCurrentJournalFileValid()) {
         throw jexception(jerrno::JERR__NULL, "LinearFileController", functionName);
     }
 }
 
 // NOTE: NOT THREAD SAFE - journalFileList is accessed by multiple threads - use under external lock
-JournalFile*
-LinearFileController::find(const efpFileCount_t fileSeqNumber) {
-    if (currentJournalFilePtr != 0 && currentJournalFilePtr->getFileSeqNum() == fileSeqNumber)
-        return currentJournalFilePtr;
-    for (JournalFileListItr_t i=journalFileList.begin(); i!=journalFileList.end(); ++i) {
+JournalFile* LinearFileController::find(const efpFileCount_t fileSeqNumber) {
+    if (currentJournalFilePtr_ != 0 && currentJournalFilePtr_->getFileSeqNum() == fileSeqNumber)
+        return currentJournalFilePtr_;
+    for (JournalFileListItr_t i=journalFileList_.begin(); i!=journalFileList_.end(); ++i) {
         if ((*i)->getFileSeqNum() == fileSeqNumber) {
             return *i;
         }
@@ -316,9 +297,8 @@ LinearFileController::find(const efpFile
     throw jexception(jerrno::JERR_LFCR_SEQNUMNOTFOUND, oss.str(), "LinearFileController", "find");
 }
 
-uint64_t
-LinearFileController::getNextFileSeqNum() {
-    return fileSeqCounter.increment();
+uint64_t LinearFileController::getNextFileSeqNum() {
+    return fileSeqCounter_.increment();
 }
 
 }} // namespace qpid::qls_jrnl

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h Mon Oct 21 21:26:10 2013
@@ -23,14 +23,16 @@
 #define QPID_LINEARSTORE_LINEARFILECONTROLLER_H_
 
 #include <deque>
-#include "qpid/linearstore/jrnl/aio.h"
 #include "qpid/linearstore/jrnl/AtomicCounter.h"
 #include "qpid/linearstore/jrnl/EmptyFilePoolTypes.h"
-#include "qpid/linearstore/jrnl/smutex.h"
 
-struct file_hdr_t;
+// libaio forward declares
+typedef struct io_context* io_context_t;
+typedef struct iocb aio_cb;
+
 namespace qpid {
 namespace qls_jrnl {
+
 class EmptyFilePool;
 class jcntl;
 class JournalFile;
@@ -41,35 +43,44 @@ protected:
     typedef std::deque<JournalFile*> JournalFileList_t;
     typedef JournalFileList_t::iterator JournalFileListItr_t;
 
-    jcntl& jcntlRef;
-    std::string journalDirectory;
-    EmptyFilePool* emptyFilePoolPtr;
-    JournalFile* currentJournalFilePtr;
-    AtomicCounter<uint64_t> fileSeqCounter;
-    AtomicCounter<uint64_t> recordIdCounter;
+    jcntl& jcntlRef_;
+    std::string journalDirectory_;
+    EmptyFilePool* emptyFilePoolPtr_;
+    JournalFile* currentJournalFilePtr_;
+    AtomicCounter<uint64_t> fileSeqCounter_;
+    AtomicCounter<uint64_t> recordIdCounter_;
 
-    JournalFileList_t journalFileList;
-    smutex journalFileListMutex;
+    JournalFileList_t journalFileList_;
+    smutex journalFileListMutex_;
 
 public:
-    LinearFileController(jcntl& jcntlRef_);
+    LinearFileController(jcntl& jcntlRef);
     virtual ~LinearFileController();
 
-    void initialize(const std::string& journalDirectory_, EmptyFilePool* emptyFilePoolPtr_);
+    void initialize(const std::string& journalDirectory,
+                    EmptyFilePool* emptyFilePoolPtr,
+                    uint64_t initialFileNumberVal);
     void finalize();
 
-    void pullEmptyFileFromEfp();
-    void purgeFilesToEfp();
+    void addJournalFile(const std::string& fileName,
+                        const uint64_t fileNumber,
+                        const uint32_t fileSize_kib,
+                        const uint32_t completedDblkCount);
+
     efpDataSize_kib_t dataSize_kib() const;
-    efpFileSize_kib_t fileSize_kib() const;
     efpDataSize_sblks_t dataSize_sblks() const;
+    efpFileSize_kib_t fileSize_kib() const;
     efpFileSize_sblks_t fileSize_sblks() const;
-
     uint64_t getNextRecordId();
+    void pullEmptyFileFromEfp();
+    void purgeFilesToEfp();
 
-    // Functions for manipulating counts of non-current JournalFile instances in journalFileList
+    // Functions for manipulating counts of non-current JournalFile instances in journalFileList_
+    uint32_t getEnqueuedRecordCount(const efpFileCount_t fileSeqNumber);
+    uint32_t incrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber);
     uint32_t decrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber);
-    uint32_t addWriteCompletedDblkCount(const efpFileCount_t fileSeqNumber, const uint32_t a);
+    uint32_t addWriteCompletedDblkCount(const efpFileCount_t fileSeqNumber,
+                                        const uint32_t a);
     uint16_t decrOutstandingAioOperationCount(const efpFileCount_t fileSeqNumber);
 
     // Pass-through functions for JournalFile class
@@ -106,18 +117,20 @@ public:
     bool isFull() const;                       // True if all possible dblks have been submitted (but may not yet have returned from AIO)
     bool isFullAndComplete() const;            // True if all submitted dblks have returned from AIO
     u_int32_t getOutstandingAioDblks() const;  // Dblks still to be written
-    bool getNextFile() const;                  // True when next file is needed
+    bool needNextFile() const;                 // True when next file is needed
 
     // Debug aid
     const std::string status(const uint8_t indentDepth) const;
 
 protected:
-    bool checkCurrentJournalFileValid() const;
     void assertCurrentJournalFileValid(const char* const functionName) const;
-    JournalFile* find(const efpFileCount_t fileSeqNumber); // NOT THREAD SAFE - use under external lock
+    bool checkCurrentJournalFileValid() const;
+    JournalFile* find(const efpFileCount_t fileSeqNumber);
     uint64_t getNextFileSeqNum();
 };
 
+typedef void (LinearFileController::*lfcAddJournalFileFn)(const std::string&, const uint64_t, const uint32_t, const uint32_t);
+
 }} // namespace qpid::qls_jrnl
 
 #endif // QPID_LINEARSTORE_LINEARFILECONTROLLER_H_

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp Mon Oct 21 21:26:10 2013
@@ -21,79 +21,625 @@
 
 #include "qpid/linearstore/jrnl/RecoveryManager.h"
 
+#include <algorithm>
+#include <cstdlib>
 #include <iomanip>
+#include "qpid/linearstore/jrnl/data_tok.h"
+#include "qpid/linearstore/jrnl/deq_rec.h"
+#include "qpid/linearstore/jrnl/EmptyFilePoolManager.h"
+#include "qpid/linearstore/jrnl/enq_map.h"
+#include "qpid/linearstore/jrnl/enq_rec.h"
 #include "qpid/linearstore/jrnl/jcfg.h"
+#include "qpid/linearstore/jrnl/jdir.h"
+#include "qpid/linearstore/jrnl/JournalLog.h"
+#include "qpid/linearstore/jrnl/jrec.h"
+#include "qpid/linearstore/jrnl/LinearFileController.h"
+#include "qpid/linearstore/jrnl/txn_map.h"
+#include "qpid/linearstore/jrnl/txn_rec.h"
+#include "qpid/linearstore/jrnl/utils/enq_hdr.h"
+#include "qpid/linearstore/jrnl/utils/file_hdr.h"
 #include <sstream>
+#include <string>
+#include <vector>
 
 namespace qpid {
 namespace qls_jrnl
 {
 
-RecoveryManager::RecoveryManager() : _journalFileList(),
-                                     _fileNumberNameMap(),
-                                     _enqueueCountList(),
-                                     _journalEmptyFlag(false),
-                                     _firstRecordOffset(0),
-                                     _endOffset(0),
-                                     _highestRecordId(0ULL),
-                                     _lastFileFullFlag(false),
-                                     _currentRid(0ULL),
-                                     _currentFileNumber(0ULL),
-                                     _currentFileName(),
-                                     _fileSize(0),
-                                     _recordStart(0),
-                                     _inFileStream(),
-                                     _readComplete(false)
+RecoveryManager::RecoveryManager(const std::string& journalDirectory,
+                                 const std::string& queuename,
+                                 enq_map& enqueueMapRef,
+                                 txn_map& transactionMapRef,
+                                 JournalLog& journalLogRef) :
+                                                 journalDirectory_(journalDirectory),
+                                                 queueName_(queuename),
+                                                 enqueueMapRef_(enqueueMapRef),
+                                                 transactionMapRef_(transactionMapRef),
+                                                 journalLogRef_(journalLogRef),
+                                                 journalEmptyFlag_(false),
+                                                 firstRecordOffset_(0),
+                                                 endOffset_(0),
+                                                 highestRecordId_(0ULL),
+                                                 highestFileNumber_(0ULL),
+                                                 lastFileFullFlag_(false),
+                                                 fileSize_kib_(0)
 {}
 
 RecoveryManager::~RecoveryManager() {}
 
-std::string
-RecoveryManager::toString(const std::string& jid,
-                          bool compact) {
+void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTransactionListPtr,
+                                      EmptyFilePoolManager* emptyFilePoolManager,
+                                      EmptyFilePool** emptyFilePoolPtrPtr) {
+    // Analyze file headers of existing journal files; this yields the EFP identity used to look up the pool
+    efpIdentity_t efpIdentity;
+    analyzeJournalFileHeaders(efpIdentity);
+    *emptyFilePoolPtrPtr = emptyFilePoolManager->getEmptyFilePool(efpIdentity);
+    fileSize_kib_ = (*emptyFilePoolPtrPtr)->fileSize_kib();
+    // Check for file full condition
+    lastFileFullFlag_ = endOffset_ == (std::streamoff)(*emptyFilePoolPtrPtr)->fileSize_kib() * 1024;
+
+    // Restore all read and write pointers and transactions
+    if (!journalEmptyFlag_) {
+        while (getNextRecordHeader()) {
+            // Empty body: the loop only drives getNextRecordHeader() until it returns false
+        }
+        if (inFileStream_.is_open()) {
+            inFileStream_.close();
+        }
+        // Remove leading files which have no enqueued records
+        removeEmptyFiles(*emptyFilePoolPtrPtr);
+
+        // Remove all txns from tmap that are not in the prepared list
+        if (preparedTransactionListPtr) {
+            std::vector<std::string> xidList;
+            transactionMapRef_.xid_list(xidList);
+            for (std::vector<std::string>::iterator itr = xidList.begin(); itr != xidList.end(); itr++) {
+                std::vector<std::string>::const_iterator pitr =
+                        std::find(preparedTransactionListPtr->begin(), preparedTransactionListPtr->end(), *itr);
+                if (pitr == preparedTransactionListPtr->end()) { // not found in prepared list
+                    txn_data_list tdl = transactionMapRef_.get_remove_tdata_list(*itr); // tdl will be empty if xid not found
+                    // Unlock any affected enqueues in emap
+                    for (tdl_itr i=tdl.begin(); i<tdl.end(); i++) {
+                        if (i->_enq_flag) { // enq op - decrement enqueue count
+                            enqueueCountList_[i->_pfid]--;
+                        } else if (enqueueMapRef_.is_enqueued(i->_drid, true)) { // deq op - unlock enq record
+                            int16_t ret = enqueueMapRef_.unlock(i->_drid);
+                            if (ret < enq_map::EMAP_OK) { // fail
+                                // enq_map::unlock()'s only error is enq_map::EMAP_RID_NOT_FOUND
+                                std::ostringstream oss;
+                                oss << std::hex << "_emap.unlock(): drid=0x" << i->_drid; // no stray quote before the hex value
+                                throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "RecoveryManager", "analyzeJournals");
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        enqueueMapRef_.rid_list(recordIdList_);
+        recordIdListConstItr_ = recordIdList_.begin(); // start position for readNextRemainingRecord()
+    }
+}
+
+std::streamoff RecoveryManager::getEndOffset() const {
+    return endOffset_;
+}
+
+uint64_t RecoveryManager::getHighestFileNumber() const {
+    return highestFileNumber_;
+}
+
+uint64_t RecoveryManager::getHighestRecordId() const {
+    return highestRecordId_;
+}
+
+bool RecoveryManager::isLastFileFull() const {
+    return lastFileFullFlag_;
+}
+
+bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr,
+                                              std::size_t& dataSize,
+                                              void** const xidPtrPtr,
+                                              std::size_t& xidSize,
+                                              bool& transient,
+                                              bool& external,
+                                              data_tok* const dtokp,
+                                              bool /*ignore_pending_txns*/) {
+    if (!dtokp->is_readable()) {
+        std::ostringstream oss;
+        oss << std::hex << std::setfill('0') << "dtok_id=0x" << std::setw(8) << dtokp->id();
+        oss << "; dtok_rid=0x" << std::setw(16) << dtokp->rid() << "; dtok_wstate=" << dtokp->wstate_str();
+        throw jexception(jerrno::JERR_JCNTL_ENQSTATE, oss.str(), "RecoveryManager", "readNextRemainingRecord");
+    }
+    if (recordIdListConstItr_ == recordIdList_.end()) {
+        return false; // no remaining record ids
+    }
+    enq_map::emap_data_struct_t eds;
+    enqueueMapRef_.get_data(*recordIdListConstItr_, eds);
+    uint64_t fileNumber = eds._pfid;
+    currentJournalFileConstItr_ = fileNumberNameMap_.find(fileNumber);
+    getNextFile(false);
+
+    inFileStream_.seekg(eds._file_posn, std::ifstream::beg);
+    if (!inFileStream_.good()) {
+        std::ostringstream oss;
+        oss << "Could not find offset 0x" << std::hex << eds._file_posn << " in file " << getCurrentFileName();
+        throw jexception(jerrno::JERR__FILEIO, oss.str(), "RecoveryManager", "readNextRemainingRecord");
+    }
+    ::enq_hdr_t enqueueHeader;
+    inFileStream_.read((char*)&enqueueHeader, sizeof(::enq_hdr_t));
+    if (inFileStream_.gcount() != sizeof(::enq_hdr_t)) {
+        std::ostringstream oss;
+        oss << "Could not read enqueue header from file " << getCurrentFileName() << " at offset 0x" << std::hex << eds._file_posn;
+        throw jexception(jerrno::JERR__FILEIO, oss.str(), "RecoveryManager", "readNextRemainingRecord");
+    }
+    // check flags
+    transient = ::is_enq_transient(&enqueueHeader);
+    external = ::is_enq_external(&enqueueHeader);
+
+    // read xid into a malloc'ed buffer handed back through xidPtrPtr
+    xidSize = enqueueHeader._xidsize;
+    *xidPtrPtr = ::malloc(xidSize);
+    if (*xidPtrPtr == 0) {
+        std::ostringstream oss;
+        oss << "xidPtr, size=0x" << std::hex << xidSize;
+        throw jexception(jerrno::JERR__MALLOC, oss.str(), "RecoveryManager", "readNextRemainingRecord");
+    }
+    readJournalData((char*)*xidPtrPtr, xidSize);
+
+    // read data into a malloc'ed buffer handed back through dataPtrPtr
+    dataSize = enqueueHeader._dsize;
+    *dataPtrPtr = ::malloc(dataSize);
+    if (*dataPtrPtr == 0) { // check the data buffer just allocated (previously re-tested the xid buffer)
+        std::ostringstream oss;
+        oss << "dataPtr, size=0x" << std::hex << dataSize;
+        throw jexception(jerrno::JERR__MALLOC, oss.str(), "RecoveryManager", "readNextRemainingRecord");
+    }
+    readJournalData((char*)*dataPtrPtr, dataSize);
+    return true; // NOTE(review): recordIdListConstItr_ is not advanced in this function - confirm it is advanced elsewhere
+}
+
+void RecoveryManager::setLinearFileControllerJournals(lfcAddJournalFileFn fnPtr,
+                                                      LinearFileController* lfcPtr) {
+//std::cout << "****** RecoveryManager::setLinearFileControllerJournals():" << std::endl; // DEBUG
+    for (fileNumberNameMapConstItr_t i = fileNumberNameMap_.begin(); i != fileNumberNameMap_.end(); ++i) {
+        uint32_t fileDblkCount = i->first == highestFileNumber_ ?               // Is this the last file?
+                                 endOffset_ / QLS_DBLK_SIZE_BYTES :            // Last file uses endOffset_
+                                 fileSize_kib_ * 1024 / QLS_DBLK_SIZE_BYTES;   // All others use file size to make them full
+        (lfcPtr->*fnPtr)(i->second, i->first, fileSize_kib_, fileDblkCount);
+//std::cout << "   ** f=" << i->second.substr(i->second.rfind('/')+1) << ",fn=" << i->first << ",s=" << fileSize_kib_ << ",eo=" << fileDblkCount << "(" << (fileDblkCount * QLS_DBLK_SIZE_BYTES / 1024) << "kiB)" << std::endl; // DEBUG
+    }
+}
+
+std::string RecoveryManager::toString(const std::string& jid,
+                                      bool compact) {
     std::ostringstream oss;
     if (compact) {
         oss << "Recovery journal analysis (jid=\"" << jid << "\"):";
         oss << " jfl=[";
-        for (std::map<uint64_t, std::string>::const_iterator i=_fileNumberNameMap.begin(); i!=_fileNumberNameMap.end(); ++i) {
-            if (i!=_fileNumberNameMap.begin()) oss << " ";
+        for (fileNumberNameMapConstItr_t i=fileNumberNameMap_.begin(); i!=fileNumberNameMap_.end(); ++i) {
+            if (i!=fileNumberNameMap_.begin()) oss << " ";
             oss << i->first << ":" << i->second.substr(i->second.rfind('/')+1);
         }
         oss << "] ecl=[ ";
-        for (std::vector<uint32_t>::const_iterator j = _enqueueCountList.begin(); j!=_enqueueCountList.end(); ++j) {
-            if (j != _enqueueCountList.begin()) oss << " ";
+        for (enqueueCountListConstItr_t j = enqueueCountList_.begin(); j!=enqueueCountList_.end(); ++j) {
+            if (j != enqueueCountList_.begin()) oss << " ";
             oss << *j;
         }
-        oss << " ] empty=" << (_journalEmptyFlag ? "T" : "F");
-        oss << " fro=0x" << std::hex << _firstRecordOffset << std::dec << " (" << (_firstRecordOffset/JRNL_DBLK_SIZE_BYTES) << " dblks)";
-        oss << " eo=0x" << std::hex << _endOffset << std::dec << " ("  << (_endOffset/JRNL_DBLK_SIZE_BYTES) << " dblks)";
-        oss << " hrid=0x" << std::hex << _highestRecordId << std::dec;
-        oss << " lffull=" << (_lastFileFullFlag ? "T" : "F");
+        oss << " ] empty=" << (journalEmptyFlag_ ? "T" : "F");
+        oss << " fro=0x" << std::hex << firstRecordOffset_ << std::dec << " (" << (firstRecordOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)";
+        oss << " eo=0x" << std::hex << endOffset_ << std::dec << " ("  << (endOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)";
+        oss << " hrid=0x" << std::hex << highestRecordId_ << std::dec;
+        oss << " hfnum=0x" << std::hex << highestFileNumber_ << std::dec;
+        oss << " lffull=" << (lastFileFullFlag_ ? "T" : "F");
     } else {
         oss << "Recovery journal analysis (jid=\"" << jid << "\"):" << std::endl;
-        oss << "  Number of journal files = " << _fileNumberNameMap.size() << std::endl;
+        oss << "  Number of journal files = " << fileNumberNameMap_.size() << std::endl;
         oss << "  Journal File List:" << std::endl;
-        for (std::map<uint64_t, std::string>::const_iterator i=_fileNumberNameMap.begin(); i!=_fileNumberNameMap.end(); ++i) {
+        for (fileNumberNameMapConstItr_t i=fileNumberNameMap_.begin(); i!=fileNumberNameMap_.end(); ++i) {
             oss << "    " << i->first << ": " << i->second.substr(i->second.rfind('/')+1) << std::endl;
         }
         oss << "  Enqueue Counts: [ " << std::endl;
-        for (std::vector<uint32_t>::const_iterator j = _enqueueCountList.begin(); j!=_enqueueCountList.end(); ++j) {
-            if (j != _enqueueCountList.begin()) oss << ", ";
+        for (enqueueCountListConstItr_t j = enqueueCountList_.begin(); j!=enqueueCountList_.end(); ++j) {
+            if (j != enqueueCountList_.begin()) oss << ", ";
             oss << *j;
         }
         oss << " ]" << std::endl;
-        for (unsigned i=0; i<_enqueueCountList.size(); i++)
-           oss << "    File " << std::setw(2) << i << ": " << _enqueueCountList[i] << std::endl;
-        oss << "  Journal empty (_jempty) = " << (_journalEmptyFlag ? "TRUE" : "FALSE") << std::endl;
-        oss << "  First record offset in first fid (_fro) = 0x" << std::hex << _firstRecordOffset <<
-                std::dec << " (" << (_firstRecordOffset/JRNL_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
-        oss << "  End offset (_eo) = 0x" << std::hex << _endOffset << std::dec << " ("  <<
-                (_endOffset/JRNL_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
-        oss << "  Highest rid (_h_rid) = 0x" << std::hex << _highestRecordId << std::dec << std::endl;
-        oss << "  Last file full (_lffull) = " << (_lastFileFullFlag ? "TRUE" : "FALSE") << std::endl;
+        oss << "  Journal empty = " << (journalEmptyFlag_ ? "TRUE" : "FALSE") << std::endl;
+        oss << "  First record offset in first file = 0x" << std::hex << firstRecordOffset_ <<
+                std::dec << " (" << (firstRecordOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
+        oss << "  End offset = 0x" << std::hex << endOffset_ << std::dec << " ("  <<
+                (endOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
+        oss << "  Highest rid = 0x" << std::hex << highestRecordId_ << std::dec << std::endl;
+        oss << "  Highest file number = 0x" << std::hex << highestFileNumber_ << std::dec << std::endl;
+        oss << "  Last file full = " << (lastFileFullFlag_ ? "TRUE" : "FALSE") << std::endl;
         oss << "  Enqueued records (txn & non-txn):" << std::endl;
     }
     return oss.str();
 }
 
+// --- protected functions ---
+
+void RecoveryManager::analyzeJournalFileHeaders(efpIdentity_t& efpIdentity) {
+    std::string headerQueueName;
+    ::file_hdr_t fileHeader = ::file_hdr_t(); // value-initialized: its fields are read after the loop even if no header was loaded
+    directoryList_t directoryList;
+    jdir::read_dir(journalDirectory_, directoryList, false, true, false, true);
+    for (directoryListConstItr_t i = directoryList.begin(); i != directoryList.end(); ++i) {
+        readJournalFileHeader(*i, fileHeader, headerQueueName);
+        if (headerQueueName.compare(queueName_) != 0) {
+            std::ostringstream oss;
+            oss << "Journal file " << (*i) << " belongs to queue \"" << headerQueueName << "\": ignoring";
+            journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss.str());
+        } else {
+            fileNumberNameMap_[fileHeader._file_number] = *i; // file number -> file name, for this queue only
+            if (fileHeader._file_number > highestFileNumber_) {
+                highestFileNumber_ = fileHeader._file_number;
+            }
+        }
+    }
+    efpIdentity.first = fileHeader._efp_partition;   // NOTE(review): uses the last header read, even one logged above as another queue's - confirm intended
+    efpIdentity.second = fileHeader._file_size_kib;
+    enqueueCountList_.resize(fileNumberNameMap_.size(), 0);
+    currentJournalFileConstItr_ = fileNumberNameMap_.begin();
+}
+
+// Throw a jexception if the input file stream is in a failed or bad state. When checkEof is true,
+// having reached end-of-file is treated as an error as well.
+void RecoveryManager::checkFileStreamOk(bool checkEof) {
+    // The conditional part must be parenthesised: '?:' binds more loosely than '||', so without the
+    // parentheses the fail()/bad() results would only select between eof() and false.
+    if (inFileStream_.fail() || inFileStream_.bad() || (checkEof && inFileStream_.eof())) {
+        throw jexception("read failure"); // TODO complete exception
+    }
+}
+
+// Check that recordPosition lies on a QLS_SBLK_SIZE_BYTES boundary in the current journal file. If it
+// does not, the file is opened for writing and one-dblk filler records (QLS_EMPTY_MAGIC followed by
+// QLS_CLEAN_CHAR padding) are written from recordPosition up to the next boundary. In both cases
+// endOffset_ is set to the resulting (aligned) position.
+//
+// Throws jexception if the file cannot be opened, the buffer cannot be allocated or a write fails.
+void RecoveryManager::checkJournalAlignment(const std::streampos recordPosition) {
+    std::streampos currentPosn = recordPosition;
+    unsigned sblkOffset = currentPosn % QLS_SBLK_SIZE_BYTES;
+    if (sblkOffset)
+    {
+        std::ostringstream oss1;
+        oss1 << std::hex << "Bad record alignment found at fid=0x" << getCurrentFileNumber();
+        oss1 << " offs=0x" << currentPosn << " (likely journal overwrite boundary); " << std::dec;
+        oss1 << (QLS_SBLK_SIZE_DBLKS - (sblkOffset/QLS_DBLK_SIZE_BYTES)) << " filler record(s) required.";
+        journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss1.str());
+
+        // in | out opens the existing file for update without truncating it
+        std::ofstream outFileStream(getCurrentFileName().c_str(), std::ios_base::in | std::ios_base::out | std::ios_base::binary);
+        if (!outFileStream.good()) {
+            throw jexception(jerrno::JERR__FILEIO, getCurrentFileName(), "RecoveryManager", "checkJournalAlignment");
+        }
+        outFileStream.seekp(currentPosn);
+
+        // Prepare write buffer containing a single empty record (1 dblk)
+        void* writeBuffer = std::malloc(QLS_DBLK_SIZE_BYTES);
+        if (writeBuffer == 0) {
+            throw jexception(jerrno::JERR__MALLOC, "RecoveryManager", "checkJournalAlignment");
+        }
+        const uint32_t xmagic = QLS_EMPTY_MAGIC;
+        ::memcpy(writeBuffer, (const void*)&xmagic, sizeof(xmagic));
+        ::memset((char*)writeBuffer + sizeof(xmagic), QLS_CLEAN_CHAR, QLS_DBLK_SIZE_BYTES - sizeof(xmagic));
+
+        // Write as many empty records as are needed to get to sblk boundary
+        while (currentPosn % QLS_SBLK_SIZE_BYTES) {
+            outFileStream.write((const char*)writeBuffer, QLS_DBLK_SIZE_BYTES);
+            if (outFileStream.fail()) {
+                std::free(writeBuffer); // do not leak the filler buffer on the error path
+                throw jexception(jerrno::JERR_RCVM_WRITE, "RecoveryManager", "checkJournalAlignment");
+            }
+            // currentPosn still holds the offset at which this filler record was written
+            std::ostringstream oss2;
+            oss2 << std::hex << "Recover phase write: Wrote filler record: fid=0x" << getCurrentFileNumber();
+            oss2 << " offs=0x" << currentPosn;
+            journalLogRef_.log(JournalLog::LOG_NOTICE, queueName_, oss2.str());
+            currentPosn = outFileStream.tellp();
+        }
+        outFileStream.close();
+        std::free(writeBuffer);
+        journalLogRef_.log(JournalLog::LOG_INFO, queueName_, "Bad record alignment fixed.");
+    }
+    endOffset_ = currentPosn;
+}
+
+// Decode the remainder of the record whose header has already been read into headerRecord, moving on
+// to the next journal file each time rcv_decode() reports that the record is not yet complete.
+//
+// Returns true once the record has been fully decoded. Returns false if rcv_decode() throws a
+// jexception or no further file is available; in both cases checkJournalAlignment() is first run
+// against the offset at which the record started.
+bool RecoveryManager::decodeRecord(jrec& record,
+                              std::size_t& cumulativeSizeRead,
+                              ::rec_hdr_t& headerRecord,
+                              std::streampos& fileOffset)
+{
+    const std::streampos recordStartOffset = fileOffset;
+
+    // Adopt this rid as the highest seen if none has been recorded yet, or if it is ahead of the
+    // current highest under RFC 1982 serial-number comparison for unsigned 64-bit values.
+    if (highestRecordId_ == 0 || headerRecord._rid - highestRecordId_ < 0x8000000000000000ULL) {
+        highestRecordId_ = headerRecord._rid;
+    }
+
+    for (;;) {
+        bool recordComplete = false;
+        try {
+            recordComplete = record.rcv_decode(headerRecord, &inFileStream_, cumulativeSizeRead);
+        }
+        catch (const jexception&) {
+            // TODO - review this logic: every decode error is currently treated as the end of the
+            // journal; earlier code only did so for JERR_JREC_BADRECTAIL in the last file.
+            checkJournalAlignment(recordStartOffset);
+            return false;
+        }
+        if (recordComplete) {
+            return true;
+        }
+        // Record continues in the next file
+        if (!getNextFile(false)) {
+            checkJournalAlignment(recordStartOffset);
+            return false;
+        }
+    }
+}
+
+// Name of the journal file the current-file iterator refers to (the mapped value of the map entry).
+std::string RecoveryManager::getCurrentFileName() const {
+    return (*currentJournalFileConstItr_).second;
+}
+
+// File number of the journal file the current-file iterator refers to (the key of the map entry).
+uint64_t RecoveryManager::getCurrentFileNumber() const {
+    return (*currentJournalFileConstItr_).first;
+}
+
+// Make sure inFileStream_ is open on a journal file and positioned at record data.
+//
+// If the stream is open but no longer good (this includes end-of-file), the current offset is saved
+// in endOffset_, the file is closed and the current-file iterator is advanced. If the stream is then
+// not open, the file the iterator refers to is opened and its header read; the read position is set
+// to the header's first record offset when jumpToFirstRecordOffsetFlag is true, otherwise to
+// QLS_SBLK_SIZE_BYTES.
+//
+// Returns false if there is no (further) file to read, or if the opened file does not start with
+// QLS_FILE_MAGIC; in the latter case journalEmptyFlag_ is set if this is the first file in the map.
+// Throws jexception on tellg(), open or header read failure.
+bool RecoveryManager::getNextFile(bool jumpToFirstRecordOffsetFlag) {
+    if (inFileStream_.is_open()) {
+        if (!inFileStream_.good()) // good() is false for eof as well as for fail/bad
+        {
+            inFileStream_.clear();
+            endOffset_ = inFileStream_.tellg(); // remember file offset before closing
+            if (endOffset_ == -1) { throw jexception("tellg() failure"); } // Check for error code -1 TODO: complete exception
+            inFileStream_.close();
+            if (++currentJournalFileConstItr_ == fileNumberNameMap_.end()) {
+                return false;
+            }
+        }
+    }
+    if (!inFileStream_.is_open())
+    {
+        // Nothing to open if the map is empty or all files have already been consumed; without this
+        // check getCurrentFileName() below would dereference the end iterator.
+        if (currentJournalFileConstItr_ == fileNumberNameMap_.end()) {
+            return false;
+        }
+        inFileStream_.clear(); // clear eof flag, req'd for older versions of c++
+        inFileStream_.open(getCurrentFileName().c_str(), std::ios_base::in | std::ios_base::binary);
+        if (!inFileStream_.good()) {
+            throw jexception(jerrno::JERR__FILEIO, getCurrentFileName(), "RecoveryManager", "getNextFile");
+        }
+
+        // Read file header
+        file_hdr_t fhdr;
+        inFileStream_.read((char*)&fhdr, sizeof(fhdr));
+        checkFileStreamOk(true);
+        if (fhdr._rhdr._magic == QLS_FILE_MAGIC) {
+            firstRecordOffset_ = fhdr._fro;
+            std::streamoff foffs = jumpToFirstRecordOffsetFlag ? firstRecordOffset_ : QLS_SBLK_SIZE_BYTES;
+            inFileStream_.seekg(foffs);
+        } else {
+            // No valid file header: this file holds no journal data
+            inFileStream_.close();
+            if (currentJournalFileConstItr_ == fileNumberNameMap_.begin()) {
+                journalEmptyFlag_ = true;
+            }
+            return false;
+        }
+    }
+    return true;
+}
+
+namespace {
+
+// Copy the xid held in the malloc'ed buffer xidPtr into a std::string and release the buffer.
+// Throws jexception if xidPtr is null. Freeing straight after the copy means the buffer is not
+// leaked if the caller subsequently throws.
+std::string takeXid(void* xidPtr, const std::size_t xidSize) {
+    if (xidPtr == 0) {
+        throw jexception("Null xid with non-null xid_size"); // TODO complete exception
+    }
+    const std::string xid(static_cast<const char*>(xidPtr), xidSize);
+    std::free(xidPtr);
+    return xid;
+}
+
+} // anonymous namespace
+
+// Read the next record header from the journal, decode the record it introduces and apply it to the
+// enqueue map, the transaction map and the per-file enqueue counts.
+//
+// Returns true if a record was processed. Returns false when reading must stop: no further file is
+// available, a record could not be decoded, or the header magic is zero or unrecognised (in which
+// case checkJournalAlignment() is run on the header's offset).
+// Throws jexception on a null xid, a duplicate rid in the enqueue map, or an xid/rid missing from
+// the transaction map.
+//
+// NOTE(review): enqueueCountList_ is indexed directly by file number here, but is sized by the
+// number of files found - confirm that file numbers are always 0..size-1.
+bool RecoveryManager::getNextRecordHeader()
+{
+    std::size_t cum_size_read = 0;
+    void* xidp = 0;
+    rec_hdr_t h;
+
+    // Read one complete record header, moving on to the next file if the current one runs out
+    bool hdr_ok = false;
+    std::streampos file_pos;
+    while (!hdr_ok) {
+        if (!inFileStream_.is_open()) {
+            if (!getNextFile(true)) {
+                return false;
+            }
+        }
+        file_pos = inFileStream_.tellg();
+        inFileStream_.read((char*)&h, sizeof(rec_hdr_t));
+        if (inFileStream_.gcount() == static_cast<std::streamsize>(sizeof(rec_hdr_t))) {
+            hdr_ok = true;
+        } else {
+            if (!getNextFile(true)) {
+                return false;
+            }
+        }
+    }
+
+    switch(h._magic) {
+        case QLS_ENQ_MAGIC:
+            {
+                enq_rec er;
+                const uint64_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary
+                if (!decodeRecord(er, cum_size_read, h, file_pos)) {
+                    return false;
+                }
+                if (!er.is_transient()) { // Ignore transient msgs
+                    enqueueCountList_[start_fid]++;
+                    if (er.xid_size()) {
+                        // Transactional enqueue: held in the txn map until commit or abort
+                        er.get_xid(&xidp);
+                        const std::string xid = takeXid(xidp, er.xid_size());
+                        transactionMapRef_.insert_txn_data(xid, txn_data(h._rid, 0, start_fid, true));
+                        if (transactionMapRef_.set_aio_compl(xid, h._rid) < txn_map::TMAP_OK) { // fail - xid or rid not found
+                            std::ostringstream oss;
+                            oss << std::hex << "_tmap.set_aio_compl: txn_enq xid=\"" << xid << "\" rid=0x" << h._rid;
+                            throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "RecoveryManager", "getNextRecordHeader");
+                        }
+                    } else {
+                        if (enqueueMapRef_.insert_pfid(h._rid, start_fid, file_pos) < enq_map::EMAP_OK) { // fail
+                            // The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID.
+                            std::ostringstream oss;
+                            oss << std::hex << "rid=0x" << h._rid << " _pfid=0x" << start_fid;
+                            throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "RecoveryManager", "getNextRecordHeader");
+                        }
+                    }
+                }
+            }
+            break;
+        case QLS_DEQ_MAGIC:
+            {
+                deq_rec dr;
+                // 64-bit, as returned by getCurrentFileNumber(); a narrower type would truncate the file number
+                const uint64_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary
+                if (!decodeRecord(dr, cum_size_read, h, file_pos)) {
+                    return false;
+                }
+                if (dr.xid_size()) {
+                    // If the enqueue is part of a pending txn, it will not yet be in emap
+                    enqueueMapRef_.lock(dr.deq_rid()); // ignore not found error
+                    dr.get_xid(&xidp);
+                    const std::string xid = takeXid(xidp, dr.xid_size());
+                    transactionMapRef_.insert_txn_data(xid, txn_data(dr.rid(), dr.deq_rid(), start_fid, false,
+                            dr.is_txn_coml_commit()));
+                    if (transactionMapRef_.set_aio_compl(xid, dr.rid()) < txn_map::TMAP_OK) { // fail - xid or rid not found
+                        std::ostringstream oss;
+                        oss << std::hex << "_tmap.set_aio_compl: txn_deq xid=\"" << xid << "\" rid=0x" << dr.rid();
+                        throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "RecoveryManager", "getNextRecordHeader");
+                    }
+                } else {
+                    uint64_t enq_fid;
+                    if (enqueueMapRef_.get_remove_pfid(dr.deq_rid(), enq_fid, true) == enq_map::EMAP_OK) { // ignore not found error
+                        enqueueCountList_[enq_fid]--;
+                    }
+                }
+            }
+            break;
+        case QLS_TXA_MAGIC:
+            {
+                txn_rec ar;
+                if (!decodeRecord(ar, cum_size_read, h, file_pos)) {
+                    return false;
+                }
+                // Delete this txn from tmap, unlock any locked records in emap
+                ar.get_xid(&xidp);
+                const std::string xid = takeXid(xidp, ar.xid_size());
+                txn_data_list tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found
+                for (tdl_itr itr = tdl.begin(); itr != tdl.end(); ++itr) {
+                    if (itr->_enq_flag) {
+                        enqueueCountList_[itr->_pfid]--;
+                    } else {
+                        enqueueMapRef_.unlock(itr->_drid); // ignore not found error
+                    }
+                }
+            }
+            break;
+        case QLS_TXC_MAGIC:
+            {
+                txn_rec cr;
+                if (!decodeRecord(cr, cum_size_read, h, file_pos)) {
+                    return false;
+                }
+                // Delete this txn from tmap, process records into emap
+                cr.get_xid(&xidp);
+                const std::string xid = takeXid(xidp, cr.xid_size());
+                txn_data_list tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found
+                for (tdl_itr itr = tdl.begin(); itr != tdl.end(); ++itr) {
+                    if (itr->_enq_flag) { // txn enqueue
+                        if (enqueueMapRef_.insert_pfid(itr->_rid, itr->_pfid, file_pos) < enq_map::EMAP_OK) { // fail
+                            // The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID.
+                            std::ostringstream oss;
+                            oss << std::hex << "rid=0x" << itr->_rid << " _pfid=0x" << itr->_pfid;
+                            throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "RecoveryManager", "getNextRecordHeader");
+                        }
+                    } else { // txn dequeue
+                        uint64_t enq_fid;
+                        if (enqueueMapRef_.get_remove_pfid(itr->_drid, enq_fid, true) == enq_map::EMAP_OK) { // ignore not found error
+                            enqueueCountList_[enq_fid]--;
+                        }
+                    }
+                }
+            }
+            break;
+        case QLS_EMPTY_MAGIC:
+            {
+                // Filler record: skip the rest of it (the header has already been consumed)
+                uint32_t rec_dblks = jrec::size_dblks(sizeof(::rec_hdr_t));
+                inFileStream_.ignore(rec_dblks * QLS_DBLK_SIZE_BYTES - sizeof(::rec_hdr_t));
+                checkFileStreamOk(false);
+                if (!getNextFile(false)) {
+                    return false;
+                }
+            }
+            break;
+        case 0:
+            // Zero magic: no record has been written at this position
+            checkJournalAlignment(file_pos);
+            return false;
+        default:
+            // Stop as this is the overwrite boundary.
+            checkJournalAlignment(file_pos);
+            return false;
+    }
+    return true;
+}
+
+// Read exactly readSize bytes of journal data into target, continuing in the next journal file
+// whenever the current one is exhausted.
+//
+// Throws jexception if no further file is available or a read returns fewer bytes than requested.
+//
+// NOTE(review): the end of the readable region of a file is taken to be fileSize_kib_ * 1024 bytes
+// from the start of the file - confirm whether the file header block should be added to this.
+void RecoveryManager::readJournalData(char* target,
+                                      const std::streamsize readSize) {
+    // Widen before multiplying so that large file sizes cannot overflow 32-bit arithmetic
+    const std::streamoff fileSizeBytes = static_cast<std::streamoff>(fileSize_kib_) * 1024;
+    std::streamsize bytesRead = 0;
+    while (bytesRead < readSize) {
+        if (inFileStream_.eof()) {
+            if (!getNextFile(false)) {
+                throw jexception(); // TODO - proper exception
+            }
+        }
+        const std::streamoff currentOffset = inFileStream_.tellg();
+        const std::streamoff availableInFile = fileSizeBytes - currentOffset;
+        if (availableInFile <= 0) {
+            // Positioned at the file boundary without having tripped eof: flag it so that the next
+            // iteration moves on to the following file instead of looping on zero-length reads.
+            inFileStream_.setstate(std::ios_base::eofbit);
+            continue;
+        }
+        // Read whichever is smaller: what is still wanted, or what is left in this file
+        const std::streamsize remaining = readSize - bytesRead;
+        const std::streamsize chunkSize =
+                remaining <= availableInFile ? remaining : static_cast<std::streamsize>(availableInFile);
+        inFileStream_.read(target + bytesRead, chunkSize);
+        if (inFileStream_.gcount() != chunkSize) {
+            throw jexception(); // TODO - proper exception
+        }
+        bytesRead += chunkSize;
+    }
+}
+
+// static protected
+//
+// Read the header block (QLS_JRNL_FHDR_RES_SIZE_SBLKS sblks) from the start of journalFileName.
+// The leading sizeof(::file_hdr_t) bytes are copied into fileHeaderRef, and the queue name that
+// follows them is copied into queueName using the length given in the header.
+//
+// Throws jexception(JERR_RCVM_OPENRD) if the file cannot be opened and jexception(JERR_RCVM_READ) if
+// the complete header block cannot be read.
+void RecoveryManager::readJournalFileHeader(const std::string& journalFileName,
+                                            ::file_hdr_t& fileHeaderRef,
+                                            std::string& queueName) {
+    const std::size_t headerBlockSize = QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024;
+    char buffer[headerBlockSize];
+    std::ifstream ifs(journalFileName.c_str(), std::ifstream::in | std::ifstream::binary);
+    if (!ifs.good()) {
+        std::ostringstream oss;
+        oss << "File=" << journalFileName;
+        throw jexception(jerrno::JERR_RCVM_OPENRD, oss.str(), "RecoveryManager", "readJournalFileHeader");
+    }
+    ifs.read(buffer, headerBlockSize);
+    if (!ifs) {
+        std::streamsize s = ifs.gcount();
+        ifs.close();
+        std::ostringstream oss;
+        oss << "File=" << journalFileName << "; attempted_read_size=" << headerBlockSize << "; actual_read_size=" << s;
+        throw jexception(jerrno::JERR_RCVM_READ, oss.str(), "RecoveryManager", "readJournalFileHeader");
+    }
+    ifs.close();
+    ::memcpy(&fileHeaderRef, buffer, sizeof(::file_hdr_t));
+
+    // The length field comes from disk and may be corrupt: never read beyond the header block.
+    const std::size_t maxQueueNameLen = headerBlockSize - sizeof(::file_hdr_t);
+    const std::size_t storedQueueNameLen = static_cast<std::size_t>(fileHeaderRef._queue_name_len);
+    const std::size_t queueNameLen = storedQueueNameLen < maxQueueNameLen ? storedQueueNameLen : maxQueueNameLen;
+    queueName.assign(buffer + sizeof(::file_hdr_t), queueNameLen);
+}
+
+// Hand leading journal files that hold no enqueued records back to the empty file pool. Files are
+// removed from the front while the first enqueue count is zero; the last remaining file is always
+// kept, even if it too is empty.
+//
+// NOTE(review): erasing the first map entry invalidates currentJournalFileConstItr_ if it refers to
+// that entry - confirm that callers reset it before further reads.
+void RecoveryManager::removeEmptyFiles(EmptyFilePool* emptyFilePoolPtr) {
+    // Test the size first: front() must not be called on an empty deque
+    while (enqueueCountList_.size() > 1 && enqueueCountList_.front() == 0) {
+        fileNumberNameMapItr_t i = fileNumberNameMap_.begin();
+        emptyFilePoolPtr->returnEmptyFile(i->second);
+        fileNumberNameMap_.erase(i);
+        enqueueCountList_.pop_front();
+    }
+}
+
 }} // namespace qpid::qls_jrnl

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h Mon Oct 21 21:26:10 2013
@@ -22,42 +22,111 @@
 #ifndef QPID_LINEARSTORE_RECOVERYSTATE_H_
 #define QPID_LINEARSTORE_RECOVERYSTATE_H_
 
+#include <deque>
 #include <fstream>
 #include <map>
+#include "qpid/linearstore/jrnl/LinearFileController.h"
 #include <stdint.h>
 #include <vector>
 
+struct file_hdr_t;
+struct rec_hdr_t;
+
 namespace qpid {
 namespace qls_jrnl {
 
+class data_tok;
+class enq_map;
+class EmptyFilePool;
+class EmptyFilePoolManager;
+class JournalLog;
+class jrec;
+class txn_map;
+
 class RecoveryManager
 {
-private:
+protected:
+    // Types
+    typedef std::vector<std::string> directoryList_t;
+    typedef directoryList_t::const_iterator directoryListConstItr_t;
+    typedef std::map<uint64_t, std::string> fileNumberNameMap_t;
+    typedef fileNumberNameMap_t::iterator fileNumberNameMapItr_t;
+    typedef fileNumberNameMap_t::const_iterator fileNumberNameMapConstItr_t;
+    typedef std::deque<uint32_t> enqueueCountList_t;
+    typedef enqueueCountList_t::const_iterator enqueueCountListConstItr_t;
+    typedef std::vector<uint64_t> recordIdList_t;
+    typedef recordIdList_t::const_iterator recordIdListConstItr_t;
+
+    // Location and identity
+    const std::string journalDirectory_;
+    const std::string queueName_;
+    enq_map& enqueueMapRef_;
+    txn_map& transactionMapRef_;
+    JournalLog& journalLogRef_;
+
     // Initial journal analysis data
-    std::vector<std::string> _journalFileList;          ///< Journal file list
-    std::map<uint64_t, std::string> _fileNumberNameMap; ///< File number - name map
-    std::vector<uint32_t> _enqueueCountList;            ///< Number enqueued records found for each file
-    bool _journalEmptyFlag;                             ///< Journal data files empty
-    std::streamoff _firstRecordOffset;                  ///< First record offset in ffid
-    std::streamoff _endOffset;                          ///< End offset (first byte past last record)
-    uint64_t _highestRecordId;                          ///< Highest rid found
-    bool _lastFileFullFlag;                             ///< Last file is full
+    fileNumberNameMap_t fileNumberNameMap_;     ///< File number - name map
+    enqueueCountList_t enqueueCountList_;       ///< Number enqueued records found for each file
+    bool journalEmptyFlag_;                     ///< Journal data files empty
+    std::streamoff firstRecordOffset_;          ///< First record offset in ffid
+    std::streamoff endOffset_;                  ///< End offset (first byte past last record)
+    uint64_t highestRecordId_;                  ///< Highest rid found
+    uint64_t highestFileNumber_;                ///< Highest file number found
+    bool lastFileFullFlag_;                     ///< Last file is full
 
     // State for recovery of individual enqueued records
-    uint64_t _currentRid;
-    uint64_t _currentFileNumber;
-    std::string _currentFileName;
-    std::streamoff _fileSize;
-    std::streamoff _recordStart;
-    std::ifstream _inFileStream;
-    bool _readComplete;
+    uint32_t fileSize_kib_;
+    fileNumberNameMapConstItr_t currentJournalFileConstItr_;
+    std::string currentFileName_;
+    std::ifstream inFileStream_;
+    recordIdList_t recordIdList_;
+    recordIdListConstItr_t recordIdListConstItr_;
 
 public:
-    RecoveryManager();
+    RecoveryManager(const std::string& journalDirectory,
+                    const std::string& queuename,
+                    enq_map& enqueueMapRef,
+                    txn_map& transactionMapRef,
+                    JournalLog& journalLogRef);
     virtual ~RecoveryManager();
 
+    void analyzeJournals(const std::vector<std::string>* preparedTransactionListPtr,
+                         EmptyFilePoolManager* emptyFilePoolManager,
+                         EmptyFilePool** emptyFilePoolPtrPtr);
+    std::streamoff getEndOffset() const;
+    uint64_t getHighestFileNumber() const;
+    uint64_t getHighestRecordId() const;
+    bool isLastFileFull() const;
+    bool readNextRemainingRecord(void** const dataPtrPtr,
+                                 std::size_t& dataSize,
+                                 void** const xidPtrPtr,
+                                 std::size_t& xidSize,
+                                 bool& transient,
+                                 bool& external,
+                                 data_tok* const dtokp,
+                                 bool ignore_pending_txns);
+    void setLinearFileControllerJournals(lfcAddJournalFileFn fnPtr,
+                                         LinearFileController* lfcPtr);
     std::string toString(const std::string& jid,
                          bool compact = true);
+protected:
+    void analyzeJournalFileHeaders(efpIdentity_t& efpIdentity);
+    void checkFileStreamOk(bool checkEof);
+    void checkJournalAlignment(const std::streampos recordPosition);
+    bool decodeRecord(jrec& record,
+                      std::size_t& cumulativeSizeRead,
+                      ::rec_hdr_t& recordHeader,
+                      std::streampos& fileOffset);
+    std::string getCurrentFileName() const;
+    uint64_t getCurrentFileNumber() const;
+    bool getNextFile(bool jumpToFirstRecordOffsetFlag);
+    bool getNextRecordHeader();
+    void readJournalData(char* target, const std::streamsize size);
+    void removeEmptyFiles(EmptyFilePool* emptyFilePoolPtr);
+
+    static void readJournalFileHeader(const std::string& journalFileName,
+                                      ::file_hdr_t& fileHeaderRef,
+                                      std::string& queueName);
 };
 
 }} // namespace qpid::qls_jrnl



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org