You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/10/22 21:09:58 UTC

svn commit: r1534736 [4/8] - in /qpid/trunk/qpid/cpp/src: ./ qpid/linearstore/ qpid/linearstore/jrnl/ qpid/linearstore/jrnl/utils/ tests/linearstore/

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp Tue Oct 22 19:09:56 2013
@@ -0,0 +1,304 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/linearstore/jrnl/LinearFileController.h"
+
+#include <fstream>
+#include "qpid/linearstore/jrnl/EmptyFilePool.h"
+#include "qpid/linearstore/jrnl/jcfg.h"
+#include "qpid/linearstore/jrnl/jcntl.h"
+#include "qpid/linearstore/jrnl/JournalFile.h"
+#include "qpid/linearstore/jrnl/slock.h"
+#include "qpid/linearstore/jrnl/utils/file_hdr.h"
+
+namespace qpid {
+namespace qls_jrnl {
+
+LinearFileController::LinearFileController(jcntl& jcntlRef) :
+            jcntlRef_(jcntlRef),
+            emptyFilePoolPtr_(0),
+            currentJournalFilePtr_(0),
+            fileSeqCounter_(0),
+            recordIdCounter_(0)
+{}
+
+LinearFileController::~LinearFileController() {}
+
+void LinearFileController::initialize(const std::string& journalDirectory,
+                                      EmptyFilePool* emptyFilePoolPtr,
+                                      uint64_t initialFileNumberVal) {
+    journalDirectory_.assign(journalDirectory);
+    emptyFilePoolPtr_ = emptyFilePoolPtr;
+    fileSeqCounter_ = initialFileNumberVal;
+}
+
+void LinearFileController::finalize() {
+    while (!journalFileList_.empty()) {
+        delete journalFileList_.front();
+        journalFileList_.pop_front();
+    }
+}
+
+void LinearFileController::addJournalFile(const std::string& fileName,
+                                          const uint64_t fileNumber,
+                                          const uint32_t fileSize_kib,
+                                          const uint32_t completedDblkCount) {
+    if (currentJournalFilePtr_)
+        currentJournalFilePtr_->close();
+    currentJournalFilePtr_ = new JournalFile(fileName, fileNumber, fileSize_kib);
+    currentJournalFilePtr_->initialize(completedDblkCount);
+    {
+        slock l(journalFileListMutex_);
+        journalFileList_.push_back(currentJournalFilePtr_);
+    }
+    currentJournalFilePtr_->open();
+}
+
+efpDataSize_kib_t LinearFileController::dataSize_kib() const {
+    return emptyFilePoolPtr_->dataSize_kib();
+}
+
+efpDataSize_sblks_t LinearFileController::dataSize_sblks() const {
+    return emptyFilePoolPtr_->dataSize_sblks();
+}
+
+efpFileSize_kib_t LinearFileController::fileSize_kib() const {
+    return emptyFilePoolPtr_->fileSize_kib();
+}
+
+efpFileSize_sblks_t LinearFileController::fileSize_sblks() const {
+    return emptyFilePoolPtr_->fileSize_sblks();
+}
+
+uint64_t LinearFileController::getNextRecordId() {
+    return recordIdCounter_.increment();
+}
+
+void LinearFileController::pullEmptyFileFromEfp() {
+    if (currentJournalFilePtr_)
+        currentJournalFilePtr_->close();
+    std::string ef = emptyFilePoolPtr_->takeEmptyFile(journalDirectory_); // Moves file from EFP only, returns new file name
+//std::cout << "*** LinearFileController::pullEmptyFileFromEfp() qn=" << jcntlRef.id() << " ef=" << ef << std::endl; // DEBUG
+    addJournalFile(ef, getNextFileSeqNum(), emptyFilePoolPtr_->dataSize_kib(), 0);
+}
+
+void LinearFileController::purgeFilesToEfp() {
+    slock l(journalFileListMutex_);
+    while (journalFileList_.front()->isNoEnqueuedRecordsRemaining()) {
+        emptyFilePoolPtr_->returnEmptyFile(journalFileList_.front()->getFqFileName());
+        delete journalFileList_.front();
+        journalFileList_.pop_front();
+    }
+}
+
+uint32_t LinearFileController::getEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) {
+    slock l(journalFileListMutex_);
+    return find(fileSeqNumber)->getEnqueuedRecordCount();
+}
+
+uint32_t LinearFileController::incrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) {
+    assertCurrentJournalFileValid("incrEnqueuedRecordCount");
+    return find(fileSeqNumber)->incrEnqueuedRecordCount();
+}
+
+uint32_t LinearFileController::decrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) {
+    slock l(journalFileListMutex_);
+    return find(fileSeqNumber)->decrEnqueuedRecordCount();
+}
+
+uint32_t LinearFileController::addWriteCompletedDblkCount(const efpFileCount_t fileSeqNumber, const uint32_t a) {
+    slock l(journalFileListMutex_);
+    return find(fileSeqNumber)->addCompletedDblkCount(a);
+}
+
+uint16_t LinearFileController::decrOutstandingAioOperationCount(const efpFileCount_t fileSeqNumber) {
+    slock l(journalFileListMutex_);
+    return find(fileSeqNumber)->decrOutstandingAioOperationCount();
+}
+
+void LinearFileController::asyncFileHeaderWrite(io_context_t ioContextPtr,
+                                                const uint16_t userFlags,
+                                                const uint64_t recordId,
+                                                const uint64_t firstRecordOffset) {
+    currentJournalFilePtr_->asyncFileHeaderWrite(ioContextPtr,
+                                                 emptyFilePoolPtr_->getPartitionNumber(),
+                                                 emptyFilePoolPtr_->dataSize_kib(),
+                                                 userFlags,
+                                                 recordId,
+                                                 firstRecordOffset,
+                                                 jcntlRef_.id());
+}
+
+void LinearFileController::asyncPageWrite(io_context_t ioContextPtr,
+                                          aio_cb* aioControlBlockPtr,
+                                          void* data,
+                                          uint32_t dataSize_dblks) {
+    assertCurrentJournalFileValid("asyncPageWrite");
+    currentJournalFilePtr_->asyncPageWrite(ioContextPtr, aioControlBlockPtr, data, dataSize_dblks);
+}
+
+uint64_t LinearFileController::getCurrentFileSeqNum() const {
+    assertCurrentJournalFileValid("getCurrentFileSeqNum");
+    return currentJournalFilePtr_->getFileSeqNum();
+}
+
+uint32_t LinearFileController::getEnqueuedRecordCount() const {
+    assertCurrentJournalFileValid("getEnqueuedRecordCount");
+    return currentJournalFilePtr_->getEnqueuedRecordCount();
+}
+
+uint32_t LinearFileController::incrEnqueuedRecordCount() {
+    assertCurrentJournalFileValid("incrEnqueuedRecordCount");
+    return currentJournalFilePtr_->incrEnqueuedRecordCount();
+}
+
+uint32_t LinearFileController::addEnqueuedRecordCount(const uint32_t a) {
+    assertCurrentJournalFileValid("addEnqueuedRecordCount");
+    return currentJournalFilePtr_->addEnqueuedRecordCount(a);
+}
+
+uint32_t LinearFileController::decrEnqueuedRecordCount() {
+    assertCurrentJournalFileValid("decrEnqueuedRecordCount");
+    return currentJournalFilePtr_->decrEnqueuedRecordCount();
+}
+
+uint32_t LinearFileController::subtrEnqueuedRecordCount(const uint32_t s) {
+    assertCurrentJournalFileValid("subtrEnqueuedRecordCount");
+    return currentJournalFilePtr_->subtrEnqueuedRecordCount(s);
+}
+
+uint32_t LinearFileController::getWriteSubmittedDblkCount() const {
+    assertCurrentJournalFileValid("getWriteSubmittedDblkCount");
+    return currentJournalFilePtr_->getSubmittedDblkCount();
+}
+
+uint32_t LinearFileController::addWriteSubmittedDblkCount(const uint32_t a) {
+    assertCurrentJournalFileValid("addWriteSubmittedDblkCount");
+    return currentJournalFilePtr_->addSubmittedDblkCount(a);
+}
+
+uint32_t LinearFileController::getWriteCompletedDblkCount() const {
+    assertCurrentJournalFileValid("getWriteCompletedDblkCount");
+    return currentJournalFilePtr_->getCompletedDblkCount();
+}
+
+uint32_t LinearFileController::addWriteCompletedDblkCount(const uint32_t a) {
+    assertCurrentJournalFileValid("addWriteCompletedDblkCount");
+    return currentJournalFilePtr_->addCompletedDblkCount(a);
+}
+
+uint16_t LinearFileController::getOutstandingAioOperationCount() const {
+    assertCurrentJournalFileValid("getOutstandingAioOperationCount");
+    return currentJournalFilePtr_->getOutstandingAioOperationCount();
+}
+
+uint16_t LinearFileController::incrOutstandingAioOperationCount() {
+    assertCurrentJournalFileValid("incrOutstandingAioOperationCount");
+    return currentJournalFilePtr_->incrOutstandingAioOperationCount();
+}
+
+uint16_t LinearFileController::decrOutstandingAioOperationCount() {
+    assertCurrentJournalFileValid("decrOutstandingAioOperationCount");
+    return currentJournalFilePtr_->decrOutstandingAioOperationCount();
+}
+
+bool LinearFileController::isEmpty() const {
+    assertCurrentJournalFileValid("isEmpty");
+    return currentJournalFilePtr_->isEmpty();
+}
+
+bool LinearFileController::isDataEmpty() const {
+    assertCurrentJournalFileValid("isDataEmpty");
+    return currentJournalFilePtr_->isDataEmpty();
+}
+
+u_int32_t LinearFileController::dblksRemaining() const {
+    assertCurrentJournalFileValid("dblksRemaining");
+    return currentJournalFilePtr_->dblksRemaining();
+}
+
+bool LinearFileController::isFull() const {
+    assertCurrentJournalFileValid("isFull");
+    return currentJournalFilePtr_->isFull();
+}
+
+bool LinearFileController::isFullAndComplete() const {
+    assertCurrentJournalFileValid("isFullAndComplete");
+    return currentJournalFilePtr_->isFullAndComplete();
+}
+
+u_int32_t LinearFileController::getOutstandingAioDblks() const {
+    assertCurrentJournalFileValid("getOutstandingAioDblks");
+    return currentJournalFilePtr_->getOutstandingAioDblks();
+}
+
+bool LinearFileController::needNextFile() const {
+    assertCurrentJournalFileValid("getNextFile");
+    return currentJournalFilePtr_->getNextFile();
+}
+
+const std::string LinearFileController::status(const uint8_t indentDepth) const {
+    std::string indent((size_t)indentDepth, '.');
+    std::ostringstream oss;
+    oss << indent << "LinearFileController: queue=" << jcntlRef_.id() << std::endl;
+    oss << indent << "  journalDirectory=" << journalDirectory_ << std::endl;
+    oss << indent << "  fileSeqCounter=" << fileSeqCounter_.get() << std::endl;
+    oss << indent << "  recordIdCounter=" << recordIdCounter_.get() << std::endl;
+    oss << indent << "  journalFileList.size=" << journalFileList_.size() << std::endl;
+    if (checkCurrentJournalFileValid()) {
+        oss << currentJournalFilePtr_->status_str(indentDepth+2);
+    } else {
+        oss << indent << "  <No current journal file>" << std::endl;
+    }
+    return oss.str();
+}
+
+// --- protected functions ---
+
+bool LinearFileController::checkCurrentJournalFileValid() const {
+    return currentJournalFilePtr_ != 0;
+}
+
+void LinearFileController::assertCurrentJournalFileValid(const char* const functionName) const {
+    if (!checkCurrentJournalFileValid()) {
+        throw jexception(jerrno::JERR__NULL, "LinearFileController", functionName);
+    }
+}
+
+// NOTE: NOT THREAD SAFE - journalFileList is accessed by multiple threads - use under external lock
+JournalFile* LinearFileController::find(const efpFileCount_t fileSeqNumber) {
+    if (currentJournalFilePtr_ != 0 && currentJournalFilePtr_->getFileSeqNum() == fileSeqNumber)
+        return currentJournalFilePtr_;
+    for (JournalFileListItr_t i=journalFileList_.begin(); i!=journalFileList_.end(); ++i) {
+        if ((*i)->getFileSeqNum() == fileSeqNumber) {
+            return *i;
+        }
+    }
+    std::ostringstream oss;
+    oss << "fileSeqNumber=" << fileSeqNumber;
+    throw jexception(jerrno::JERR_LFCR_SEQNUMNOTFOUND, oss.str(), "LinearFileController", "find");
+}
+
+uint64_t LinearFileController::getNextFileSeqNum() {
+    return fileSeqCounter_.increment();
+}
+
+}} // namespace qpid::qls_jrnl

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,136 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LINEARSTORE_LINEARFILECONTROLLER_H_
+#define QPID_LINEARSTORE_LINEARFILECONTROLLER_H_
+
+#include <deque>
+#include "qpid/linearstore/jrnl/AtomicCounter.h"
+#include "qpid/linearstore/jrnl/EmptyFilePoolTypes.h"
+
+// libaio forward declares
+typedef struct io_context* io_context_t;
+typedef struct iocb aio_cb;
+
+namespace qpid {
+namespace qls_jrnl {
+
+class EmptyFilePool;
+class jcntl;
+class JournalFile;
+
+class LinearFileController
+{
+protected:
+    typedef std::deque<JournalFile*> JournalFileList_t;
+    typedef JournalFileList_t::iterator JournalFileListItr_t;
+
+    jcntl& jcntlRef_;
+    std::string journalDirectory_;
+    EmptyFilePool* emptyFilePoolPtr_;
+    JournalFile* currentJournalFilePtr_;
+    AtomicCounter<uint64_t> fileSeqCounter_;
+    AtomicCounter<uint64_t> recordIdCounter_;
+
+    JournalFileList_t journalFileList_;
+    smutex journalFileListMutex_;
+
+public:
+    LinearFileController(jcntl& jcntlRef);
+    virtual ~LinearFileController();
+
+    void initialize(const std::string& journalDirectory,
+                    EmptyFilePool* emptyFilePoolPtr,
+                    uint64_t initialFileNumberVal);
+    void finalize();
+
+    void addJournalFile(const std::string& fileName,
+                        const uint64_t fileNumber,
+                        const uint32_t fileSize_kib,
+                        const uint32_t completedDblkCount);
+
+    efpDataSize_kib_t dataSize_kib() const;
+    efpDataSize_sblks_t dataSize_sblks() const;
+    efpFileSize_kib_t fileSize_kib() const;
+    efpFileSize_sblks_t fileSize_sblks() const;
+    uint64_t getNextRecordId();
+    void pullEmptyFileFromEfp();
+    void purgeFilesToEfp();
+
+    // Functions for manipulating counts of non-current JournalFile instances in journalFileList_
+    uint32_t getEnqueuedRecordCount(const efpFileCount_t fileSeqNumber);
+    uint32_t incrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber);
+    uint32_t decrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber);
+    uint32_t addWriteCompletedDblkCount(const efpFileCount_t fileSeqNumber,
+                                        const uint32_t a);
+    uint16_t decrOutstandingAioOperationCount(const efpFileCount_t fileSeqNumber);
+
+    // Pass-through functions for JournalFile class
+    void asyncFileHeaderWrite(io_context_t ioContextPtr,
+                              const uint16_t userFlags,
+                              const uint64_t recordId,
+                              const uint64_t firstRecordOffset);
+    void asyncPageWrite(io_context_t ioContextPtr,
+                        aio_cb* aioControlBlockPtr,
+                        void* data,
+                        uint32_t dataSize_dblks);
+
+    uint64_t getCurrentFileSeqNum() const;
+
+    uint32_t getEnqueuedRecordCount() const;
+    uint32_t incrEnqueuedRecordCount();
+    uint32_t addEnqueuedRecordCount(const uint32_t a);
+    uint32_t decrEnqueuedRecordCount();
+    uint32_t subtrEnqueuedRecordCount(const uint32_t s);
+
+    uint32_t getWriteSubmittedDblkCount() const;
+    uint32_t addWriteSubmittedDblkCount(const uint32_t a);
+
+    uint32_t getWriteCompletedDblkCount() const;
+    uint32_t addWriteCompletedDblkCount(const uint32_t a);
+
+    uint16_t getOutstandingAioOperationCount() const;
+    uint16_t incrOutstandingAioOperationCount();
+    uint16_t decrOutstandingAioOperationCount();
+
+    bool isEmpty() const;                      // True if no writes of any kind have occurred
+    bool isDataEmpty() const;                  // True if only file header written, data is still empty
+    u_int32_t dblksRemaining() const;          // Dblks remaining until full
+    bool isFull() const;                       // True if all possible dblks have been submitted (but may not yet have returned from AIO)
+    bool isFullAndComplete() const;            // True if all submitted dblks have returned from AIO
+    u_int32_t getOutstandingAioDblks() const;  // Dblks still to be written
+    bool needNextFile() const;                 // True when next file is needed
+
+    // Debug aid
+    const std::string status(const uint8_t indentDepth) const;
+
+protected:
+    void assertCurrentJournalFileValid(const char* const functionName) const;
+    bool checkCurrentJournalFileValid() const;
+    JournalFile* find(const efpFileCount_t fileSeqNumber);
+    uint64_t getNextFileSeqNum();
+};
+
+typedef void (LinearFileController::*lfcAddJournalFileFn)(const std::string&, const uint64_t, const uint32_t, const uint32_t);
+
+}} // namespace qpid::qls_jrnl
+
+#endif // QPID_LINEARSTORE_LINEARFILECONTROLLER_H_

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp Tue Oct 22 19:09:56 2013
@@ -0,0 +1,645 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/linearstore/jrnl/RecoveryManager.h"
+
+#include <algorithm>
+#include <cstdlib>
+#include <iomanip>
+#include "qpid/linearstore/jrnl/data_tok.h"
+#include "qpid/linearstore/jrnl/deq_rec.h"
+#include "qpid/linearstore/jrnl/EmptyFilePoolManager.h"
+#include "qpid/linearstore/jrnl/enq_map.h"
+#include "qpid/linearstore/jrnl/enq_rec.h"
+#include "qpid/linearstore/jrnl/jcfg.h"
+#include "qpid/linearstore/jrnl/jdir.h"
+#include "qpid/linearstore/jrnl/JournalLog.h"
+#include "qpid/linearstore/jrnl/jrec.h"
+#include "qpid/linearstore/jrnl/LinearFileController.h"
+#include "qpid/linearstore/jrnl/txn_map.h"
+#include "qpid/linearstore/jrnl/txn_rec.h"
+#include "qpid/linearstore/jrnl/utils/enq_hdr.h"
+#include "qpid/linearstore/jrnl/utils/file_hdr.h"
+#include <sstream>
+#include <string>
+#include <vector>
+
+namespace qpid {
+namespace qls_jrnl
+{
+
+RecoveryManager::RecoveryManager(const std::string& journalDirectory,
+                                 const std::string& queuename,
+                                 enq_map& enqueueMapRef,
+                                 txn_map& transactionMapRef,
+                                 JournalLog& journalLogRef) :
+                                                 journalDirectory_(journalDirectory),
+                                                 queueName_(queuename),
+                                                 enqueueMapRef_(enqueueMapRef),
+                                                 transactionMapRef_(transactionMapRef),
+                                                 journalLogRef_(journalLogRef),
+                                                 journalEmptyFlag_(false),
+                                                 firstRecordOffset_(0),
+                                                 endOffset_(0),
+                                                 highestRecordId_(0ULL),
+                                                 highestFileNumber_(0ULL),
+                                                 lastFileFullFlag_(false),
+                                                 fileSize_kib_(0)
+{}
+
+RecoveryManager::~RecoveryManager() {}
+
+void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTransactionListPtr,
+                                      EmptyFilePoolManager* emptyFilePoolManager,
+                                      EmptyFilePool** emptyFilePoolPtrPtr) {
+    // Analyze file headers of existing journal files
+    efpIdentity_t efpIdentity;
+    analyzeJournalFileHeaders(efpIdentity);
+    *emptyFilePoolPtrPtr = emptyFilePoolManager->getEmptyFilePool(efpIdentity);
+    fileSize_kib_ = (*emptyFilePoolPtrPtr)->fileSize_kib();
+    // Check for file full condition
+    lastFileFullFlag_ = endOffset_ == (std::streamoff)(*emptyFilePoolPtrPtr)->fileSize_kib() * 1024;
+
+    // Restore all read and write pointers and transactions
+    if (!journalEmptyFlag_) {
+        while (getNextRecordHeader()) {
+
+        }
+        if (inFileStream_.is_open()) {
+            inFileStream_.close();
+        }
+        // Remove leading files which have no enqueued records
+        removeEmptyFiles(*emptyFilePoolPtrPtr);
+
+        // Remove all txns from tmap that are not in the prepared list
+        if (preparedTransactionListPtr) {
+            std::vector<std::string> xidList;
+            transactionMapRef_.xid_list(xidList);
+            for (std::vector<std::string>::iterator itr = xidList.begin(); itr != xidList.end(); itr++) {
+                std::vector<std::string>::const_iterator pitr =
+                        std::find(preparedTransactionListPtr->begin(), preparedTransactionListPtr->end(), *itr);
+                if (pitr == preparedTransactionListPtr->end()) { // not found in prepared list
+                    txn_data_list tdl = transactionMapRef_.get_remove_tdata_list(*itr); // tdl will be empty if xid not found
+                    // Unlock any affected enqueues in emap
+                    for (tdl_itr i=tdl.begin(); i<tdl.end(); i++) {
+                        if (i->_enq_flag) { // enq op - decrement enqueue count
+                            enqueueCountList_[i->_pfid]--;
+                        } else if (enqueueMapRef_.is_enqueued(i->_drid, true)) { // deq op - unlock enq record
+                            int16_t ret = enqueueMapRef_.unlock(i->_drid);
+                            if (ret < enq_map::EMAP_OK) { // fail
+                                // enq_map::unlock()'s only error is enq_map::EMAP_RID_NOT_FOUND
+                                std::ostringstream oss;
+                                oss << std::hex << "_emap.unlock(): drid=0x\"" << i->_drid;
+                                throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "RecoveryManager", "analyzeJournals");
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        enqueueMapRef_.rid_list(recordIdList_);
+        recordIdListConstItr_ = recordIdList_.begin();
+    }
+}
+
+std::streamoff RecoveryManager::getEndOffset() const {
+    return endOffset_;
+}
+
+uint64_t RecoveryManager::getHighestFileNumber() const {
+    return highestFileNumber_;
+}
+
+uint64_t RecoveryManager::getHighestRecordId() const {
+    return highestRecordId_;
+}
+
+bool RecoveryManager::isLastFileFull() const {
+    return lastFileFullFlag_;
+}
+
+bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr,
+                                              std::size_t& dataSize,
+                                              void** const xidPtrPtr,
+                                              std::size_t& xidSize,
+                                              bool& transient,
+                                              bool& external,
+                                              data_tok* const dtokp,
+                                              bool /*ignore_pending_txns*/) {
+    if (!dtokp->is_readable()) {
+        std::ostringstream oss;
+        oss << std::hex << std::setfill('0') << "dtok_id=0x" << std::setw(8) << dtokp->id();
+        oss << "; dtok_rid=0x" << std::setw(16) << dtokp->rid() << "; dtok_wstate=" << dtokp->wstate_str();
+        throw jexception(jerrno::JERR_JCNTL_ENQSTATE, oss.str(), "RecoveryManager", "readNextRemainingRecord");
+    }
+    if (recordIdListConstItr_ == recordIdList_.end()) {
+        return false;
+    }
+    enq_map::emap_data_struct_t eds;
+    enqueueMapRef_.get_data(*recordIdListConstItr_, eds);
+    uint64_t fileNumber = eds._pfid;
+    currentJournalFileConstItr_ = fileNumberNameMap_.find(fileNumber);
+    getNextFile(false);
+
+    inFileStream_.seekg(eds._file_posn, std::ifstream::beg);
+    if (!inFileStream_.good()) {
+        std::ostringstream oss;
+        oss << "Could not find offset 0x" << std::hex << eds._file_posn << " in file " << getCurrentFileName();
+        throw jexception(jerrno::JERR__FILEIO, oss.str(), "RecoveryManager", "readNextRemainingRecord");
+    }
+    ::enq_hdr_t enqueueHeader;
+    inFileStream_.read((char*)&enqueueHeader, sizeof(::enq_hdr_t));
+    if (inFileStream_.gcount() != sizeof(::enq_hdr_t)) {
+        std::ostringstream oss;
+        oss << "Could not read enqueue header from file " << getCurrentFileName() << " at offset 0x" << std::hex << eds._file_posn;
+        throw jexception(jerrno::JERR__FILEIO, oss.str(), "RecoveryManager", "readNextRemainingRecord");
+    }
+    // check flags
+    transient = ::is_enq_transient(&enqueueHeader);
+    external = ::is_enq_external(&enqueueHeader);
+
+    // read xid
+    xidSize = enqueueHeader._xidsize;
+    *xidPtrPtr = ::malloc(xidSize);
+    if (*xidPtrPtr == 0) {
+        std::ostringstream oss;
+        oss << "xidPtr, size=0x" << std::hex << xidSize;
+        throw jexception(jerrno::JERR__MALLOC, oss.str(), "RecoveryManager", "readNextRemainingRecord");
+    }
+    readJournalData((char*)*xidPtrPtr, xidSize);
+
+    // read data
+    dataSize = enqueueHeader._dsize;
+    *dataPtrPtr = ::malloc(dataSize);
+    if (*xidPtrPtr == 0) {
+        std::ostringstream oss;
+        oss << "dataPtr, size=0x" << std::hex << dataSize;
+        throw jexception(jerrno::JERR__MALLOC, oss.str(), "RecoveryManager", "readNextRemainingRecord");
+    }
+    readJournalData((char*)*dataPtrPtr, dataSize);
+    return true;
+}
+
+void RecoveryManager::setLinearFileControllerJournals(lfcAddJournalFileFn fnPtr,
+                                                      LinearFileController* lfcPtr) {
+//std::cout << "****** RecoveryManager::setLinearFileControllerJournals():" << std::endl; // DEBUG
+    for (fileNumberNameMapConstItr_t i = fileNumberNameMap_.begin(); i != fileNumberNameMap_.end(); ++i) {
+        uint32_t fileDblkCount = i->first == highestFileNumber_ ?               // Is this this last file?
+                                 endOffset_ / QLS_DBLK_SIZE_BYTES :            // Last file uses _endOffset
+                                 fileSize_kib_ * 1024 / QLS_DBLK_SIZE_BYTES;   // All others use file size to make them full
+        (lfcPtr->*fnPtr)(i->second, i->first, fileSize_kib_, fileDblkCount);
+//std::cout << "   ** f=" << i->second.substr(i->second.rfind('/')+1) << ",fn=" << i->first << ",s=" << _fileSize_kib << ",eo=" << fileDblkCount << "(" << (fileDblkCount * QLS_DBLK_SIZE_BYTES / 1024) << "kiB)" << std::endl; // DEBUG
+    }
+}
+
+std::string RecoveryManager::toString(const std::string& jid,
+                                      bool compact) {
+    std::ostringstream oss;
+    if (compact) {
+        oss << "Recovery journal analysis (jid=\"" << jid << "\"):";
+        oss << " jfl=[";
+        for (fileNumberNameMapConstItr_t i=fileNumberNameMap_.begin(); i!=fileNumberNameMap_.end(); ++i) {
+            if (i!=fileNumberNameMap_.begin()) oss << " ";
+            oss << i->first << ":" << i->second.substr(i->second.rfind('/')+1);
+        }
+        oss << "] ecl=[ ";
+        for (enqueueCountListConstItr_t j = enqueueCountList_.begin(); j!=enqueueCountList_.end(); ++j) {
+            if (j != enqueueCountList_.begin()) oss << " ";
+            oss << *j;
+        }
+        oss << " ] empty=" << (journalEmptyFlag_ ? "T" : "F");
+        oss << " fro=0x" << std::hex << firstRecordOffset_ << std::dec << " (" << (firstRecordOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)";
+        oss << " eo=0x" << std::hex << endOffset_ << std::dec << " ("  << (endOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)";
+        oss << " hrid=0x" << std::hex << highestRecordId_ << std::dec;
+        oss << " hfnum=0x" << std::hex << highestFileNumber_ << std::dec;
+        oss << " lffull=" << (lastFileFullFlag_ ? "T" : "F");
+    } else {
+        oss << "Recovery journal analysis (jid=\"" << jid << "\"):" << std::endl;
+        oss << "  Number of journal files = " << fileNumberNameMap_.size() << std::endl;
+        oss << "  Journal File List:" << std::endl;
+        for (fileNumberNameMapConstItr_t i=fileNumberNameMap_.begin(); i!=fileNumberNameMap_.end(); ++i) {
+            oss << "    " << i->first << ": " << i->second.substr(i->second.rfind('/')+1) << std::endl;
+        }
+        oss << "  Enqueue Counts: [ " << std::endl;
+        for (enqueueCountListConstItr_t j = enqueueCountList_.begin(); j!=enqueueCountList_.end(); ++j) {
+            if (j != enqueueCountList_.begin()) oss << ", ";
+            oss << *j;
+        }
+        oss << " ]" << std::endl;
+        oss << "  Journal empty = " << (journalEmptyFlag_ ? "TRUE" : "FALSE") << std::endl;
+        oss << "  First record offset in first file = 0x" << std::hex << firstRecordOffset_ <<
+                std::dec << " (" << (firstRecordOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
+        oss << "  End offset = 0x" << std::hex << endOffset_ << std::dec << " ("  <<
+                (endOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
+        oss << "  Highest rid = 0x" << std::hex << highestRecordId_ << std::dec << std::endl;
+        oss << "  Highest file number = 0x" << std::hex << highestFileNumber_ << std::dec << std::endl;
+        oss << "  Last file full = " << (lastFileFullFlag_ ? "TRUE" : "FALSE") << std::endl;
+        oss << "  Enqueued records (txn & non-txn):" << std::endl;
+    }
+    return oss.str();
+}
+
+// --- protected functions ---
+
+void RecoveryManager::analyzeJournalFileHeaders(efpIdentity_t& efpIdentity) {
+    std::string headerQueueName;
+    ::file_hdr_t fileHeader;
+    directoryList_t directoryList;
+    jdir::read_dir(journalDirectory_, directoryList, false, true, false, true);
+    for (directoryListConstItr_t i = directoryList.begin(); i != directoryList.end(); ++i) {
+        readJournalFileHeader(*i, fileHeader, headerQueueName);
+        if (headerQueueName.compare(queueName_) != 0) {
+            std::ostringstream oss;
+            oss << "Journal file " << (*i) << " belongs to queue \"" << headerQueueName << "\": ignoring";
+            journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss.str());
+        } else {
+            fileNumberNameMap_[fileHeader._file_number] = *i;
+            if (fileHeader._file_number > highestFileNumber_) {
+                highestFileNumber_ = fileHeader._file_number;
+            }
+        }
+    }
+    efpIdentity.first = fileHeader._efp_partition;
+    efpIdentity.second = fileHeader._file_size_kib;
+    enqueueCountList_.resize(fileNumberNameMap_.size(), 0);
+    currentJournalFileConstItr_ = fileNumberNameMap_.begin();
+}
+
+void RecoveryManager::checkFileStreamOk(bool checkEof) {
+    if (inFileStream_.fail() || inFileStream_.bad() || checkEof ? inFileStream_.eof() : false) {
+        throw jexception("read failure"); // TODO complete exception
+    }
+}
+
+void RecoveryManager::checkJournalAlignment(const std::streampos recordPosition) {
+    std::streampos currentPosn = recordPosition;
+    unsigned sblkOffset = currentPosn % QLS_SBLK_SIZE_BYTES;
+    if (sblkOffset)
+    {
+        std::ostringstream oss1;
+        oss1 << std::hex << "Bad record alignment found at fid=0x" << getCurrentFileNumber();
+        oss1 << " offs=0x" << currentPosn << " (likely journal overwrite boundary); " << std::dec;
+        oss1 << (QLS_SBLK_SIZE_DBLKS - (sblkOffset/QLS_DBLK_SIZE_BYTES)) << " filler record(s) required.";
+        journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss1.str());
+
+        std::ofstream outFileStream(getCurrentFileName().c_str(), std::ios_base::in | std::ios_base::out | std::ios_base::binary);
+        if (!outFileStream.good()) {
+            throw jexception(jerrno::JERR__FILEIO, getCurrentFileName(), "RecoveryManager", "checkJournalAlignment");
+        }
+        outFileStream.seekp(currentPosn);
+
+        // Prepare write buffer containing a single empty record (1 dblk)
+        void* writeBuffer = std::malloc(QLS_DBLK_SIZE_BYTES);
+        if (writeBuffer == 0) {
+            throw jexception(jerrno::JERR__MALLOC, "RecoveryManager", "checkJournalAlignment");
+        }
+        const uint32_t xmagic = QLS_EMPTY_MAGIC;
+        ::memcpy(writeBuffer, (const void*)&xmagic, sizeof(xmagic));
+        ::memset((char*)writeBuffer + sizeof(xmagic), QLS_CLEAN_CHAR, QLS_DBLK_SIZE_BYTES - sizeof(xmagic));
+
+        // Write as many empty records as are needed to get to sblk boundary
+        while (currentPosn % QLS_SBLK_SIZE_BYTES) {
+            outFileStream.write((const char*)writeBuffer, QLS_DBLK_SIZE_BYTES);
+            if (outFileStream.fail()) {
+                throw jexception(jerrno::JERR_RCVM_WRITE, "RecoveryManager", "checkJournalAlignment");
+            }
+            std::ostringstream oss2;
+            oss2 << std::hex << "Recover phase write: Wrote filler record: fid=0x" << getCurrentFileNumber();
+            oss2 << " offs=0x" << currentPosn;
+            journalLogRef_.log(JournalLog::LOG_NOTICE, queueName_, oss2.str());
+            currentPosn = outFileStream.tellp();
+        }
+        outFileStream.close();
+        std::free(writeBuffer);
+        journalLogRef_.log(JournalLog::LOG_INFO, queueName_, "Bad record alignment fixed.");
+    }
+    endOffset_ = currentPosn;
+}
+
+bool RecoveryManager::decodeRecord(jrec& record,
+                              std::size_t& cumulativeSizeRead,
+                              ::rec_hdr_t& headerRecord,
+                              std::streampos& fileOffset)
+{
+//    uint16_t start_fid = getCurrentFileNumber();
+    std::streampos start_file_offs = fileOffset;
+
+    if (highestRecordId_ == 0) {
+        highestRecordId_ = headerRecord._rid;
+    } else if (headerRecord._rid - highestRecordId_ < 0x8000000000000000ULL) { // RFC 1982 comparison for unsigned 64-bit
+        highestRecordId_ = headerRecord._rid;
+    }
+
+    bool done = false;
+    while (!done) {
+        try {
+            done = record.rcv_decode(headerRecord, &inFileStream_, cumulativeSizeRead);
+        }
+        catch (const jexception& e) {
+// TODO - review this logic and tidy up how rd._lfid is assigned. See new jinf.get_end_file() fn.
+// Original
+//             if (e.err_code() != jerrno::JERR_JREC_BADRECTAIL ||
+//                     fid != (rd._ffid ? rd._ffid - 1 : _num_jfiles - 1)) throw;
+// Tried this, but did not work
+//             if (e.err_code() != jerrno::JERR_JREC_BADRECTAIL || h._magic != 0) throw;
+            checkJournalAlignment(start_file_offs);
+//             rd._lfid = start_fid;
+            return false;
+        }
+        if (!done && !getNextFile(false)) {
+            checkJournalAlignment(start_file_offs);
+            return false;
+        }
+    }
+    return true;
+}
+
+std::string RecoveryManager::getCurrentFileName() const {
+    return currentJournalFileConstItr_->second;
+}
+
+uint64_t RecoveryManager::getCurrentFileNumber() const {
+    return currentJournalFileConstItr_->first;
+}
+
+bool RecoveryManager::getNextFile(bool jumpToFirstRecordOffsetFlag) {
+    if (inFileStream_.is_open()) {
+        if (inFileStream_.eof() || !inFileStream_.good())
+        {
+            inFileStream_.clear();
+            endOffset_ = inFileStream_.tellg(); // remember file offset before closing
+            if (endOffset_ == -1) { throw jexception("tellg() failure"); } // Check for error code -1 TODO: compelete exception
+            inFileStream_.close();
+            if (++currentJournalFileConstItr_ == fileNumberNameMap_.end()) {
+                return false;
+            }
+        }
+    }
+    if (!inFileStream_.is_open())
+    {
+        inFileStream_.clear(); // clear eof flag, req'd for older versions of c++
+        inFileStream_.open(getCurrentFileName().c_str(), std::ios_base::in | std::ios_base::binary);
+        if (!inFileStream_.good()) {
+            throw jexception(jerrno::JERR__FILEIO, getCurrentFileName(), "RecoveryManager", "getNextFile");
+        }
+
+        // Read file header
+//std::cout << " F" << getCurrentFileNumber() << std::flush; // DEBUG
+        file_hdr_t fhdr;
+        inFileStream_.read((char*)&fhdr, sizeof(fhdr));
+        checkFileStreamOk(true);
+        if (fhdr._rhdr._magic == QLS_FILE_MAGIC) {
+            firstRecordOffset_ = fhdr._fro;
+            std::streamoff foffs = jumpToFirstRecordOffsetFlag ? firstRecordOffset_ : QLS_SBLK_SIZE_BYTES;
+            inFileStream_.seekg(foffs);
+        } else {
+            inFileStream_.close();
+            if (currentJournalFileConstItr_ == fileNumberNameMap_.begin()) {
+                journalEmptyFlag_ = true;
+            }
+            return false;
+        }
+    }
+    return true;
+}
+
+bool RecoveryManager::getNextRecordHeader()
+{
+    std::size_t cum_size_read = 0;
+    void* xidp = 0;
+    rec_hdr_t h;
+
+    bool hdr_ok = false;
+    std::streampos file_pos;
+    while (!hdr_ok) {
+        if (!inFileStream_.is_open()) {
+            if (!getNextFile(true)) {
+                return false;
+            }
+        }
+        file_pos = inFileStream_.tellg();
+//std::cout << " 0x" << std::hex << file_pos << std::dec; // DEBUG
+        inFileStream_.read((char*)&h, sizeof(rec_hdr_t));
+        if (inFileStream_.gcount() == sizeof(rec_hdr_t)) {
+            hdr_ok = true;
+        } else {
+            if (!getNextFile(true)) {
+                return false;
+            }
+        }
+    }
+
+    switch(h._magic) {
+        case QLS_ENQ_MAGIC:
+            {
+//std::cout << ".e" << std::flush; // DEBUG
+                enq_rec er;
+                uint64_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary
+                if (!decodeRecord(er, cum_size_read, h, file_pos)) {
+                    return false;
+                }
+                if (!er.is_transient()) { // Ignore transient msgs
+                    enqueueCountList_[start_fid]++;
+                    if (er.xid_size()) {
+                        er.get_xid(&xidp);
+                        if (xidp != 0) { throw jexception("Null xid with non-null xid_size"); } // TODO complete exception
+                        std::string xid((char*)xidp, er.xid_size());
+                        transactionMapRef_.insert_txn_data(xid, txn_data(h._rid, 0, start_fid, true));
+                        if (transactionMapRef_.set_aio_compl(xid, h._rid) < txn_map::TMAP_OK) { // fail - xid or rid not found
+                            std::ostringstream oss;
+                            oss << std::hex << "_tmap.set_aio_compl: txn_enq xid=\"" << xid << "\" rid=0x" << h._rid;
+                            throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "RecoveryManager", "getNextRecordHeader");
+                        }
+                        std::free(xidp);
+                    } else {
+                        if (enqueueMapRef_.insert_pfid(h._rid, start_fid, file_pos) < enq_map::EMAP_OK) { // fail
+                            // The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID.
+                            std::ostringstream oss;
+                            oss << std::hex << "rid=0x" << h._rid << " _pfid=0x" << start_fid;
+                            throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "RecoveryManager", "getNextRecordHeader");
+                        }
+                    }
+                }
+            }
+            break;
+        case QLS_DEQ_MAGIC:
+            {
+//std::cout << ".d" << std::flush; // DEBUG
+                deq_rec dr;
+                uint16_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary
+                if (!decodeRecord(dr, cum_size_read, h, file_pos)) {
+                    return false;
+                }
+                if (dr.xid_size()) {
+                    // If the enqueue is part of a pending txn, it will not yet be in emap
+                    enqueueMapRef_.lock(dr.deq_rid()); // ignore not found error
+                    dr.get_xid(&xidp);
+                    if (xidp != 0) { throw jexception("Null xid with non-null xid_size"); } // TODO complete exception
+                    std::string xid((char*)xidp, dr.xid_size());
+                    transactionMapRef_.insert_txn_data(xid, txn_data(dr.rid(), dr.deq_rid(), start_fid, false,
+                            dr.is_txn_coml_commit()));
+                    if (transactionMapRef_.set_aio_compl(xid, dr.rid()) < txn_map::TMAP_OK) { // fail - xid or rid not found
+                        std::ostringstream oss;
+                        oss << std::hex << "_tmap.set_aio_compl: txn_deq xid=\"" << xid << "\" rid=0x" << dr.rid();
+                        throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "RecoveryManager", "getNextRecordHeader");
+                    }
+                    std::free(xidp);
+                } else {
+                    uint64_t enq_fid;
+                    if (enqueueMapRef_.get_remove_pfid(dr.deq_rid(), enq_fid, true) == enq_map::EMAP_OK) { // ignore not found error
+                        enqueueCountList_[enq_fid]--;
+                    }
+                }
+            }
+            break;
+        case QLS_TXA_MAGIC:
+            {
+//std::cout << ".a" << std::flush; // DEBUG
+                txn_rec ar;
+                if (!decodeRecord(ar, cum_size_read, h, file_pos)) {
+                    return false;
+                }
+                // Delete this txn from tmap, unlock any locked records in emap
+                ar.get_xid(&xidp);
+                if (xidp != 0) {
+                    throw jexception("Null xid with non-null xid_size"); // TODO complete exception
+                }
+                std::string xid((char*)xidp, ar.xid_size());
+                txn_data_list tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found
+                for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++) {
+                    if (itr->_enq_flag) {
+                        enqueueCountList_[itr->_pfid]--;
+                    } else {
+                        enqueueMapRef_.unlock(itr->_drid); // ignore not found error
+                    }
+                }
+                std::free(xidp);
+            }
+            break;
+        case QLS_TXC_MAGIC:
+            {
+//std::cout << ".t" << std::flush; // DEBUG
+                txn_rec cr;
+                if (!decodeRecord(cr, cum_size_read, h, file_pos)) {
+                    return false;
+                }
+                // Delete this txn from tmap, process records into emap
+                cr.get_xid(&xidp);
+                if (xidp != 0) {
+                    throw jexception("Null xid with non-null xid_size"); // TODO complete exception
+                }
+                std::string xid((char*)xidp, cr.xid_size());
+                txn_data_list tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found
+                for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++) {
+                    if (itr->_enq_flag) { // txn enqueue
+                        if (enqueueMapRef_.insert_pfid(itr->_rid, itr->_pfid, file_pos) < enq_map::EMAP_OK) { // fail
+                            // The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID.
+                            std::ostringstream oss;
+                            oss << std::hex << "rid=0x" << itr->_rid << " _pfid=0x" << itr->_pfid;
+                            throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "RecoveryManager", "getNextRecordHeader");
+                        }
+                    } else { // txn dequeue
+                        uint64_t enq_fid;
+                        if (enqueueMapRef_.get_remove_pfid(itr->_drid, enq_fid, true) == enq_map::EMAP_OK) // ignore not found error
+                            enqueueCountList_[enq_fid]--;
+                    }
+                }
+                std::free(xidp);
+            }
+            break;
+        case QLS_EMPTY_MAGIC:
+            {
+//std::cout << ".x" << std::flush; // DEBUG
+                uint32_t rec_dblks = jrec::size_dblks(sizeof(::rec_hdr_t));
+                inFileStream_.ignore(rec_dblks * QLS_DBLK_SIZE_BYTES - sizeof(::rec_hdr_t));
+                checkFileStreamOk(false);
+                if (!getNextFile(false)) {
+                    return false;
+                }
+            }
+            break;
+        case 0:
+//std::cout << ".0" << std::endl << std::flush; // DEBUG
+            checkJournalAlignment(file_pos);
+            return false;
+        default:
+//std::cout << ".?" << std::endl << std::flush; // DEBUG
+            // Stop as this is the overwrite boundary.
+            checkJournalAlignment(file_pos);
+            return false;
+    }
+    return true;
+}
+
+void RecoveryManager::readJournalData(char* target,
+                                      const std::streamsize readSize) {
+    std::streamoff bytesRead = 0;
+    while (bytesRead < readSize) {
+        if (inFileStream_.eof()) {
+            getNextFile(false);
+        }
+        bool readFitsInFile = inFileStream_.tellg() + readSize <= fileSize_kib_ * 1024;
+        std::streamoff readSize = readFitsInFile ? readSize : (fileSize_kib_ * 1024) - inFileStream_.tellg();
+        inFileStream_.read(target + bytesRead, readSize);
+        if (inFileStream_.gcount() != readSize) {
+            throw jexception(); // TODO - proper exception
+        }
+        bytesRead += readSize;
+    }
+}
+
+// static private
+void RecoveryManager::readJournalFileHeader(const std::string& journalFileName,
+                                            ::file_hdr_t& fileHeaderRef,
+                                            std::string& queueName) {
+    const std::size_t headerBlockSize = QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024;
+    char buffer[headerBlockSize];
+    std::ifstream ifs(journalFileName.c_str(), std::ifstream::in | std::ifstream::binary);
+    if (!ifs.good()) {
+        std::ostringstream oss;
+        oss << "File=" << journalFileName;
+        throw jexception(jerrno::JERR_RCVM_OPENRD, oss.str(), "RecoveryManager", "readJournalFileHeader");
+    }
+    ifs.read(buffer, headerBlockSize);
+    if (!ifs) {
+        std::streamsize s = ifs.gcount();
+        ifs.close();
+        std::ostringstream oss;
+        oss << "File=" << journalFileName << "; attempted_read_size=" << headerBlockSize << "; actual_read_size=" << s;
+        throw jexception(jerrno::JERR_RCVM_READ, oss.str(), "RecoveryManager", "readJournalFileHeader");
+    }
+    ifs.close();
+    ::memcpy(&fileHeaderRef, buffer, sizeof(::file_hdr_t));
+    queueName.assign(buffer + sizeof(::file_hdr_t), fileHeaderRef._queue_name_len);
+
+}
+
+void RecoveryManager::removeEmptyFiles(EmptyFilePool* emptyFilePoolPtr) {
+    while (enqueueCountList_.front() == 0 && enqueueCountList_.size() > 1) {
+        fileNumberNameMapItr_t i = fileNumberNameMap_.begin();
+//std::cout << "*** File " << i->first << ": " << i->second << " is empty." << std::endl;
+        emptyFilePoolPtr->returnEmptyFile(i->second);
+        fileNumberNameMap_.erase(i);
+        enqueueCountList_.pop_front();
+    }
+}
+
+}} // namespace qpid::qls_jrnl

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,134 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LINEARSTORE_RECOVERYSTATE_H_
+#define QPID_LINEARSTORE_RECOVERYSTATE_H_
+
+#include <deque>
+#include <fstream>
+#include <map>
+#include "qpid/linearstore/jrnl/LinearFileController.h"
+#include <stdint.h>
+#include <vector>
+
+struct file_hdr_t;
+struct rec_hdr_t;
+
+namespace qpid {
+namespace qls_jrnl {
+
+class data_tok;
+class enq_map;
+class EmptyFilePool;
+class EmptyFilePoolManager;
+class JournalLog;
+class jrec;
+class txn_map;
+
+class RecoveryManager
+{
+protected:
+    // Types
+    typedef std::vector<std::string> directoryList_t;
+    typedef directoryList_t::const_iterator directoryListConstItr_t;
+    typedef std::map<uint64_t, std::string> fileNumberNameMap_t;
+    typedef fileNumberNameMap_t::iterator fileNumberNameMapItr_t;
+    typedef fileNumberNameMap_t::const_iterator fileNumberNameMapConstItr_t;
+    typedef std::deque<uint32_t> enqueueCountList_t;
+    typedef enqueueCountList_t::const_iterator enqueueCountListConstItr_t;
+    typedef std::vector<uint64_t> recordIdList_t;
+    typedef recordIdList_t::const_iterator recordIdListConstItr_t;
+
+    // Location and identity
+    const std::string journalDirectory_;
+    const std::string queueName_;
+    enq_map& enqueueMapRef_;
+    txn_map& transactionMapRef_;
+    JournalLog& journalLogRef_;
+
+    // Initial journal analysis data
+    fileNumberNameMap_t fileNumberNameMap_;     ///< File number - name map
+    enqueueCountList_t enqueueCountList_;       ///< Number enqueued records found for each file
+    bool journalEmptyFlag_;                     ///< Journal data files empty
+    std::streamoff firstRecordOffset_;          ///< First record offset in ffid
+    std::streamoff endOffset_;                  ///< End offset (first byte past last record)
+    uint64_t highestRecordId_;                  ///< Highest rid found
+    uint64_t highestFileNumber_;                ///< Highest file number found
+    bool lastFileFullFlag_;                     ///< Last file is full
+
+    // State for recovery of individual enqueued records
+    uint32_t fileSize_kib_;
+    fileNumberNameMapConstItr_t currentJournalFileConstItr_;
+    std::string currentFileName_;
+    std::ifstream inFileStream_;
+    recordIdList_t recordIdList_;
+    recordIdListConstItr_t recordIdListConstItr_;
+
+public:
+    RecoveryManager(const std::string& journalDirectory,
+                    const std::string& queuename,
+                    enq_map& enqueueMapRef,
+                    txn_map& transactionMapRef,
+                    JournalLog& journalLogRef);
+    virtual ~RecoveryManager();
+
+    void analyzeJournals(const std::vector<std::string>* preparedTransactionListPtr,
+                         EmptyFilePoolManager* emptyFilePoolManager,
+                         EmptyFilePool** emptyFilePoolPtrPtr);
+    std::streamoff getEndOffset() const;
+    uint64_t getHighestFileNumber() const;
+    uint64_t getHighestRecordId() const;
+    bool isLastFileFull() const;
+    bool readNextRemainingRecord(void** const dataPtrPtr,
+                                 std::size_t& dataSize,
+                                 void** const xidPtrPtr,
+                                 std::size_t& xidSize,
+                                 bool& transient,
+                                 bool& external,
+                                 data_tok* const dtokp,
+                                 bool ignore_pending_txns);
+    void setLinearFileControllerJournals(lfcAddJournalFileFn fnPtr,
+                                         LinearFileController* lfcPtr);
+    std::string toString(const std::string& jid,
+                         bool compact = true);
+protected:
+    void analyzeJournalFileHeaders(efpIdentity_t& efpIdentity);
+    void checkFileStreamOk(bool checkEof);
+    void checkJournalAlignment(const std::streampos recordPosition);
+    bool decodeRecord(jrec& record,
+                      std::size_t& cumulativeSizeRead,
+                      ::rec_hdr_t& recordHeader,
+                      std::streampos& fileOffset);
+    std::string getCurrentFileName() const;
+    uint64_t getCurrentFileNumber() const;
+    bool getNextFile(bool jumpToFirstRecordOffsetFlag);
+    bool getNextRecordHeader();
+    void readJournalData(char* target, const std::streamsize size);
+    void removeEmptyFiles(EmptyFilePool* emptyFilePoolPtr);
+
+    static void readJournalFileHeader(const std::string& journalFileName,
+                                      ::file_hdr_t& fileHeaderRef,
+                                      std::string& queueName);
+};
+
+}} // namespace qpid::qls_jrnl
+
+#endif // QPID_LINEARSTORE_RECOVERYSTATE_H_

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/aio.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/aio.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/aio.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/aio.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,140 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_AIO_H
+#define QPID_LEGACYSTORE_JRNL_AIO_H
+
+#include <libaio.h>
+#include <cstring>
+#include <string.h>
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+typedef iocb aio_cb;
+typedef io_event aio_event;
+
+/**
+ * \brief This class is a C++ wrapper class for the libaio functions used by the journal. Note that only those
+ * functions used by the journal are included here. This is not a complete implementation of all libaio functions.
+ */
+class aio
+{
+public:
+    static inline int queue_init(int maxevents, io_context_t* ctxp)
+    {
+        return ::io_queue_init(maxevents, ctxp);
+    }
+
+    static inline int queue_release(io_context_t ctx)
+    {
+        return ::io_queue_release(ctx);
+    }
+
+    static inline int submit(io_context_t ctx, long nr, aio_cb* aios[])
+    {
+        return ::io_submit(ctx, nr, aios);
+    }
+
+    static inline int getevents(io_context_t ctx, long min_nr, long nr, aio_event* events, timespec* const timeout)
+    {
+        return ::io_getevents(ctx, min_nr, nr, events, timeout);
+    }
+
+    /**
+     * \brief This function allows iocbs to be initialized with a pointer that can be re-used. This prepares an
+     * aio_cb struct for read use. (This is a wrapper for libaio's ::io_prep_pread() function.)
+     *
+     * \param aiocbp Pointer to the aio_cb struct to be prepared.
+     * \param fd File descriptor to be used for read.
+     * \param buf Pointer to buffer in which read data is to be placed.
+     * \param count Number of bytes to read - buffer must be large enough.
+     * \param offset Offset within file from which data will be read.
+     */
+    static inline void prep_pread(aio_cb* aiocbp, int fd, void* buf, std::size_t count, int64_t offset)
+    {
+        ::io_prep_pread(aiocbp, fd, buf, count, offset);
+    }
+
+    /**
+     * \brief Special version of libaio's io_prep_pread() which preserves the value of the data pointer. This allows
+     * iocbs to be initialized with a pointer that can be re-used. This prepares a aio_cb struct for read use.
+     *
+     * \param aiocbp Pointer to the aio_cb struct to be prepared.
+     * \param fd File descriptor to be used for read.
+     * \param buf Pointer to buffer in which read data is to be placed.
+     * \param count Number of bytes to read - buffer must be large enough.
+     * \param offset Offset within file from which data will be read.
+     */
+    static inline void prep_pread_2(aio_cb* aiocbp, int fd, void* buf, std::size_t count, int64_t offset)
+    {
+        std::memset((void*) ((char*) aiocbp + sizeof(void*)), 0, sizeof(aio_cb) - sizeof(void*));
+        aiocbp->aio_fildes = fd;
+        aiocbp->aio_lio_opcode = IO_CMD_PREAD;
+        aiocbp->aio_reqprio = 0;
+        aiocbp->u.c.buf = buf;
+        aiocbp->u.c.nbytes = count;
+        aiocbp->u.c.offset = offset;
+    }
+
+    /**
+     * \brief This function allows iocbs to be initialized with a pointer that can be re-used. This function prepares
+     * an aio_cb struct for write use. (This is a wrapper for libaio's ::io_prep_pwrite() function.)
+     *
+     * \param aiocbp Pointer to the aio_cb struct to be prepared.
+     * \param fd File descriptor to be used for write.
+     * \param buf Pointer to buffer in which data to be written is located.
+     * \param count Number of bytes to write.
+     * \param offset Offset within file to which data will be written.
+     */
+    static inline void prep_pwrite(aio_cb* aiocbp, int fd, void* buf, std::size_t count, int64_t offset)
+    {
+        ::io_prep_pwrite(aiocbp, fd, buf, count, offset);
+    }
+
+    /**
+     * \brief Special version of libaio's io_prep_pwrite() which preserves the value of the data pointer. This allows
+     * iocbs to be initialized with a pointer that can be re-used. This function prepares an aio_cb struct for write
+     * use.
+     *
+     * \param aiocbp Pointer to the aio_cb struct to be prepared.
+     * \param fd File descriptor to be used for write.
+     * \param buf Pointer to buffer in which data to be written is located.
+     * \param count Number of bytes to write.
+     * \param offset Offset within file to which data will be written.
+     */
+    static inline void prep_pwrite_2(aio_cb* aiocbp, int fd, void* buf, std::size_t count, int64_t offset)
+    {
+        std::memset((void*) ((char*) aiocbp + sizeof(void*)), 0, sizeof(aio_cb) - sizeof(void*));
+        aiocbp->aio_fildes = fd;
+        aiocbp->aio_lio_opcode = IO_CMD_PWRITE;
+        aiocbp->aio_reqprio = 0;
+        aiocbp->u.c.buf = buf;
+        aiocbp->u.c.nbytes = count;
+        aiocbp->u.c.offset = offset;
+    }
+};
+
+}}
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_AIO_H

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/aio_callback.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/aio_callback.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/aio_callback.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/aio_callback.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_AIO_CALLBACK_H
+#define QPID_LEGACYSTORE_JRNL_AIO_CALLBACK_H
+
+#include <stdint.h>
+#include <vector>
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+    class data_tok;
+
+    class aio_callback
+    {
+    public:
+        virtual ~aio_callback() {}
+        virtual void wr_aio_cb(std::vector<data_tok*>& dtokl) = 0;
+        virtual void rd_aio_cb(std::vector<uint16_t>& pil) = 0;
+    };
+
+}}
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_AIO_CALLBACK_H

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/cvar.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/cvar.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/cvar.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/cvar.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_CVAR_H
+#define QPID_LEGACYSTORE_JRNL_CVAR_H
+
+#include <cstring>
+#include "qpid/linearstore/jrnl/jerrno.h"
+#include "qpid/linearstore/jrnl/jexception.h"
+#include "qpid/linearstore/jrnl/smutex.h"
+#include "qpid/linearstore/jrnl/time_ns.h"
+#include <pthread.h>
+#include <sstream>
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+    // Ultra-simple thread condition variable class
+    class cvar
+    {
+    private:
+        const smutex& _sm;
+        pthread_cond_t _c;
+    public:
+        inline cvar(const smutex& sm) : _sm(sm) { ::pthread_cond_init(&_c, 0); }
+        inline ~cvar() { ::pthread_cond_destroy(&_c); }
+        inline void wait()
+        {
+            PTHREAD_CHK(::pthread_cond_wait(&_c, _sm.get()), "::pthread_cond_wait", "cvar", "wait");
+        }
+        inline void timedwait(timespec& ts)
+        {
+            PTHREAD_CHK(::pthread_cond_timedwait(&_c, _sm.get(), &ts), "::pthread_cond_timedwait", "cvar", "timedwait");
+        }
+        inline bool waitintvl(const long intvl_ns)
+        {
+            time_ns t; t.now(); t+=intvl_ns;
+            int ret = ::pthread_cond_timedwait(&_c, _sm.get(), &t);
+            if (ret == ETIMEDOUT)
+                return true;
+            PTHREAD_CHK(ret, "::pthread_cond_timedwait", "cvar", "waitintvl");
+            return false;
+        }
+        inline void signal()
+        {
+            PTHREAD_CHK(::pthread_cond_signal(&_c), "::pthread_cond_signal", "cvar", "notify");
+        }
+        inline void broadcast()
+        {
+            PTHREAD_CHK(::pthread_cond_broadcast(&_c), "::pthread_cond_broadcast", "cvar", "broadcast");
+        }
+    };
+
+}}
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_CVAR_H

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.cpp?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.cpp Tue Oct 22 19:09:56 2013
@@ -0,0 +1,188 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/linearstore/jrnl/data_tok.h"
+
+#include <iomanip>
+#include "qpid/linearstore/jrnl/jerrno.h"
+#include "qpid/linearstore/jrnl/jexception.h"
+#include "qpid/linearstore/jrnl/slock.h"
+#include <sstream>
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+// Static members
+
+uint64_t data_tok::_cnt = 0;
+smutex data_tok::_mutex;
+
+data_tok::data_tok():
+    _wstate(NONE),
+//    _rstate(UNREAD),
+    _dsize(0),
+    _dblks_written(0),
+//    _dblks_read(0),
+    _pg_cnt(0),
+    _fid(0),
+    _rid(0),
+    _xid(),
+    _dequeue_rid(0),
+    _external_rid(false)
+{
+    slock s(_mutex);
+    _icnt = _cnt++;
+}
+
+data_tok::~data_tok() {}
+
+const char*
+data_tok::wstate_str() const
+{
+    return wstate_str(_wstate);
+}
+
+const char*
+data_tok::wstate_str(write_state wstate)
+{
+    switch (wstate)
+    {
+        case NONE:
+            return "NONE";
+        case ENQ_CACHED:
+            return "ENQ_CACHED";
+        case ENQ_PART:
+            return "ENQ_PART";
+        case ENQ_SUBM:
+            return "ENQ_SUBM";
+        case ENQ:
+            return "ENQ";
+        case DEQ_CACHED:
+            return "DEQ_CACHED";
+        case DEQ_PART:
+            return "DEQ_PART";
+        case DEQ_SUBM:
+            return "DEQ_SUBM";
+        case DEQ:
+            return "DEQ";
+        case ABORT_CACHED:
+            return "ABORT_CACHED";
+        case ABORT_PART:
+            return "ABORT_PART";
+        case ABORT_SUBM:
+            return "ABORT_SUBM";
+        case ABORTED:
+            return "ABORTED";
+        case COMMIT_CACHED:
+            return "COMMIT_CACHED";
+        case COMMIT_PART:
+            return "COMMIT_PART";
+        case COMMIT_SUBM:
+            return "COMMIT_SUBM";
+        case COMMITTED:
+            return "COMMITTED";
+    }
+    // Not using default: forces compiler to ensure all cases are covered.
+    return "<wstate unknown>";
+}
+
+/*
+const char*
+data_tok::rstate_str() const
+{
+    return rstate_str(_rstate);
+}
+*/
+
+/*
+const char*
+data_tok::rstate_str(read_state rstate)
+{
+    switch (rstate)
+    {
+        case NONE:
+            return "NONE";
+        case READ_PART:
+            return "READ_PART";
+        case SKIP_PART:
+            return "SKIP_PART";
+        case READ:
+            return "READ";
+    // Not using default: forces compiler to ensure all cases are covered.
+    }
+    return "<rstate unknown>";
+}
+*/
+
+/*
+void
+data_tok::set_rstate(const read_state rstate)
+{
+    if (_wstate != ENQ && rstate != UNREAD)
+    {
+        std::ostringstream oss;
+        oss << "Attempted to change read state to " << rstate_str(rstate);
+        oss << " while write state is not enqueued (wstate ENQ); wstate=" << wstate_str() << ".";
+        throw jexception(jerrno::JERR_DTOK_ILLEGALSTATE, oss.str(), "data_tok",
+                "set_rstate");
+    }
+    _rstate = rstate;
+}
+*/
+
+void
+data_tok::reset()
+{
+    _wstate = NONE;
+//    _rstate = UNREAD;
+    _dsize = 0;
+    _dblks_written = 0;
+//    _dblks_read = 0;
+    _pg_cnt = 0;
+    _fid = 0;
+    _rid = 0;
+    _xid.clear();
+}
+
+// debug aid
+std::string
+data_tok::status_str() const
+{
+    std::ostringstream oss;
+    oss << std::hex << std::setfill('0');
+    oss << "dtok id=0x" << _icnt << "; ws=" << wstate_str()/* << "; rs=" << rstate_str()*/;
+    oss << "; fid=0x" << _fid << "; rid=0x" << _rid << "; xid=";
+    for (unsigned i=0; i<_xid.size(); i++)
+    {
+        if (isprint(_xid[i]))
+            oss << _xid[i];
+        else
+            oss << "/" << std::setw(2) << (int)((char)_xid[i]);
+    }
+    oss << "; drid=0x" << _dequeue_rid << " extrid=" << (_external_rid?"T":"F");
+    oss << "; ds=0x" << _dsize << "; dw=0x" << _dblks_written/* << "; dr=0x" << _dblks_read*/;
+    oss << "; pc=0x" << _pg_cnt;
+    return oss.str();
+}
+
+}}

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,160 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_DATA_TOK_H
+#define QPID_LEGACYSTORE_JRNL_DATA_TOK_H
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+class data_tok;
+}}
+
+#include <cassert>
+#include <cstddef>
+#include "qpid/linearstore/jrnl/smutex.h"
+#include <pthread.h>
+#include <string>
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+    /**
+    * \class data_tok
+    * \brief Data block token (data_tok) used to track wstate of a data block through asynchronous
+    *     I/O process
+    */
+    class data_tok
+    {
+    public:
+        // TODO: Fix this, separate write state from operation
+        // ie: wstate = NONE, CACHED, PART, SUBM, COMPL
+        //     op = ENQUEUE, DEQUEUE, ABORT, COMMIT
+        enum write_state
+        {
+            NONE,       ///< Data block not sent to journal
+            ENQ_CACHED, ///< Data block enqueue written to page cache
+            ENQ_PART,   ///< Data block part-submitted to AIO, waiting for page buffer to free up
+            ENQ_SUBM,   ///< Data block enqueue submitted to AIO
+            ENQ,        ///< Data block enqueue AIO write complete (enqueue complete)
+            DEQ_CACHED, ///< Data block dequeue written to page cache
+            DEQ_PART,   ///< Data block part-submitted to AIO, waiting for page buffer to free up
+            DEQ_SUBM,   ///< Data block dequeue submitted to AIO
+            DEQ,        ///< Data block dequeue AIO write complete (dequeue complete)
+            ABORT_CACHED,
+            ABORT_PART,
+            ABORT_SUBM,
+            ABORTED,
+            COMMIT_CACHED,
+            COMMIT_PART,
+            COMMIT_SUBM,
+            COMMITTED
+        };
+
+/*
+        enum read_state
+        {
+            UNREAD,     ///< Data block not read
+            READ_PART,  ///< Data block is part-read; waiting for page buffer to fill
+            SKIP_PART,  ///< Prev. dequeued dblock is part-skipped; waiting for page buffer to fill
+            READ        ///< Data block is fully read
+        };
+*/
+
+    protected:
+        static smutex _mutex;
+        static uint64_t _cnt;
+        uint64_t    _icnt;
+        write_state _wstate;        ///< Enqueued / dequeued state of data
+//        read_state  _rstate;        ///< Read state of data
+        std::size_t _dsize;         ///< Data size in bytes
+        uint32_t    _dblks_written; ///< Data blocks read/written
+//        uint32_t    _dblks_read;    ///< Data blocks read/written
+        uint32_t    _pg_cnt;        ///< Page counter - incr for each page containing part of data
+        uint64_t    _fid;           ///< FID containing header of enqueue record
+        uint64_t    _rid;           ///< RID of data set by enqueue operation
+        std::string _xid;           ///< XID set by enqueue operation
+        uint64_t    _dequeue_rid;   ///< RID of data set by dequeue operation
+        bool        _external_rid;  ///< Flag to indicate external setting of rid
+
+    public:
+        data_tok();
+        virtual ~data_tok();
+
+        inline uint64_t id() const { return _icnt; }
+        inline write_state wstate() const { return _wstate; }
+        const char* wstate_str() const;
+        static const char* wstate_str(write_state wstate);
+//        inline read_state rstate() const { return _rstate; }
+//        const char* rstate_str() const;
+//        static const char* rstate_str(read_state rstate);
+        inline bool is_writable() const { return _wstate == NONE || _wstate == ENQ_PART; }
+        inline bool is_enqueued() const { return _wstate == ENQ; }
+        inline bool is_readable() const { return _wstate == ENQ; }
+//        inline bool is_read() const { return _rstate == READ; }
+        inline bool is_dequeueable() const { return _wstate == ENQ || _wstate == DEQ_PART; }
+        inline void set_wstate(const write_state wstate) { _wstate = wstate; }
+//        void set_rstate(const read_state rstate);
+        inline std::size_t dsize() const { return _dsize; }
+        inline void set_dsize(std::size_t dsize) { _dsize = dsize; }
+
+        inline uint32_t dblocks_written() const { return _dblks_written; }
+        inline void incr_dblocks_written(uint32_t dblks_written)
+                { _dblks_written += dblks_written; }
+        inline void set_dblocks_written(uint32_t dblks_written) { _dblks_written = dblks_written; }
+
+//        inline uint32_t dblocks_read() const { return _dblks_read; }
+//        inline void incr_dblocks_read(uint32_t dblks_read) { _dblks_read += dblks_read; }
+//        inline void set_dblocks_read(uint32_t dblks_read) { _dblks_read = dblks_read; }
+
+        inline uint32_t pg_cnt() const { return _pg_cnt; }
+        inline uint32_t incr_pg_cnt() { return ++_pg_cnt; }
+        inline uint32_t decr_pg_cnt() { assert(_pg_cnt != 0); return --_pg_cnt; }
+
+        inline uint64_t fid() const { return _fid; }
+        inline void set_fid(const uint64_t fid) { _fid = fid; }
+        inline uint64_t rid() const { return _rid; }
+        inline void set_rid(const uint64_t rid) { _rid = rid; }
+        inline uint64_t dequeue_rid() const {return _dequeue_rid; }
+        inline void set_dequeue_rid(const uint64_t rid) { _dequeue_rid = rid; }
+        inline bool external_rid() const { return _external_rid; }
+        inline void set_external_rid(const bool external_rid) { _external_rid = external_rid; }
+
+        inline bool has_xid() const { return !_xid.empty(); }
+        inline const std::string& xid() const { return _xid; }
+        inline void clear_xid() { _xid.clear(); }
+        inline void set_xid(const std::string& xid) { _xid.assign(xid); }
+        inline void set_xid(const void* xidp, const std::size_t xid_len)
+                { _xid.assign((const char*)xidp, xid_len); }
+
+        void reset();
+
+        // debug aid
+        std::string status_str() const;
+    };
+
+} // namespace qls_jrnl
+} // namespace jrnl
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_DATA_TOK_H



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org