You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/11/14 21:39:34 UTC

svn commit: r1542066 [2/3] - in /qpid/trunk/qpid/cpp/src/qpid: legacystore/ linearstore/ linearstore/jrnl/ linearstore/jrnl/utils/

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp Thu Nov 14 20:39:32 2013
@@ -44,8 +44,8 @@
 #include <vector>
 
 namespace qpid {
-namespace qls_jrnl
-{
+namespace linearstore {
+namespace journal {
 
 RecoveryManager::RecoveryManager(const std::string& journalDirectory,
                                  const std::string& queuename,
@@ -63,6 +63,7 @@ RecoveryManager::RecoveryManager(const s
                                                  highestRecordId_(0ULL),
                                                  highestFileNumber_(0ULL),
                                                  lastFileFullFlag_(false),
+                                                 currentSerial_(0),
                                                  efpFileSize_kib_(0)
 {}
 
@@ -154,9 +155,10 @@ bool RecoveryManager::readNextRemainingR
     }
     enq_map::emap_data_struct_t eds;
     enqueueMapRef_.get_data(*recordIdListConstItr_, eds);
-    uint64_t fileNumber = eds._pfid;
-    currentJournalFileConstItr_ = fileNumberMap_.find(fileNumber);
-    getNextFile(false);
+    if (!inFileStream_.is_open() || currentJournalFileConstItr_->first != eds._pfid) {
+        getFile(eds._pfid, false);
+    }
+//std::cout << " " << eds._pfid << std::hex << ",0x" << eds._file_posn << std::flush; // DEBUG
 
     inFileStream_.seekg(eds._file_posn, std::ifstream::beg);
     if (!inFileStream_.good()) {
@@ -174,7 +176,10 @@ bool RecoveryManager::readNextRemainingR
     // check flags
     transient = ::is_enq_transient(&enqueueHeader);
     external = ::is_enq_external(&enqueueHeader);
-
+//char magicBuff[5]; // DEBUG
+//::memcpy(magicBuff, &enqueueHeader, 4); // DEBUG
+//magicBuff[4] = 0; // DEBUG
+//std::cout << std::hex << ":" << (char*)magicBuff << ",rid=0x" << enqueueHeader._rhdr._rid << ",xs=0x" << enqueueHeader._xidsize << ",ds=0x" << enqueueHeader._dsize << std::dec << std::flush; // DEBUG
     // read xid
     xidSize = enqueueHeader._xidsize;
     *xidPtrPtr = ::malloc(xidSize);
@@ -217,57 +222,75 @@ void RecoveryManager::setLinearFileContr
     }
 }
 
-std::string RecoveryManager::toString(const std::string& jid,
-                                      bool compact) {
+std::string RecoveryManager::toString(const std::string& jid) {
     std::ostringstream oss;
-    if (compact) {
-        oss << "Recovery journal analysis (jid=\"" << jid << "\"):";
-        oss << " jfl=[";
-        for (fileNumberMapConstItr_t i=fileNumberMap_.begin(); i!=fileNumberMap_.end(); ++i) {
-            if (i!=fileNumberMap_.begin()) {
-                oss << " ";
-            }
-            std::string fqFileName = i->second->getFqFileName();
-            oss << i->first << ":" << fqFileName.substr(fqFileName.rfind('/')+1);
-        }
-        oss << "] ecl=[ ";
-        for (fileNumberMapConstItr_t j=fileNumberMap_.begin(); j!=fileNumberMap_.end(); ++j) {
-            if (j!=fileNumberMap_.begin()) {
-                oss << " ";
-            }
-            oss << j->second->getEnqueuedRecordCount();
-        }
-        oss << " ] empty=" << (journalEmptyFlag_ ? "T" : "F");
-        oss << " fro=0x" << std::hex << firstRecordOffset_ << std::dec << " (" << (firstRecordOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)";
-        oss << " eo=0x" << std::hex << endOffset_ << std::dec << " ("  << (endOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)";
-        oss << " hrid=0x" << std::hex << highestRecordId_ << std::dec;
-        oss << " hfnum=0x" << std::hex << highestFileNumber_ << std::dec;
-        oss << " lffull=" << (lastFileFullFlag_ ? "T" : "F");
+    oss << "Recovery journal analysis (jid=\"" << jid << "\"):" << std::endl;
+    oss << "  Number of journal files = " << fileNumberMap_.size() << std::endl;
+    oss << "  Journal File List:" << std::endl;
+    for (fileNumberMapConstItr_t k=fileNumberMap_.begin(); k!=fileNumberMap_.end(); ++k) {
+        std::string fqFileName = k->second->getFqFileName();
+        oss << "    " << k->first << ": " << fqFileName.substr(fqFileName.rfind('/')+1) << std::endl;
+    }
+    oss << "  Enqueue Counts: [ ";
+    for (fileNumberMapConstItr_t l=fileNumberMap_.begin(); l!=fileNumberMap_.end(); ++l) {
+        if (l != fileNumberMap_.begin()) {
+            oss << ", ";
+        }
+        oss << l->second->getEnqueuedRecordCount();
+    }
+    oss << " ]" << std::endl;
+    oss << "  Journal empty = " << (journalEmptyFlag_ ? "TRUE" : "FALSE") << std::endl;
+    oss << "  First record offset in first file = 0x" << std::hex << firstRecordOffset_ <<
+            std::dec << " (" << (firstRecordOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
+    oss << "  End offset = 0x" << std::hex << endOffset_ << std::dec << " ("  <<
+            (endOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
+    oss << "  Highest rid = 0x" << std::hex << highestRecordId_ << std::dec << std::endl;
+    oss << "  Highest file number = 0x" << std::hex << highestFileNumber_ << std::dec << std::endl;
+    oss << "  Last file full = " << (lastFileFullFlag_ ? "TRUE" : "FALSE") << std::endl;
+    oss << "  Enqueued records (txn & non-txn):" << std::endl;
+    return oss.str();
+}
+
+std::string RecoveryManager::toLog(const std::string& jid, const int indent) {
+    std::string indentStr(indent, ' ');
+    std::ostringstream oss;
+    oss << std::endl << indentStr  << "Journal recovery analysis (jid=\"" << jid << "\"):" << std::endl;
+    if (journalEmptyFlag_) {
+        oss << indentStr << "<Journal empty, no journal files found>" << std::endl;
     } else {
-        oss << "Recovery journal analysis (jid=\"" << jid << "\"):" << std::endl;
-        oss << "  Number of journal files = " << fileNumberMap_.size() << std::endl;
-        oss << "  Journal File List:" << std::endl;
+        oss << indentStr << std::setw(7) << "file_id"
+                         << std::setw(43) << "file_name"
+                         << std::setw(16) << "fro"
+                         << std::setw(12) << "record_cnt"
+                         << std::setw(5) << "ptn"
+                         << std::setw(10) << "efp"
+                         << std::endl;
+        oss << indentStr << std::setw(7) << "-------"
+                         << std::setw(43) << "-----------------------------------------"
+                         << std::setw(16) << "--------------"
+                         << std::setw(12) << "----------"
+                         << std::setw(5) << "---"
+                         << std::setw(10) << "--------"
+                         << std::endl;
         for (fileNumberMapConstItr_t k=fileNumberMap_.begin(); k!=fileNumberMap_.end(); ++k) {
             std::string fqFileName = k->second->getFqFileName();
-            oss << "    " << k->first << ": " << fqFileName.substr(fqFileName.rfind('/')+1) << std::endl;
+            std::ostringstream fro;
+            fro << std::hex << "0x" << k->second->getFirstRecordOffset();
+            oss << indentStr << std::setw(7) << k->first
+                             << std::setw(43) << fqFileName.substr(fqFileName.rfind('/')+1)
+                             << std::setw(16) << fro.str()
+                             << std::setw(12) << k->second->getEnqueuedRecordCount()
+                             << std::setw(5) << k->second->getEfpIdentity().pn_
+                             << std::setw(9) << k->second->getEfpIdentity().ds_ << "k"
+                             << std::endl;
         }
-        oss << "  Enqueue Counts: [ " << std::endl;
-        for (fileNumberMapConstItr_t l=fileNumberMap_.begin(); l!=fileNumberMap_.end(); ++l) {
-            if (l != fileNumberMap_.begin()) {
-                oss << ", ";
-            }
-            oss << l->second->getEnqueuedRecordCount();
-        }
-        oss << " ]" << std::endl;
-        oss << "  Journal empty = " << (journalEmptyFlag_ ? "TRUE" : "FALSE") << std::endl;
-        oss << "  First record offset in first file = 0x" << std::hex << firstRecordOffset_ <<
+        oss << indentStr << "First record offset in first file = 0x" << std::hex << firstRecordOffset_ <<
                 std::dec << " (" << (firstRecordOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
-        oss << "  End offset = 0x" << std::hex << endOffset_ << std::dec << " ("  <<
+        oss << indentStr << "End offset in last file = 0x" << std::hex << endOffset_ << std::dec << " ("  <<
                 (endOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
-        oss << "  Highest rid = 0x" << std::hex << highestRecordId_ << std::dec << std::endl;
-        oss << "  Highest file number = 0x" << std::hex << highestFileNumber_ << std::dec << std::endl;
-        oss << "  Last file full = " << (lastFileFullFlag_ ? "TRUE" : "FALSE") << std::endl;
-        oss << "  Enqueued records (txn & non-txn):" << std::endl;
+        oss << indentStr << "Highest rid found = 0x" << std::hex << highestRecordId_ << std::dec << std::endl;
+        oss << indentStr << "Last file full = " << (lastFileFullFlag_ ? "TRUE" : "FALSE") << std::endl;
+        oss << indentStr << "Enqueued records (txn & non-txn):";
     }
     return oss.str();
 }
@@ -355,9 +378,9 @@ void RecoveryManager::checkJournalAlignm
 }
 
 bool RecoveryManager::decodeRecord(jrec& record,
-                              std::size_t& cumulativeSizeRead,
-                              ::rec_hdr_t& headerRecord,
-                              std::streampos& fileOffset)
+                                   std::size_t& cumulativeSizeRead,
+                                   ::rec_hdr_t& headerRecord,
+                                    std::streampos& fileOffset)
 {
     std::streampos start_file_offs = fileOffset;
 
@@ -370,15 +393,18 @@ bool RecoveryManager::decodeRecord(jrec&
     bool done = false;
     while (!done) {
         try {
-            done = record.rcv_decode(headerRecord, &inFileStream_, cumulativeSizeRead);
+            done = record.decode(headerRecord, &inFileStream_, cumulativeSizeRead);
         }
         catch (const jexception& e) {
+            journalLogRef_.log(JournalLog::LOG_INFO, queueName_, e.what());
             checkJournalAlignment(start_file_offs);
             return false;
         }
-        if (!done && !getNextFile(false)) {
-            checkJournalAlignment(start_file_offs);
-            return false;
+        if (!done && needNextFile()) {
+            if (!getNextFile(false)) {
+                checkJournalAlignment(start_file_offs);
+                return false;
+            }
         }
     }
     return true;
@@ -392,45 +418,50 @@ uint64_t RecoveryManager::getCurrentFile
     return currentJournalFileConstItr_->first;
 }
 
-bool RecoveryManager::getNextFile(bool jumpToFirstRecordOffsetFlag) {
+bool RecoveryManager::getFile(const uint64_t fileNumber, bool jumpToFirstRecordOffsetFlag) {
     if (inFileStream_.is_open()) {
-        if (inFileStream_.eof() || !inFileStream_.good()) {
-            inFileStream_.clear();
-            endOffset_ = inFileStream_.tellg(); // remember file offset before closing
-            if (endOffset_ == -1) {
-                std::ostringstream oss;
-                oss << "tellg() failure: fail=" << (inFileStream_.fail()?"T":"F") << " bad=" << (inFileStream_.bad()?"T":"F");
-                throw jexception(jerrno::JERR_RCVM_STREAMBAD, oss.str(), "RecoveryManager", "getNextFile");
-            }
-            inFileStream_.close();
-            if (++currentJournalFileConstItr_ == fileNumberMap_.end()) {
-                return false;
-            }
-        }
-    }
-    if (!inFileStream_.is_open())  {
+        inFileStream_.close();
+//std::cout << " f=" << getCurrentFileName() << "]" << std::flush; // DEBUG
         inFileStream_.clear(); // clear eof flag, req'd for older versions of c++
-        inFileStream_.open(getCurrentFileName().c_str(), std::ios_base::in | std::ios_base::binary);
-        if (!inFileStream_.good()) {
-            throw jexception(jerrno::JERR__FILEIO, getCurrentFileName(), "RecoveryManager", "getNextFile");
-        }
+    }
+    currentJournalFileConstItr_ = fileNumberMap_.find(fileNumber);
+    if (currentJournalFileConstItr_ == fileNumberMap_.end()) {
+        return false;
+    }
+    inFileStream_.open(getCurrentFileName().c_str(), std::ios_base::in | std::ios_base::binary);
+    if (!inFileStream_.good()) {
+        throw jexception(jerrno::JERR__FILEIO, getCurrentFileName(), "RecoveryManager", "getFile");
+    }
+//std::cout << " [F=" << getCurrentFileName() << std::flush; // DEBUG
 
-        // Read file header
-        file_hdr_t fhdr;
-        inFileStream_.read((char*)&fhdr, sizeof(fhdr));
-        checkFileStreamOk(true);
-        if (fhdr._rhdr._magic == QLS_FILE_MAGIC) {
-            firstRecordOffset_ = fhdr._fro;
-            std::streamoff foffs = jumpToFirstRecordOffsetFlag ? firstRecordOffset_ : QLS_SBLK_SIZE_BYTES;
-            inFileStream_.seekg(foffs);
-        } else {
-            inFileStream_.close();
-            if (currentJournalFileConstItr_ == fileNumberMap_.begin()) {
-                journalEmptyFlag_ = true;
-            }
+    if (!readFileHeader()) {
+        return false;
+    }
+    std::streamoff foffs = jumpToFirstRecordOffsetFlag ? firstRecordOffset_ : QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES;
+    inFileStream_.seekg(foffs);
+    return true;
+}
+
+bool RecoveryManager::getNextFile(bool jumpToFirstRecordOffsetFlag) {
+    if (inFileStream_.is_open()) {
+        inFileStream_.close();
+//std::cout << " .f=" << getCurrentFileName() << "]" << std::flush; // DEBUG
+        if (++currentJournalFileConstItr_ == fileNumberMap_.end()) {
             return false;
         }
+        inFileStream_.clear(); // clear eof flag, req'd for older versions of c++
+    }
+    inFileStream_.open(getCurrentFileName().c_str(), std::ios_base::in | std::ios_base::binary);
+    if (!inFileStream_.good()) {
+        throw jexception(jerrno::JERR__FILEIO, getCurrentFileName(), "RecoveryManager", "getNextFile");
     }
+//std::cout << " [.F=" << getCurrentFileName() << std::flush; // DEBUG
+
+    if (!readFileHeader()) {
+        return false;
+    }
+    std::streamoff foffs = jumpToFirstRecordOffsetFlag ? firstRecordOffset_ : QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES;
+    inFileStream_.seekg(foffs);
     return true;
 }
 
@@ -443,7 +474,7 @@ bool RecoveryManager::getNextRecordHeade
     bool hdr_ok = false;
     std::streampos file_pos;
     while (!hdr_ok) {
-        if (!inFileStream_.is_open()) {
+        if (needNextFile()) {
             if (!getNextFile(true)) {
                 return false;
             }
@@ -458,8 +489,10 @@ bool RecoveryManager::getNextRecordHeade
         if (inFileStream_.gcount() == sizeof(rec_hdr_t)) {
             hdr_ok = true;
         } else {
-            if (!getNextFile(true)) {
-                return false;
+            if (needNextFile()) {
+                if (!getNextFile(true)) {
+                    return false;
+                }
             }
         }
     }
@@ -468,6 +501,10 @@ bool RecoveryManager::getNextRecordHeade
         case QLS_ENQ_MAGIC:
             {
 //std::cout << " 0x" << std::hex << file_pos << ".e.0x" << h._rid << std::dec << std::flush; // DEBUG
+                if (::rec_hdr_check(&h, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) {
+                    endOffset_ = file_pos;
+                    return false;
+                }
                 enq_rec er;
                 uint64_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary
                 if (!decodeRecord(er, cum_size_read, h, file_pos)) {
@@ -502,6 +539,10 @@ bool RecoveryManager::getNextRecordHeade
         case QLS_DEQ_MAGIC:
             {
 //std::cout << " 0x" << std::hex << file_pos << ".d.0x" << h._rid << std::dec << std::flush; // DEBUG
+                if (::rec_hdr_check(&h, QLS_DEQ_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) {
+                    endOffset_ = file_pos;
+                    return false;
+                }
                 deq_rec dr;
                 uint16_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary
                 if (!decodeRecord(dr, cum_size_read, h, file_pos)) {
@@ -534,6 +575,10 @@ bool RecoveryManager::getNextRecordHeade
         case QLS_TXA_MAGIC:
             {
 //std::cout << " 0x" << std::hex << file_pos << ".a.0x" << h._rid << std::dec << std::flush; // DEBUG
+                if (::rec_hdr_check(&h, QLS_TXA_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) {
+                    endOffset_ = file_pos;
+                    return false;
+                }
                 txn_rec ar;
                 if (!decodeRecord(ar, cum_size_read, h, file_pos)) {
                     return false;
@@ -558,6 +603,10 @@ bool RecoveryManager::getNextRecordHeade
         case QLS_TXC_MAGIC:
             {
 //std::cout << " 0x" << std::hex << file_pos << ".c.0x" << h._rid << std::dec << std::flush; // DEBUG
+                if (::rec_hdr_check(&h, QLS_TXC_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) {
+                    endOffset_ = file_pos;
+                    return false;
+                }
                 txn_rec cr;
                 if (!decodeRecord(cr, cum_size_read, h, file_pos)) {
                     return false;
@@ -593,8 +642,10 @@ bool RecoveryManager::getNextRecordHeade
                 uint32_t rec_dblks = jrec::size_dblks(sizeof(::rec_hdr_t));
                 inFileStream_.ignore(rec_dblks * QLS_DBLK_SIZE_BYTES - sizeof(::rec_hdr_t));
                 checkFileStreamOk(false);
-                if (!getNextFile(false)) {
-                    return false;
+                if (needNextFile()) {
+                    if (!getNextFile(false)) {
+                        return false;
+                    }
                 }
             }
             break;
@@ -611,6 +662,13 @@ bool RecoveryManager::getNextRecordHeade
     return true;
 }
 
+bool RecoveryManager::needNextFile() { // Returns true when reading must move on to another journal file.
+    if (inFileStream_.is_open()) { // An open file is used up at EOF or once the read position reaches efpFileSize_kib_ * 1024 bytes.
+        return inFileStream_.eof() || inFileStream_.tellg() >= std::streampos(static_cast<std::streamoff>(efpFileSize_kib_) * 1024); // widen before scaling: efpFileSize_kib_ is uint32_t, so a 32-bit multiply would wrap for sizes >= 4 GiB
+    }
+    return true; // No file is open, so one has to be opened before anything can be read.
+}
+
 void RecoveryManager::readJournalData(char* target,
                                       const std::streamsize readSize) {
     std::streamoff bytesRead = 0;
@@ -624,7 +682,9 @@ void RecoveryManager::readJournalData(ch
         inFileStream_.read(target + bytesRead, readSize - bytesRead);
         std::streamoff thisReadSize = inFileStream_.gcount();
         if (thisReadSize < readSize) {
-            getNextFile(false);
+            if (needNextFile()) {
+                getNextFile(false);
+            }
             file_pos = inFileStream_.tellg();
             if (file_pos == std::streampos(-1)) {
                 std::ostringstream oss;
@@ -636,6 +696,23 @@ void RecoveryManager::readJournalData(ch
     }
 }
 
+bool RecoveryManager::readFileHeader() { // Reads a file_hdr_t from inFileStream_ at its current position; on the reject branch the stream is closed and false is returned.
+    file_hdr_t fhdr;
+    inFileStream_.read((char*)&fhdr, sizeof(fhdr));
+    checkFileStreamOk(true);
+    if (::file_hdr_check(&fhdr, QLS_FILE_MAGIC, QLS_JRNL_VERSION, efpFileSize_kib_) != 0) { // NOTE(review): the rec_hdr_check() call sites in getNextRecordHeader() treat a non-zero result as failure, yet here non-zero selects the accept branch - confirm file_hdr_check()'s return convention.
+        firstRecordOffset_ = fhdr._fro;
+        currentSerial_ = fhdr._rhdr._serial; // remembered so record headers can be checked against it via rec_hdr_check()
+    } else {
+        inFileStream_.close();
+        if (currentJournalFileConstItr_ == fileNumberMap_.begin()) {
+            journalEmptyFlag_ = true; // reject branch taken on the first file in fileNumberMap_: flag the journal as empty
+        }
+        return false;
+    }
+    return true;
+}
+
 // static private
 void RecoveryManager::readJournalFileHeader(const std::string& journalFileName,
                                             ::file_hdr_t& fileHeaderRef,
@@ -670,4 +747,4 @@ void RecoveryManager::removeEmptyFiles(E
     }
 }
 
-}} // namespace qpid::qls_jrnl
+}}}

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h Thu Nov 14 20:39:32 2013
@@ -19,8 +19,8 @@
  *
  */
 
-#ifndef QPID_LINEARSTORE_RECOVERYSTATE_H_
-#define QPID_LINEARSTORE_RECOVERYSTATE_H_
+#ifndef QPID_LINEARSTORE_JOURNAL_RECOVERYSTATE_H_
+#define QPID_LINEARSTORE_JOURNAL_RECOVERYSTATE_H_
 
 #include <deque>
 #include <fstream>
@@ -33,7 +33,8 @@ struct file_hdr_t;
 struct rec_hdr_t;
 
 namespace qpid {
-namespace qls_jrnl {
+namespace linearstore {
+namespace journal {
 
 class data_tok;
 class enq_map;
@@ -72,6 +73,7 @@ protected:
     bool lastFileFullFlag_;                     ///< Last file is full
 
     // State for recovery of individual enqueued records
+    uint64_t currentSerial_;
     uint32_t efpFileSize_kib_;
     fileNumberMapConstItr_t currentJournalFileConstItr_;
     std::string currentFileName_;
@@ -104,8 +106,8 @@ public:
                                  bool ignore_pending_txns);
     void setLinearFileControllerJournals(lfcAddJournalFileFn fnPtr,
                                          LinearFileController* lfcPtr);
-    std::string toString(const std::string& jid,
-                         bool compact = true);
+    std::string toString(const std::string& jid);
+    std::string toLog(const std::string& jid, const int indent);
 protected:
     void analyzeJournalFileHeaders(efpIdentity_t& efpIdentity);
     void checkFileStreamOk(bool checkEof);
@@ -116,8 +118,11 @@ protected:
                       std::streampos& fileOffset);
     std::string getCurrentFileName() const;
     uint64_t getCurrentFileNumber() const;
+    bool getFile(const uint64_t fileNumber, bool jumpToFirstRecordOffsetFlag);
     bool getNextFile(bool jumpToFirstRecordOffsetFlag);
     bool getNextRecordHeader();
+    bool needNextFile();
+    bool readFileHeader();
     void readJournalData(char* target, const std::streamsize size);
     void removeEmptyFiles(EmptyFilePool* emptyFilePoolPtr);
 
@@ -126,6 +131,6 @@ protected:
                                       std::string& queueName);
 };
 
-}} // namespace qpid::qls_jrnl
+}}}
 
-#endif // QPID_LINEARSTORE_RECOVERYSTATE_H_
+#endif // QPID_LINEARSTORE_JOURNAL_RECOVERYSTATE_H_

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/aio.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/aio.h?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/aio.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/aio.h Thu Nov 14 20:39:32 2013
@@ -19,17 +19,16 @@
  *
  */
 
-#ifndef QPID_LEGACYSTORE_JRNL_AIO_H
-#define QPID_LEGACYSTORE_JRNL_AIO_H
+#ifndef QPID_LINEARSTORE_JOURNAL_AIO_H
+#define QPID_LINEARSTORE_JOURNAL_AIO_H
 
 #include <libaio.h>
 #include <cstring>
 #include <string.h>
 
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
 
 typedef iocb aio_cb;
 typedef io_event aio_event;
@@ -135,6 +134,6 @@ public:
     }
 };
 
-}}
+}}}
 
-#endif // ifndef QPID_LEGACYSTORE_JRNL_AIO_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_AIO_H

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/aio_callback.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/aio_callback.h?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/aio_callback.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/aio_callback.h Thu Nov 14 20:39:32 2013
@@ -19,27 +19,26 @@
  *
  */
 
-#ifndef QPID_LEGACYSTORE_JRNL_AIO_CALLBACK_H
-#define QPID_LEGACYSTORE_JRNL_AIO_CALLBACK_H
+#ifndef QPID_LINEARSTORE_JOURNAL_AIO_CALLBACK_H
+#define QPID_LINEARSTORE_JOURNAL_AIO_CALLBACK_H
 
 #include <stdint.h>
 #include <vector>
 
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
 
-    class data_tok;
+class data_tok;
 
-    class aio_callback
-    {
-    public:
-        virtual ~aio_callback() {}
-        virtual void wr_aio_cb(std::vector<data_tok*>& dtokl) = 0;
-        virtual void rd_aio_cb(std::vector<uint16_t>& pil) = 0;
-    };
+class aio_callback
+{
+public:
+    virtual ~aio_callback() {}
+    virtual void wr_aio_cb(std::vector<data_tok*>& dtokl) = 0;
+    virtual void rd_aio_cb(std::vector<uint16_t>& pil) = 0;
+};
 
-}}
+}}}
 
-#endif // ifndef QPID_LEGACYSTORE_JRNL_AIO_CALLBACK_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_AIO_CALLBACK_H

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/cvar.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/cvar.h?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/cvar.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/cvar.h Thu Nov 14 20:39:32 2013
@@ -19,8 +19,8 @@
  *
  */
 
-#ifndef QPID_LEGACYSTORE_JRNL_CVAR_H
-#define QPID_LEGACYSTORE_JRNL_CVAR_H
+#ifndef QPID_LINEARSTORE_JOURNAL_CVAR_H
+#define QPID_LINEARSTORE_JOURNAL_CVAR_H
 
 #include <cstring>
 #include "qpid/linearstore/jrnl/jerrno.h"
@@ -30,10 +30,9 @@
 #include <pthread.h>
 #include <sstream>
 
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
 
     // Ultra-simple thread condition variable class
     class cvar
@@ -71,6 +70,6 @@ namespace qls_jrnl
         }
     };
 
-}}
+}}}
 
-#endif // ifndef QPID_LEGACYSTORE_JRNL_CVAR_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_CVAR_H

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.cpp?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.cpp Thu Nov 14 20:39:32 2013
@@ -27,10 +27,9 @@
 #include "qpid/linearstore/jrnl/slock.h"
 #include <sstream>
 
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
 
 // Static members
 
@@ -39,10 +38,8 @@ smutex data_tok::_mutex;
 
 data_tok::data_tok():
     _wstate(NONE),
-//    _rstate(UNREAD),
     _dsize(0),
     _dblks_written(0),
-//    _dblks_read(0),
     _pg_cnt(0),
     _fid(0),
     _rid(0),
@@ -106,58 +103,12 @@ data_tok::wstate_str(write_state wstate)
     return "<wstate unknown>";
 }
 
-/*
-const char*
-data_tok::rstate_str() const
-{
-    return rstate_str(_rstate);
-}
-*/
-
-/*
-const char*
-data_tok::rstate_str(read_state rstate)
-{
-    switch (rstate)
-    {
-        case NONE:
-            return "NONE";
-        case READ_PART:
-            return "READ_PART";
-        case SKIP_PART:
-            return "SKIP_PART";
-        case READ:
-            return "READ";
-    // Not using default: forces compiler to ensure all cases are covered.
-    }
-    return "<rstate unknown>";
-}
-*/
-
-/*
-void
-data_tok::set_rstate(const read_state rstate)
-{
-    if (_wstate != ENQ && rstate != UNREAD)
-    {
-        std::ostringstream oss;
-        oss << "Attempted to change read state to " << rstate_str(rstate);
-        oss << " while write state is not enqueued (wstate ENQ); wstate=" << wstate_str() << ".";
-        throw jexception(jerrno::JERR_DTOK_ILLEGALSTATE, oss.str(), "data_tok",
-                "set_rstate");
-    }
-    _rstate = rstate;
-}
-*/
-
 void
 data_tok::reset()
 {
     _wstate = NONE;
-//    _rstate = UNREAD;
     _dsize = 0;
     _dblks_written = 0;
-//    _dblks_read = 0;
     _pg_cnt = 0;
     _fid = 0;
     _rid = 0;
@@ -185,4 +136,4 @@ data_tok::status_str() const
     return oss.str();
 }
 
-}}
+}}}

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.h?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.h Thu Nov 14 20:39:32 2013
@@ -19,15 +19,14 @@
  *
  */
 
-#ifndef QPID_LEGACYSTORE_JRNL_DATA_TOK_H
-#define QPID_LEGACYSTORE_JRNL_DATA_TOK_H
+#ifndef QPID_LINEARSTORE_JOURNAL_DATA_TOK_H
+#define QPID_LINEARSTORE_JOURNAL_DATA_TOK_H
 
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
 class data_tok;
-}}
+}}}
 
 #include <cassert>
 #include <cstddef>
@@ -35,10 +34,9 @@ class data_tok;
 #include <pthread.h>
 #include <string>
 
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
 
     /**
     * \class data_tok
@@ -72,25 +70,13 @@ namespace qls_jrnl
             COMMITTED
         };
 
-/*
-        enum read_state
-        {
-            UNREAD,     ///< Data block not read
-            READ_PART,  ///< Data block is part-read; waiting for page buffer to fill
-            SKIP_PART,  ///< Prev. dequeued dblock is part-skipped; waiting for page buffer to fill
-            READ        ///< Data block is fully read
-        };
-*/
-
     protected:
         static smutex _mutex;
         static uint64_t _cnt;
         uint64_t    _icnt;
         write_state _wstate;        ///< Enqueued / dequeued state of data
-//        read_state  _rstate;        ///< Read state of data
         std::size_t _dsize;         ///< Data size in bytes
         uint32_t    _dblks_written; ///< Data blocks read/written
-//        uint32_t    _dblks_read;    ///< Data blocks read/written
         uint32_t    _pg_cnt;        ///< Page counter - incr for each page containing part of data
         uint64_t    _fid;           ///< FID containing header of enqueue record
         uint64_t    _rid;           ///< RID of data set by enqueue operation
@@ -106,16 +92,11 @@ namespace qls_jrnl
         inline write_state wstate() const { return _wstate; }
         const char* wstate_str() const;
         static const char* wstate_str(write_state wstate);
-//        inline read_state rstate() const { return _rstate; }
-//        const char* rstate_str() const;
-//        static const char* rstate_str(read_state rstate);
         inline bool is_writable() const { return _wstate == NONE || _wstate == ENQ_PART; }
         inline bool is_enqueued() const { return _wstate == ENQ; }
         inline bool is_readable() const { return _wstate == ENQ; }
-//        inline bool is_read() const { return _rstate == READ; }
         inline bool is_dequeueable() const { return _wstate == ENQ || _wstate == DEQ_PART; }
         inline void set_wstate(const write_state wstate) { _wstate = wstate; }
-//        void set_rstate(const read_state rstate);
         inline std::size_t dsize() const { return _dsize; }
         inline void set_dsize(std::size_t dsize) { _dsize = dsize; }
 
@@ -124,10 +105,6 @@ namespace qls_jrnl
                 { _dblks_written += dblks_written; }
         inline void set_dblocks_written(uint32_t dblks_written) { _dblks_written = dblks_written; }
 
-//        inline uint32_t dblocks_read() const { return _dblks_read; }
-//        inline void incr_dblocks_read(uint32_t dblks_read) { _dblks_read += dblks_read; }
-//        inline void set_dblocks_read(uint32_t dblks_read) { _dblks_read = dblks_read; }
-
         inline uint32_t pg_cnt() const { return _pg_cnt; }
         inline uint32_t incr_pg_cnt() { return ++_pg_cnt; }
         inline uint32_t decr_pg_cnt() { assert(_pg_cnt != 0); return --_pg_cnt; }
@@ -154,7 +131,6 @@ namespace qls_jrnl
         std::string status_str() const;
     };
 
-} // namespace qls_jrnl
-} // namespace jrnl
+}}}
 
-#endif // ifndef QPID_LEGACYSTORE_JRNL_DATA_TOK_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_DATA_TOK_H

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp Thu Nov 14 20:39:32 2013
@@ -19,87 +19,43 @@
  *
  */
 
-/**
- * \file deq_rec.cpp
- *
- * Qpid asynchronous store plugin library
- *
- * This file contains the code for the mrg::journal::deq_rec (journal dequeue
- * record) class. See comments in file deq_rec.h for details.
- *
- * \author Kim van der Riet
- */
-
 #include "qpid/linearstore/jrnl/deq_rec.h"
-#include "qpid/linearstore/jrnl/utils/deq_hdr.h"
-#include "qpid/linearstore/jrnl/utils/rec_tail.h"
 
 #include <cassert>
-#include <cerrno>
-#include <cstdlib>
 #include <cstring>
 #include <iomanip>
-#include "qpid/linearstore/jrnl/jerrno.h"
 #include "qpid/linearstore/jrnl/jexception.h"
-#include <sstream>
 
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
 
 deq_rec::deq_rec():
-//        _deq_hdr(QLS_DEQ_MAGIC, QLS_JRNL_VERSION, 0, 0, 0, false),
         _xidp(0),
         _buff(0)
-//        _deq_tail(_deq_hdr)
 {
-    ::deq_hdr_init(&_deq_hdr, QLS_DEQ_MAGIC, QLS_JRNL_VERSION, 0, 0, 0, 0);
+    ::deq_hdr_init(&_deq_hdr, QLS_DEQ_MAGIC, QLS_JRNL_VERSION, 0, 0, 0, 0, 0);
     ::rec_tail_copy(&_deq_tail, &_deq_hdr._rhdr, 0);
 }
 
-deq_rec::deq_rec(const uint64_t rid, const uint64_t drid, const void* const xidp,
-        const std::size_t xidlen, const bool txn_coml_commit):
-//        _deq_hdr(QLS_DEQ_MAGIC, QLS_JRNL_VERSION, rid, drid, xidlen, owi, txn_coml_commit),
-        _xidp(xidp),
-        _buff(0)
-//        _deq_tail(_deq_hdr)
-{
-    ::deq_hdr_init(&_deq_hdr, QLS_DEQ_MAGIC, QLS_JRNL_VERSION, 0, rid, drid, xidlen);
-    ::rec_tail_copy(&_deq_tail, &_deq_hdr._rhdr, 0);
-	::set_txn_coml_commit(&_deq_hdr, txn_coml_commit);
-}
-
 deq_rec::~deq_rec()
 {
     clean();
 }
 
 void
-deq_rec::reset()
-{
-    _deq_hdr._rhdr._rid = 0;
-//    _deq_hdr.set_owi(false);
-    ::set_txn_coml_commit(&_deq_hdr, false);
-    _deq_hdr._deq_rid = 0;
-    _deq_hdr._xidsize = 0;
-    _deq_tail._checksum = 0;
-    _deq_tail._rid = 0;
-    _xidp = 0;
-    _buff = 0;
-}
-
-void
-deq_rec::reset(const  uint64_t rid, const  uint64_t drid, const void* const xidp,
-        const std::size_t xidlen, const bool txn_coml_commit)
+deq_rec::reset(const uint64_t serial, const uint64_t rid, const  uint64_t drid, const void* const xidp,
+               const std::size_t xidlen, const bool txn_coml_commit)
 {
+    _deq_hdr._rhdr._serial = serial;
     _deq_hdr._rhdr._rid = rid;
     ::set_txn_coml_commit(&_deq_hdr, txn_coml_commit);
     _deq_hdr._deq_rid = drid;
     _deq_hdr._xidsize = xidlen;
-    _deq_tail._rid = rid;
     _xidp = xidp;
     _buff = 0;
+    _deq_tail._serial = serial;
+    _deq_tail._rid = rid;
 }
 
 uint32_t
@@ -214,132 +170,16 @@ deq_rec::encode(void* wptr, uint32_t rec
     return size_dblks(wr_cnt);
 }
 
-uint32_t
-deq_rec::decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks)
-{
-    assert(rptr != 0);
-    assert(max_size_dblks > 0);
-
-    std::size_t rd_cnt = 0;
-    if (rec_offs_dblks) // Continuation of record on new page
-    {
-        const uint32_t hdr_xid_dblks = size_dblks(sizeof(deq_hdr_t) + _deq_hdr._xidsize);
-        const uint32_t hdr_xid_tail_dblks = size_dblks(sizeof(deq_hdr_t) + _deq_hdr._xidsize +
-                sizeof(rec_tail_t));
-        const std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES;
-
-        if (hdr_xid_tail_dblks - rec_offs_dblks <= max_size_dblks)
-        {
-            // Remainder of xid fits within this page
-            if (rec_offs - sizeof(deq_hdr_t) < _deq_hdr._xidsize)
-            {
-                // Part of xid still outstanding, copy remainder of xid and tail
-                const std::size_t xid_offs = rec_offs - sizeof(deq_hdr_t);
-                const std::size_t xid_rem = _deq_hdr._xidsize - xid_offs;
-                std::memcpy((char*)_buff + xid_offs, rptr, xid_rem);
-                rd_cnt = xid_rem;
-                std::memcpy((void*)&_deq_tail, ((char*)rptr + rd_cnt), sizeof(_deq_tail));
-                chk_tail();
-                rd_cnt += sizeof(_deq_tail);
-            }
-            else
-            {
-                // Tail or part of tail only outstanding, complete tail
-                const std::size_t tail_offs = rec_offs - sizeof(deq_hdr_t) - _deq_hdr._xidsize;
-                const std::size_t tail_rem = sizeof(rec_tail_t) - tail_offs;
-                std::memcpy((char*)&_deq_tail + tail_offs, rptr, tail_rem);
-                chk_tail();
-                rd_cnt = tail_rem;
-            }
-        }
-        else if (hdr_xid_dblks - rec_offs_dblks <= max_size_dblks)
-        {
-            // Remainder of xid fits within this page, tail split
-            const std::size_t xid_offs = rec_offs - sizeof(deq_hdr_t);
-            const std::size_t xid_rem = _deq_hdr._xidsize - xid_offs;
-            std::memcpy((char*)_buff + xid_offs, rptr, xid_rem);
-            rd_cnt += xid_rem;
-            const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
-            if (tail_rem)
-            {
-                std::memcpy((void*)&_deq_tail, ((char*)rptr + xid_rem), tail_rem);
-                rd_cnt += tail_rem;
-            }
-        }
-        else
-        {
-            // Remainder of xid split
-            const std::size_t xid_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES);
-            std::memcpy((char*)_buff + rec_offs - sizeof(deq_hdr_t), rptr, xid_cp_size);
-            rd_cnt += xid_cp_size;
-        }
-    }
-    else // Start of record
-    {
-        // Get and check header
-        //_deq_hdr.hdr_copy(h);
-        ::rec_hdr_copy(&_deq_hdr._rhdr, &h);
-        rd_cnt = sizeof(rec_hdr_t);
-        _deq_hdr._deq_rid = *(uint64_t*)((char*)rptr + rd_cnt);
-        rd_cnt += sizeof(uint64_t);
-        _deq_hdr._xidsize = *(std::size_t*)((char*)rptr + rd_cnt);
-        rd_cnt = sizeof(deq_hdr_t);
-        chk_hdr();
-        if (_deq_hdr._xidsize)
-        {
-            _buff = std::malloc(_deq_hdr._xidsize);
-            MALLOC_CHK(_buff, "_buff", "deq_rec", "decode");
-            const uint32_t hdr_xid_dblks = size_dblks(sizeof(deq_hdr_t) + _deq_hdr._xidsize);
-            const uint32_t hdr_xid_tail_dblks = size_dblks(sizeof(deq_hdr_t) +  _deq_hdr._xidsize +
-                    sizeof(rec_tail_t));
-
-            // Check if record (header + xid + tail) fits within this page, we can check the
-            // tail before the expense of copying data to memory
-            if (hdr_xid_tail_dblks <= max_size_dblks)
-            {
-                // Entire header, xid and tail fits within this page
-                std::memcpy(_buff, (char*)rptr + rd_cnt, _deq_hdr._xidsize);
-                rd_cnt += _deq_hdr._xidsize;
-                std::memcpy((void*)&_deq_tail, (char*)rptr + rd_cnt, sizeof(_deq_tail));
-                rd_cnt += sizeof(_deq_tail);
-                chk_tail();
-            }
-            else if (hdr_xid_dblks <= max_size_dblks)
-            {
-                // Entire header and xid fit within this page, tail split
-                std::memcpy(_buff, (char*)rptr + rd_cnt, _deq_hdr._xidsize);
-                rd_cnt += _deq_hdr._xidsize;
-                const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
-                if (tail_rem)
-                {
-                    std::memcpy((void*)&_deq_tail, (char*)rptr + rd_cnt, tail_rem);
-                    rd_cnt += tail_rem;
-                }
-            }
-            else
-            {
-                // Header fits within this page, xid split
-                const std::size_t xid_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
-                std::memcpy(_buff, (char*)rptr + rd_cnt, xid_cp_size);
-                rd_cnt += xid_cp_size;
-            }
-        }
-    }
-    return size_dblks(rd_cnt);
-}
-
 bool
-deq_rec::rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs)
+deq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs)
 {
+    uint32_t checksum = 0UL; // TODO: Add checksum math
     if (rec_offs == 0)
     {
         //_deq_hdr.hdr_copy(h);
         ::rec_hdr_copy(&_deq_hdr._rhdr, &h);
-        ifsp->read((char*)&_deq_hdr._deq_rid, sizeof(uint64_t));
-        ifsp->read((char*)&_deq_hdr._xidsize, sizeof(std::size_t));
-#if defined(JRNL_32_BIT)
-        ifsp->ignore(sizeof(uint32_t)); // _filler0
-#endif
+        ifsp->read((char*)&_deq_hdr._deq_rid, sizeof(_deq_hdr._deq_rid));
+        ifsp->read((char*)&_deq_hdr._xidsize, sizeof(_deq_hdr._xidsize));
         rec_offs = sizeof(_deq_hdr);
         // Read header, allocate (if req'd) for xid
         if (_deq_hdr._xidsize)
@@ -382,9 +222,21 @@ deq_rec::rcv_decode(rec_hdr_t h, std::if
         }
     }
     ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size());
-    if (_deq_hdr._xidsize)
-        chk_tail(); // Throws if tail invalid or record incomplete
     assert(!ifsp->fail() && !ifsp->bad());
+    if (_deq_hdr._xidsize) {
+        int res = ::rec_tail_check(&_deq_tail, &_deq_hdr._rhdr, checksum);
+        if (res != 0) {
+            std::stringstream oss;
+            switch (res) {
+              case 1: oss << std::hex << "Magic: expected 0x" << ~_deq_hdr._rhdr._magic << "; found 0x" << _deq_tail._xmagic; break;
+              case 2: oss << std::hex << "Serial: expected 0x" << _deq_hdr._rhdr._serial << "; found 0x" << _deq_tail._serial; break;
+              case 3: oss << std::hex << "Record Id: expected 0x" << _deq_hdr._rhdr._rid << "; found 0x" << _deq_tail._rid; break;
+              case 4: oss << std::hex << "Checksum: expected 0x" << checksum << "; found 0x" << _deq_tail._checksum; break;
+              default: oss << "Unknown error " << res;
+            }
+            throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "deq_rec", "decode"); // TODO: Don't throw exception, log info
+        }
+    }
     return true;
 }
 
@@ -427,38 +279,9 @@ deq_rec::rec_size() const
 }
 
 void
-deq_rec::chk_hdr() const
-{
-    jrec::chk_hdr(_deq_hdr._rhdr);
-    if (_deq_hdr._rhdr._magic != QLS_DEQ_MAGIC)
-    {
-        std::ostringstream oss;
-        oss << std::hex << std::setfill('0');
-        oss << "deq magic: rid=0x" << std::setw(16) << _deq_hdr._rhdr._rid;
-        oss << ": expected=0x" << std::setw(8) << QLS_DEQ_MAGIC;
-        oss << " read=0x" << std::setw(2) << (int)_deq_hdr._rhdr._magic;
-        throw jexception(jerrno::JERR_JREC_BADRECHDR, oss.str(), "deq_rec", "chk_hdr");
-    }
-}
-
-void
-deq_rec::chk_hdr(uint64_t rid) const
-{
-    chk_hdr();
-    jrec::chk_rid(_deq_hdr._rhdr, rid);
-}
-
-void
-deq_rec::chk_tail() const
-{
-    jrec::chk_tail(_deq_tail, _deq_hdr._rhdr);
-}
-
-void
 deq_rec::clean()
 {
     // clean up allocated memory here
 }
 
-} // namespace journal
-} // namespace mrg
+}}}

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.h?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.h Thu Nov 14 20:39:32 2013
@@ -19,73 +19,51 @@
  *
  */
 
-#ifndef QPID_LEGACYSTORE_JRNL_DEQ_REQ_H
-#define QPID_LEGACYSTORE_JRNL_DEQ_REQ_H
+#ifndef QPID_LINEARSTORE_JOURNAL_DEQ_REQ_H
+#define QPID_LINEARSTORE_JOURNAL_DEQ_REQ_H
 
-namespace qpid
-{
-namespace qls_jrnl
-{
-class deq_rec;
-}}
-
-#include <cstddef>
-#include "qpid/linearstore/jrnl/utils/deq_hdr.h"
 #include "qpid/linearstore/jrnl/jrec.h"
+#include "qpid/linearstore/jrnl/utils/deq_hdr.h"
+#include "qpid/linearstore/jrnl/utils/rec_tail.h"
 
-namespace qpid
-{
-namespace qls_jrnl
+namespace qpid {
+namespace linearstore {
+namespace journal {
+
+/**
+* \class deq_rec
+* \brief Class to handle a single journal dequeue record.
+*/
+class deq_rec : public jrec
 {
+private:
+    ::deq_hdr_t _deq_hdr;   ///< Local instance of dequeue header struct
+    const void* _xidp;      ///< xid pointer for encoding (writing to disk)
+    void* _buff;            ///< Pointer to buffer to receive data read from disk
+    ::rec_tail_t _deq_tail; ///< Local instance of record tail struct, only encoded if XID is present
+
+public:
+    deq_rec();
+    virtual ~deq_rec();
+
+    void reset(const uint64_t serial, const uint64_t rid, const  uint64_t drid, const void* const xidp,
+               const std::size_t xidlen, const bool txn_coml_commit);
+    uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks);
+    bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs);
+
+    inline bool is_txn_coml_commit() const { return ::is_txn_coml_commit(&_deq_hdr); }
+    inline uint64_t rid() const { return _deq_hdr._rhdr._rid; }
+    inline uint64_t deq_rid() const { return _deq_hdr._deq_rid; }
+    std::size_t get_xid(void** const xidpp);
+    std::string& str(std::string& str) const;
+    inline std::size_t data_size() const { return 0; } // This record never carries data
+    std::size_t xid_size() const;
+    std::size_t rec_size() const;
+
+private:
+    virtual void clean();
+};
 
-    /**
-    * \class deq_rec
-    * \brief Class to handle a single journal dequeue record.
-    */
-    class deq_rec : public jrec
-    {
-    private:
-        deq_hdr_t _deq_hdr;           ///< Dequeue header
-        const void* _xidp;          ///< xid pointer for encoding (writing to disk)
-        void* _buff;                ///< Pointer to buffer to receive data read from disk
-        rec_tail_t _deq_tail;         ///< Record tail, only encoded if XID is present
-
-    public:
-        // constructor used for read operations and xid will have memory allocated
-        deq_rec();
-        // constructor used for write operations, where xid already exists
-        deq_rec(const uint64_t rid, const uint64_t drid, const void* const xidp,
-                const std::size_t xidlen, const bool txn_coml_commit);
-        virtual ~deq_rec();
-
-        // Prepare instance for use in reading data from journal
-        void reset();
-        // Prepare instance for use in writing data to journal
-        void reset(const  uint64_t rid, const  uint64_t drid, const void* const xidp,
-                const std::size_t xidlen, const bool txn_coml_commit);
-        uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks);
-        uint32_t decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks,
-                uint32_t max_size_dblks);
-        // Decode used for recover
-        bool rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs);
-
-        inline bool is_txn_coml_commit() const { return ::is_txn_coml_commit(&_deq_hdr); }
-        inline uint64_t rid() const { return _deq_hdr._rhdr._rid; }
-        inline uint64_t deq_rid() const { return _deq_hdr._deq_rid; }
-        std::size_t get_xid(void** const xidpp);
-        std::string& str(std::string& str) const;
-        inline std::size_t data_size() const { return 0; } // This record never carries data
-        std::size_t xid_size() const;
-        std::size_t rec_size() const;
-
-    private:
-        virtual void chk_hdr() const;
-        virtual void chk_hdr(uint64_t rid) const;
-        virtual void chk_tail() const;
-        virtual void clean();
-    }; // class deq_rec
-
-} // namespace journal
-} // namespace mrg
+}}}
 
-#endif // ifndef QPID_LEGACYSTORE_JRNL_DEQ_REQ_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_DEQ_REQ_H

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp Thu Nov 14 20:39:32 2013
@@ -27,10 +27,9 @@
 #include <sstream>
 
 
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
 
 // static return/error codes
 int16_t enq_map::EMAP_DUP_RID = -3;
@@ -183,4 +182,4 @@ enq_map::pfid_list(std::vector<uint64_t>
     }
 }
 
-}}
+}}}

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h Thu Nov 14 20:39:32 2013
@@ -19,15 +19,14 @@
  *
  */
 
-#ifndef QPID_LEGACYSTORE_JRNL_ENQ_MAP_H
-#define QPID_LEGACYSTORE_JRNL_ENQ_MAP_H
+#ifndef QPID_LINEARSTORE_JOURNAL_ENQ_MAP_H
+#define QPID_LINEARSTORE_JOURNAL_ENQ_MAP_H
 
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
 class enq_map;
-}}
+}}}
 
 #include "qpid/linearstore/jrnl/jexception.h"
 #include "qpid/linearstore/jrnl/smutex.h"
@@ -35,10 +34,9 @@ class enq_map;
 #include <pthread.h>
 #include <vector>
 
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
 
     /**
     * \class enq_map
@@ -107,6 +105,6 @@ namespace qls_jrnl
         void pfid_list(std::vector<uint64_t>& fv);
     };
 
-}}
+}}}
 
-#endif // ifndef QPID_LEGACYSTORE_JRNL_ENQ_MAP_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_ENQ_MAP_H

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp Thu Nov 14 20:39:32 2013
@@ -22,45 +22,22 @@
 #include "qpid/linearstore/jrnl/enq_rec.h"
 
 #include <cassert>
-#include <cerrno>
-#include <cstdlib>
 #include <cstring>
 #include <iomanip>
-#include "qpid/linearstore/jrnl/jerrno.h"
 #include "qpid/linearstore/jrnl/jexception.h"
-#include <sstream>
 
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
 
-// Constructor used for read operations, where buf contains preallocated space to receive data.
 enq_rec::enq_rec():
         jrec(), // superclass
-        //_enq_hdr(QLS_ENQ_MAGIC, QLS_JRNL_VERSION, 0, 0, 0, false, false),
         _xidp(0),
         _data(0),
         _buff(0)
-        //_enq_tail(_enq_hdr)
-{
-    ::enq_hdr_init(&_enq_hdr, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, 0, 0, 0, false);
-    ::rec_tail_copy(&_enq_tail, &_enq_hdr._rhdr, 0);
-}
-
-// Constructor used for transactional write operations, where dbuf contains data to be written.
-enq_rec::enq_rec(const uint64_t rid, const void* const dbuf, const std::size_t dlen,
-        const void* const xidp, const std::size_t xidlen, const bool transient):
-        jrec(), // superclass
-        //_enq_hdr(QLS_ENQ_MAGIC, QLS_JRNL_VERSION, rid, xidlen, dlen, owi, transient),
-        _xidp(xidp),
-        _data(dbuf),
-        _buff(0)
-        //_enq_tail(_enq_hdr)
 {
-    ::enq_hdr_init(&_enq_hdr, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, 0, rid, xidlen, dlen);
+    ::enq_hdr_init(&_enq_hdr, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, 0, 0, 0, 0, false);
     ::rec_tail_copy(&_enq_tail, &_enq_hdr._rhdr, 0);
-    ::set_enq_transient(&_enq_hdr, transient);
 }
 
 enq_rec::~enq_rec()
@@ -68,28 +45,11 @@ enq_rec::~enq_rec()
     clean();
 }
 
-// Prepare instance for use in reading data from journal, where buf contains preallocated space
-// to receive data.
 void
-enq_rec::reset()
-{
-    _enq_hdr._rhdr._rid = 0;
-    ::set_enq_transient(&_enq_hdr, false);
-    _enq_hdr._xidsize = 0;
-    _enq_hdr._dsize = 0;
-    _xidp = 0;
-    _data = 0;
-    _buff = 0;
-    _enq_tail._rid = 0;
-}
-
-// Prepare instance for use in writing transactional data to journal, where dbuf contains data to
-// be written.
-void
-enq_rec::reset(const uint64_t rid, const void* const dbuf, const std::size_t dlen,
-        const void* const xidp, const std::size_t xidlen, const bool transient,
-        const bool external)
+enq_rec::reset(const uint64_t serial, const uint64_t rid, const void* const dbuf, const std::size_t dlen,
+        const void* const xidp, const std::size_t xidlen, const bool transient, const bool external)
 {
+    _enq_hdr._rhdr._serial = serial;
     _enq_hdr._rhdr._rid = rid;
     ::set_enq_transient(&_enq_hdr, transient);
     ::set_enq_external(&_enq_hdr, external);
@@ -98,6 +58,7 @@ enq_rec::reset(const uint64_t rid, const
     _xidp = xidp;
     _data = dbuf;
     _buff = 0;
+    _enq_tail._serial = serial;
     _enq_tail._rid = rid;
 }
 
@@ -246,217 +207,19 @@ enq_rec::encode(void* wptr, uint32_t rec
     return size_dblks(wr_cnt);
 }
 
-uint32_t
-enq_rec::decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks)
-{
-    assert(rptr != 0);
-    assert(max_size_dblks > 0);
-
-    std::size_t rd_cnt = 0;
-    if (rec_offs_dblks) // Continuation of record on new page
-    {
-        const uint32_t hdr_xid_data_size = sizeof(enq_hdr_t) + _enq_hdr._xidsize +
-                (::is_enq_external(&_enq_hdr) ? 0 : _enq_hdr._dsize);
-        const uint32_t hdr_xid_data_tail_size = hdr_xid_data_size + sizeof(rec_tail_t);
-        const uint32_t hdr_data_dblks = size_dblks(hdr_xid_data_size);
-        const uint32_t hdr_tail_dblks = size_dblks(hdr_xid_data_tail_size);
-        const std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES;
-        const std::size_t offs = rec_offs - sizeof(enq_hdr_t);
-
-        if (hdr_tail_dblks - rec_offs_dblks <= max_size_dblks)
-        {
-            // Remainder of record fits within this page
-            if (offs < _enq_hdr._xidsize)
-            {
-                // some XID still outstanding, copy remainder of XID, data and tail
-                const std::size_t rem = _enq_hdr._xidsize + _enq_hdr._dsize - offs;
-                std::memcpy((char*)_buff + offs, rptr, rem);
-                rd_cnt += rem;
-                std::memcpy((void*)&_enq_tail, ((char*)rptr + rd_cnt), sizeof(_enq_tail));
-                chk_tail();
-                rd_cnt += sizeof(_enq_tail);
-            }
-            else if (offs < _enq_hdr._xidsize + _enq_hdr._dsize && !::is_enq_external(&_enq_hdr))
-            {
-                // some data still outstanding, copy remainder of data and tail
-                const std::size_t data_offs = offs - _enq_hdr._xidsize;
-                const std::size_t data_rem = _enq_hdr._dsize - data_offs;
-                std::memcpy((char*)_buff + offs, rptr, data_rem);
-                rd_cnt += data_rem;
-                std::memcpy((void*)&_enq_tail, ((char*)rptr + rd_cnt), sizeof(_enq_tail));
-                chk_tail();
-                rd_cnt += sizeof(_enq_tail);
-            }
-            else
-            {
-                // Tail or part of tail only outstanding, complete tail
-                const std::size_t tail_offs = rec_offs - sizeof(enq_hdr_t) - _enq_hdr._xidsize -
-                        _enq_hdr._dsize;
-                const std::size_t tail_rem = sizeof(rec_tail_t) - tail_offs;
-                std::memcpy((char*)&_enq_tail + tail_offs, rptr, tail_rem);
-                chk_tail();
-                rd_cnt = tail_rem;
-            }
-        }
-        else if (hdr_data_dblks - rec_offs_dblks <= max_size_dblks)
-        {
-            // Remainder of xid & data fits within this page; tail split
-
-            /*
-             * TODO: This section needs revision. Since it is known that the end of the page falls within the
-             * tail record, it is only necessary to write from the current offset to the end of the page under
-             * all circumstances. The multiple if/else combinations may be eliminated, as well as one memcpy()
-             * operation.
-             *
-             * Also note that Coverity has detected a possible memory overwrite in this block. It occurs if
-             * both the following two if() stmsts (numbered) are false. With rd_cnt = 0, this would result in
-             * the value of tail_rem > sizeof(tail_rec). Practically, this could only happen if the start and
-             * end of a page both fall within the same tail record, in which case the tail would have to be
-             * (much!) larger. However, the logic here does not account for this possibility.
-             *
-             * If the optimization above is undertaken, this code would probably be removed.
-             */
-            if (offs < _enq_hdr._xidsize) // 1
-            {
-                // some XID still outstanding, copy remainder of XID and data
-                const std::size_t rem = _enq_hdr._xidsize + _enq_hdr._dsize - offs;
-                std::memcpy((char*)_buff + offs, rptr, rem);
-                rd_cnt += rem;
-            }
-            else if (offs < _enq_hdr._xidsize + _enq_hdr._dsize && !::is_enq_external(&_enq_hdr)) // 2
-            {
-                // some data still outstanding, copy remainder of data
-                const std::size_t data_offs = offs - _enq_hdr._xidsize;
-                const std::size_t data_rem = _enq_hdr._dsize - data_offs;
-                std::memcpy((char*)_buff + offs, rptr, data_rem);
-                rd_cnt += data_rem;
-            }
-            const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
-            if (tail_rem)
-            {
-                std::memcpy((void*)&_enq_tail, ((char*)rptr + rd_cnt), tail_rem);
-                rd_cnt += tail_rem;
-            }
-        }
-        else
-        {
-            // Since xid and data are contiguous, both fit within current page - copy whole page
-            const std::size_t data_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES);
-            std::memcpy((char*)_buff + offs, rptr, data_cp_size);
-            rd_cnt += data_cp_size;
-        }
-    }
-    else // Start of record
-    {
-        // Get and check header
-        //_enq_hdr.hdr_copy(h);
-        ::rec_hdr_copy(&_enq_hdr._rhdr, &h);
-        rd_cnt = sizeof(rec_hdr_t);
-        _enq_hdr._xidsize = *(std::size_t*)((char*)rptr + rd_cnt);
-        rd_cnt += sizeof(std::size_t);
-#if defined(JRNL_32_BIT)
-        rd_cnt += sizeof(uint32_t); // Filler 0
-#endif
-        _enq_hdr._dsize = *(std::size_t*)((char*)rptr + rd_cnt);
-        rd_cnt = sizeof(enq_hdr_t);
-        chk_hdr();
-        if (_enq_hdr._xidsize + (::is_enq_external(&_enq_hdr) ? 0 : _enq_hdr._dsize))
-        {
-            _buff = std::malloc(_enq_hdr._xidsize + (::is_enq_external(&_enq_hdr) ? 0 : _enq_hdr._dsize));
-            MALLOC_CHK(_buff, "_buff", "enq_rec", "decode");
-
-            const uint32_t hdr_xid_size = sizeof(enq_hdr_t) + _enq_hdr._xidsize;
-            const uint32_t hdr_xid_data_size = hdr_xid_size + (::is_enq_external(&_enq_hdr) ? 0 : _enq_hdr._dsize);
-            const uint32_t hdr_xid_data_tail_size = hdr_xid_data_size + sizeof(rec_tail_t);
-            const uint32_t hdr_xid_dblks  = size_dblks(hdr_xid_size);
-            const uint32_t hdr_data_dblks = size_dblks(hdr_xid_data_size);
-            const uint32_t hdr_tail_dblks = size_dblks(hdr_xid_data_tail_size);
-            // Check if record (header + data + tail) fits within this page, we can check the
-            // tail before the expense of copying data to memory
-            if (hdr_tail_dblks <= max_size_dblks)
-            {
-                // Header, xid, data and tail fits within this page
-                if (_enq_hdr._xidsize)
-                {
-                    std::memcpy(_buff, (char*)rptr + rd_cnt, _enq_hdr._xidsize);
-                    rd_cnt += _enq_hdr._xidsize;
-                }
-                if (_enq_hdr._dsize && !::is_enq_external(&_enq_hdr))
-                {
-                    std::memcpy((char*)_buff + _enq_hdr._xidsize, (char*)rptr + rd_cnt,
-                            _enq_hdr._dsize);
-                    rd_cnt += _enq_hdr._dsize;
-                }
-                std::memcpy((void*)&_enq_tail, (char*)rptr + rd_cnt, sizeof(_enq_tail));
-                chk_tail();
-                rd_cnt += sizeof(_enq_tail);
-            }
-            else if (hdr_data_dblks <= max_size_dblks)
-            {
-                // Header, xid and data fit within this page, tail split or separated
-                if (_enq_hdr._xidsize)
-                {
-                    std::memcpy(_buff, (char*)rptr + rd_cnt, _enq_hdr._xidsize);
-                    rd_cnt += _enq_hdr._xidsize;
-                }
-                if (_enq_hdr._dsize && !::is_enq_external(&_enq_hdr))
-                {
-                    std::memcpy((char*)_buff + _enq_hdr._xidsize, (char*)rptr + rd_cnt,
-                            _enq_hdr._dsize);
-                    rd_cnt += _enq_hdr._dsize;
-                }
-                const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
-                if (tail_rem)
-                {
-                    std::memcpy((void*)&_enq_tail, (char*)rptr + rd_cnt, tail_rem);
-                    rd_cnt += tail_rem;
-                }
-            }
-            else if (hdr_xid_dblks <= max_size_dblks)
-            {
-                // Header and xid fits within this page, data split or separated
-                if (_enq_hdr._xidsize)
-                {
-                    std::memcpy(_buff, (char*)rptr + rd_cnt, _enq_hdr._xidsize);
-                    rd_cnt += _enq_hdr._xidsize;
-                }
-                if (_enq_hdr._dsize && !::is_enq_external(&_enq_hdr))
-                {
-                    const std::size_t data_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
-                    std::memcpy((char*)_buff + _enq_hdr._xidsize, (char*)rptr + rd_cnt, data_cp_size);
-                    rd_cnt += data_cp_size;
-                }
-            }
-            else
-            {
-                // Header fits within this page, xid split or separated
-                const std::size_t data_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
-                std::memcpy(_buff, (char*)rptr + rd_cnt, data_cp_size);
-                rd_cnt += data_cp_size;
-            }
-        }
-    }
-    return size_dblks(rd_cnt);
-}
-
 bool
-enq_rec::rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs)
+enq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs)
 {
+    uint32_t checksum = 0UL; // TODO: Add checksum math
     if (rec_offs == 0)
     {
         // Read header, allocate (if req'd) for xid
         //_enq_hdr.hdr_copy(h);
         ::rec_hdr_copy(&_enq_hdr._rhdr, &h);
-        ifsp->read((char*)&_enq_hdr._xidsize, sizeof(std::size_t));
-#if defined(JRNL_32_BIT)
-        ifsp->ignore(sizeof(uint32_t)); // _filler0
-#endif
-        ifsp->read((char*)&_enq_hdr._dsize, sizeof(std::size_t));
-#if defined(JRNL_32_BIT)
-        ifsp->ignore(sizeof(uint32_t)); // _filler1
-#endif
+        ifsp->read((char*)&_enq_hdr._xidsize, sizeof(_enq_hdr._xidsize));
+        ifsp->read((char*)&_enq_hdr._dsize, sizeof(_enq_hdr._dsize));
         rec_offs = sizeof(_enq_hdr);
-        if (_enq_hdr._xidsize)
+        if (_enq_hdr._xidsize > 0)
         {
             _buff = std::malloc(_enq_hdr._xidsize);
             MALLOC_CHK(_buff, "_buff", "enq_rec", "rcv_decode");
@@ -517,8 +280,19 @@ enq_rec::rcv_decode(rec_hdr_t h, std::if
         }
     }
     ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size());
-    chk_tail(); // Throws if tail invalid or record incomplete
     assert(!ifsp->fail() && !ifsp->bad());
+    int res = ::rec_tail_check(&_enq_tail, &_enq_hdr._rhdr, checksum);
+    if (res != 0) {
+        std::stringstream oss;
+        switch (res) {
+          case 1: oss << std::hex << "Magic: expected 0x" << ~_enq_hdr._rhdr._magic << "; found 0x" << _enq_tail._xmagic; break;
+          case 2: oss << std::hex << "Serial: expected 0x" << _enq_hdr._rhdr._serial << "; found 0x" << _enq_tail._serial; break;
+          case 3: oss << std::hex << "Record Id: expected 0x" << _enq_hdr._rhdr._rid << "; found 0x" << _enq_tail._rid; break;
+          case 4: oss << std::hex << "Checksum: expected 0x" << checksum << "; found 0x" << _enq_tail._checksum; break;
+          default: oss << "Unknown error " << res;
+        }
+        throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "enq_rec", "decode"); // TODO: Don't throw exception, log info
+    }
     return true;
 }
 
@@ -578,44 +352,9 @@ enq_rec::rec_size(const std::size_t xids
 }
 
 void
-enq_rec::set_rid(const uint64_t rid)
-{
-    _enq_hdr._rhdr._rid = rid;
-    _enq_tail._rid = rid;
-}
-
-void
-enq_rec::chk_hdr() const
-{
-    jrec::chk_hdr(_enq_hdr._rhdr);
-    if (_enq_hdr._rhdr._magic != QLS_ENQ_MAGIC)
-    {
-        std::ostringstream oss;
-        oss << std::hex << std::setfill('0');
-        oss << "enq magic: rid=0x" << std::setw(16) << _enq_hdr._rhdr._rid;
-        oss << ": expected=0x" << std::setw(8) << QLS_ENQ_MAGIC;
-        oss << " read=0x" << std::setw(2) << (int)_enq_hdr._rhdr._magic;
-        throw jexception(jerrno::JERR_JREC_BADRECHDR, oss.str(), "enq_rec", "chk_hdr");
-    }
-}
-
-void
-enq_rec::chk_hdr(uint64_t rid) const
-{
-    chk_hdr();
-    jrec::chk_rid(_enq_hdr._rhdr, rid);
-}
-
-void
-enq_rec::chk_tail() const
-{
-    jrec::chk_tail(_enq_tail, _enq_hdr._rhdr);
-}
-
-void
 enq_rec::clean()
 {
     // clean up allocated memory here
 }
 
-}}
+}}}

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.h?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.h Thu Nov 14 20:39:32 2013
@@ -19,86 +19,54 @@
  *
  */
 
-#ifndef QPID_LEGACYSTORE_JRNL_ENQ_REC_H
-#define QPID_LEGACYSTORE_JRNL_ENQ_REC_H
+#ifndef QPID_LINEARSTORE_JOURNAL_ENQ_REC_H
+#define QPID_LINEARSTORE_JOURNAL_ENQ_REC_H
 
-namespace qpid
-{
-namespace qls_jrnl
-{
-class enq_rec;
-}}
-
-#include <cstddef>
-#include "qpid/linearstore/jrnl/utils/enq_hdr.h"
 #include "qpid/linearstore/jrnl/jrec.h"
+#include "qpid/linearstore/jrnl/utils/enq_hdr.h"
+#include "qpid/linearstore/jrnl/utils/rec_tail.h"
 
-namespace qpid
-{
-namespace qls_jrnl
+namespace qpid {
+namespace linearstore {
+namespace journal {
+
+/**
+* \class enq_rec
+* \brief Class to handle a single journal enqueue record (holds an ::enq_hdr_t header and a ::rec_tail_t tail).
+*/
+class enq_rec : public jrec
 {
+private:
+    ::enq_hdr_t _enq_hdr;   ///< Local instance of enqueue header struct
+    const void* _xidp;      ///< xid pointer for encoding (for writing to disk)
+    const void* _data;      ///< Pointer to data to be written to disk
+    void* _buff;            ///< Pointer to buffer to receive data read from disk
+    ::rec_tail_t _enq_tail; ///< Local instance of enqueue tail struct
+
+public:
+    enq_rec();
+    virtual ~enq_rec();
+
+    void reset(const uint64_t serial, const uint64_t rid, const void* const dbuf, const std::size_t dlen,
+               const void* const xidp, const std::size_t xidlen, const bool transient, const bool external); ///< Prepare instance for use in writing data to journal
+    uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks);
+    bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs); ///< Decode record from file stream; throws jexception (JERR_JREC_BADRECTAIL) if the record tail check fails
+
+    std::size_t get_xid(void** const xidpp);
+    std::size_t get_data(void** const datapp);
+    inline bool is_transient() const { return ::is_enq_transient(&_enq_hdr); } ///< Result of ::is_enq_transient() on the local header
+    inline bool is_external() const { return ::is_enq_external(&_enq_hdr); } ///< Result of ::is_enq_external() on the local header
+    std::string& str(std::string& str) const;
+    inline std::size_t data_size() const { return _enq_hdr._dsize; } ///< Data size as recorded in the header (_dsize)
+    inline std::size_t xid_size() const { return _enq_hdr._xidsize; } ///< xid size as recorded in the header (_xidsize)
+    std::size_t rec_size() const; ///< Size of this record; presumably in bytes (compared against dblks * QLS_DBLK_SIZE_BYTES in decode()) - TODO confirm
+    static std::size_t rec_size(const std::size_t xidsize, const std::size_t dsize, const bool external);
+    inline uint64_t rid() const { return _enq_hdr._rhdr._rid; } ///< Record id as recorded in the header (_rhdr._rid)
+
+private:
+    virtual void clean(); ///< Hook for releasing allocated memory; the definition in enq_rec.cpp is currently an empty body
+};
 
-    /**
-    * \class enq_rec
-    * \brief Class to handle a single journal enqueue record.
-    */
-    class enq_rec : public jrec
-    {
-    private:
-        enq_hdr_t _enq_hdr;
-        const void* _xidp;          ///< xid pointer for encoding (for writing to disk)
-        const void* _data;          ///< Pointer to data to be written to disk
-        void* _buff;                ///< Pointer to buffer to receive data read from disk
-        rec_tail_t _enq_tail;
-
-    public:
-        /**
-        * \brief Constructor used for read operations.
-        */
-        enq_rec();
-
-        /**
-        * \brief Constructor used for write operations, where mbuf contains data to be written.
-        */
-        enq_rec(const uint64_t rid, const void* const dbuf, const std::size_t dlen,
-                const void* const xidp, const std::size_t xidlen, const bool transient);
-
-        /**
-        * \brief Destructor
-        */
-        virtual ~enq_rec();
-
-        // Prepare instance for use in reading data from journal, xid and data will be allocated
-        void reset();
-        // Prepare instance for use in writing data to journal
-        void reset(const uint64_t rid, const void* const dbuf, const std::size_t dlen,
-                const void* const xidp, const std::size_t xidlen, const bool transient,
-                const bool external);
-
-        uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks);
-        uint32_t decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks);
-        // Decode used for recover
-        bool rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs);
-
-        std::size_t get_xid(void** const xidpp);
-        std::size_t get_data(void** const datapp);
-        inline bool is_transient() const { return ::is_enq_transient(&_enq_hdr); }
-        inline bool is_external() const { return ::is_enq_external(&_enq_hdr); }
-        std::string& str(std::string& str) const;
-        inline std::size_t data_size() const { return _enq_hdr._dsize; }
-        inline std::size_t xid_size() const { return _enq_hdr._xidsize; }
-        std::size_t rec_size() const;
-        static std::size_t rec_size(const std::size_t xidsize, const std::size_t dsize, const bool external);
-        inline uint64_t rid() const { return _enq_hdr._rhdr._rid; }
-        void set_rid(const uint64_t rid);
-
-    private:
-        void chk_hdr() const;
-        void chk_hdr(uint64_t rid) const;
-        void chk_tail() const;
-        virtual void clean();
-    }; // class enq_rec
-
-}}
+}}}
 
-#endif // ifndef QPID_LEGACYSTORE_JRNL_ENQ_REC_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_ENQ_REC_H

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enums.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enums.h?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enums.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enums.h Thu Nov 14 20:39:32 2013
@@ -19,13 +19,12 @@
  *
  */
 
-#ifndef QPID_LINEARSTORE_JRNL_ENUMS_H
-#define QPID_LINEARSTORE_JRNL_ENUMS_H
+#ifndef QPID_LINEARSTORE_JOURNAL_ENUMS_H
+#define QPID_LINEARSTORE_JOURNAL_ENUMS_H
 
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
 
     // TODO: Change this to flags, as multiple of these conditions may exist simultaneously
     /**
@@ -37,12 +36,7 @@ namespace qls_jrnl
         RHM_IORES_PAGE_AIOWAIT, ///< IO operation suspended - next page is waiting for AIO.
         RHM_IORES_FILE_AIOWAIT, ///< IO operation suspended - next file is waiting for AIO.
         RHM_IORES_EMPTY,        ///< During read operations, nothing further is available to read.
-//        RHM_IORES_RCINVALID,    ///< Read page cache is invalid (ie obsolete or uninitialized)
-//        RHM_IORES_ENQCAPTHRESH, ///< Enqueue capacity threshold (limit) reached.
-//        RHM_IORES_FULL,         ///< During write operations, the journal files are full.
-//        RHM_IORES_BUSY,         ///< Another blocking operation is in progress.
         RHM_IORES_TXPENDING     ///< Operation blocked by pending transaction.
-//        RHM_IORES_NOTIMPL       ///< Function is not implemented.
     };
     typedef _iores iores;
 
@@ -54,46 +48,11 @@ namespace qls_jrnl
             case RHM_IORES_PAGE_AIOWAIT: return "RHM_IORES_PAGE_AIOWAIT";
             case RHM_IORES_FILE_AIOWAIT: return "RHM_IORES_FILE_AIOWAIT";
             case RHM_IORES_EMPTY: return "RHM_IORES_EMPTY";
-//            case RHM_IORES_RCINVALID: return "RHM_IORES_RCINVALID";
-//            case RHM_IORES_ENQCAPTHRESH: return "RHM_IORES_ENQCAPTHRESH";
-//            case RHM_IORES_FULL: return "RHM_IORES_FULL";
-//            case RHM_IORES_BUSY: return "RHM_IORES_BUSY";
             case RHM_IORES_TXPENDING: return "RHM_IORES_TXPENDING";
-//            case RHM_IORES_NOTIMPL: return "RHM_IORES_NOTIMPL";
         }
         return "<iores unknown>";
     }
 
-/*
-    enum _log_level
-    {
-        LOG_TRACE = 0,
-        LOG_DEBUG,
-        LOG_INFO,
-        LOG_NOTICE,
-        LOG_WARN,
-        LOG_ERROR,
-        LOG_CRITICAL
-    };
-    typedef _log_level log_level_t;
-
-    static inline const char* log_level_str(log_level_t ll)
-    {
-        switch (ll)
-        {
-            case LOG_TRACE: return "TRACE";
-            case LOG_DEBUG: return "DEBUG";
-            case LOG_INFO: return "INFO";
-            case LOG_NOTICE: return "NOTICE";
-            case LOG_WARN: return "WARN";
-            case LOG_ERROR: return "ERROR";
-            case LOG_CRITICAL: return "CRITICAL";
-        }
-        return "<log level unknown>";
-    }
-*/
-
-
-}}
+}}}
 
-#endif // ifndef QPID_LINEARSTORE_JRNL_ENUMS_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_ENUMS_H

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h Thu Nov 14 20:39:32 2013
@@ -19,8 +19,8 @@
  *
  */
 
-#ifndef QPID_LEGACYSTORE_JRNL_JCFG_H
-#define QPID_LEGACYSTORE_JRNL_JCFG_H
+#ifndef QPID_QLS_JRNL_JCFG_H
+#define QPID_QLS_JRNL_JCFG_H
 
 #define QLS_SBLK_SIZE_BYTES             4096        /**< Disk softblock size in bytes, should match size used on disk media */
 #define QLS_AIO_ALIGN_BOUNDARY_BYTES    QLS_SBLK_SIZE_BYTES /** Memory alignment boundary used for DMA */
@@ -55,4 +55,4 @@
 #define QLS_CLEAN                                   /**< If defined, writes QLS_CLEAN_CHAR to all filled areas on disk */
 #define QLS_CLEAN_CHAR                  0xff        /**< Char used to clear empty space on disk */
 
-#endif /* ifndef QPID_LEGACYSTORE_JRNL_JCFG_H */
+#endif /* ifndef QPID_QLS_JRNL_JCFG_H */

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp Thu Nov 14 20:39:32 2013
@@ -38,10 +38,9 @@
 #include <sstream>
 #include <unistd.h>
 
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
 
 #define AIO_CMPL_TIMEOUT_SEC   5
 #define AIO_CMPL_TIMEOUT_NSEC  0
@@ -132,9 +131,9 @@ jcntl::recover(EmptyFilePoolManager* efp
     _recoveryManager.analyzeJournals(prep_txn_list_ptr, efpmp, &_emptyFilePoolPtr);
 
     highest_rid = _recoveryManager.getHighestRecordId();
-    _jrnl_log.log(/*LOG_DEBUG*/JournalLog::LOG_INFO, _jid, _recoveryManager.toString(_jid, true));
+    _jrnl_log.log(/*LOG_DEBUG*/JournalLog::LOG_INFO, _jid, _recoveryManager.toLog(_jid, 5));
     _linearFileController.initialize(_jdir.dirname(), _emptyFilePoolPtr, _recoveryManager.getHighestFileNumber());
-    _recoveryManager.setLinearFileControllerJournals(&qpid::qls_jrnl::LinearFileController::addJournalFile, &_linearFileController);
+    _recoveryManager.setLinearFileControllerJournals(&qpid::linearstore::journal::LinearFileController::addJournalFile, &_linearFileController);
     _wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, QLS_WMGR_MAXDTOKPP, QLS_WMGR_MAXWAITUS,
             (_recoveryManager.isLastFileFull() ? 0 : _recoveryManager.getEndOffset()));
 
@@ -430,4 +429,4 @@ std::cout << "&&&&&& jcntl::handle_aio_w
     return false;
 }
 
-}}
+}}}

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h Thu Nov 14 20:39:32 2013
@@ -19,15 +19,14 @@
  *
  */
 
-#ifndef QPID_LINEARSTORE_JRNL_JCNTL_H
-#define QPID_LINEARSTORE_JRNL_JCNTL_H
+#ifndef QPID_LINEARSTORE_JOURNAL_JCNTL_H
+#define QPID_LINEARSTORE_JOURNAL_JCNTL_H
 
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
     class jcntl;
-}}
+}}}
 
 #include <cstddef>
 #include <deque>
@@ -38,10 +37,9 @@ namespace qls_jrnl
 #include "qpid/linearstore/jrnl/smutex.h"
 #include "qpid/linearstore/jrnl/wmgr.h"
 
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
     class EmptyFilePool;
     class EmptyFilePoolManager;
 
@@ -570,6 +568,6 @@ namespace qls_jrnl
         bool handle_aio_wait(const iores res, iores& resout, const data_tok* dtp);
     };
 
-}}
+}}}
 
-#endif // ifndef QPID_LINEARSTORE_JRNL_JCNTL_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_JCNTL_H

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp Thu Nov 14 20:39:32 2013
@@ -32,10 +32,9 @@
 #include <sys/stat.h>
 #include <unistd.h>
 
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
 
 jdir::jdir(const std::string& dirname/*, const std::string& _base_filename*/):
         _dirname(dirname)/*,
@@ -476,4 +475,4 @@ operator<<(std::ostream& os, const jdir*
     return os;
 }
 
-}}
+}}}

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jdir.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jdir.h?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jdir.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jdir.h Thu Nov 14 20:39:32 2013
@@ -19,25 +19,23 @@
  *
  */
 
-#ifndef QPID_LINEARSTORE_JRNL_JDIR_H
-#define QPID_LINEARSTORE_JRNL_JDIR_H
+#ifndef QPID_LINEARSTORE_JOURNAL_JDIR_H
+#define QPID_LINEARSTORE_JOURNAL_JDIR_H
 
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
 class jdir;
-}}
+}}}
 
 //#include "qpid/linearstore/jrnl/jinf.h"
 #include <dirent.h>
 #include <string>
 #include <vector>
 
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
 
     /**
     * \class jdir
@@ -364,6 +362,6 @@ namespace qls_jrnl
         static void close_dir(DIR* dir, const std::string& dir_name, const std::string& fn_name);
     };
 
-}}
+}}}
 
-#endif // ifndef QPID_LINEARSTORE_JRNL_JDIR_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_JDIR_H

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp Thu Nov 14 20:39:32 2013
@@ -21,10 +21,9 @@
 
 #include "qpid/linearstore/jrnl/jerrno.h"
 
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
 
 std::map<uint32_t, const char*> jerrno::_err_map;
 std::map<uint32_t, const char*>::iterator jerrno::_err_map_itr;
@@ -214,4 +213,4 @@ jerrno::err_msg(const uint32_t err_no) t
     return _err_map_itr->second;
 }
 
-}}
+}}}

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h Thu Nov 14 20:39:32 2013
@@ -19,24 +19,22 @@
  *
  */
 
-#ifndef QPID_LEGACYSTORE_JRNL_JERRNO_H
-#define QPID_LEGACYSTORE_JRNL_JERRNO_H
+#ifndef QPID_LINEARSTORE_JOURNAL_JERRNO_H
+#define QPID_LINEARSTORE_JOURNAL_JERRNO_H
 
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
 class jerrno;
-}}
+}}}
 
 #include <map>
 #include <stdint.h>
 #include <string>
 
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
 
     /**
     * \class jerrno
@@ -144,6 +142,6 @@ namespace qls_jrnl
         static bool __init();
     };
 
-}}
+}}}
 
-#endif // ifndef QPID_LEGACYSTORE_JRNL_JERRNO_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_JERRNO_H

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jexception.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jexception.cpp?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jexception.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jexception.cpp Thu Nov 14 20:39:32 2013
@@ -27,10 +27,9 @@
 
 #define CATLEN(p) MAX_MSG_SIZE - std::strlen(p) - 1
 
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
 
 jexception::jexception() throw ():
         std::exception(),
@@ -168,4 +167,4 @@ operator<<(std::ostream& os, const jexce
     return os;
 }
 
-}}
+}}}

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jexception.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jexception.h?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jexception.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jexception.h Thu Nov 14 20:39:32 2013
@@ -19,15 +19,14 @@
  *
  */
 
-#ifndef QPID_LEGACYSTORE_JRNL_JEXCEPTION_H
-#define QPID_LEGACYSTORE_JRNL_JEXCEPTION_H
+#ifndef QPID_LINEARSTORE_JOURNAL_JEXCEPTION_H
+#define QPID_LINEARSTORE_JOURNAL_JEXCEPTION_H
 
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
 class jexception;
-}}
+}}}
 
 #include <cerrno>
 #include <cstdio>
@@ -70,10 +69,10 @@ class jexception;
     ::abort(); \
     }
 
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
+
     /**
     * \class jexception
     * \brief Generic journal exception class
@@ -121,6 +120,6 @@ namespace qls_jrnl
         friend std::ostream& operator<<(std::ostream& os, const jexception* jePtr);
     }; // class jexception
 
-}}
+}}}
 
-#endif // ifndef QPID_LEGACYSTORE_JRNL_JEXCEPTION_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_JEXCEPTION_H

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jrec.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jrec.cpp?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jrec.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jrec.cpp Thu Nov 14 20:39:32 2013
@@ -24,84 +24,15 @@
 #include <iomanip>
 #include "qpid/linearstore/jrnl/jerrno.h"
 #include "qpid/linearstore/jrnl/jexception.h"
+#include "qpid/linearstore/jrnl/utils/rec_hdr.h"
+#include "qpid/linearstore/jrnl/utils/rec_tail.h"
 #include <sstream>
 
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
 
 jrec::jrec() {}
 jrec::~jrec() {}
 
-void
-jrec::chk_hdr(const rec_hdr_t& hdr)
-{
-    if (hdr._magic == 0)
-    {
-        std::ostringstream oss;
-        oss << std::hex << std::setfill('0');
-        oss << "enq magic NULL: rid=0x" << hdr._rid;
-        throw jexception(jerrno::JERR_JREC_BADRECHDR, oss.str(), "jrec", "chk_hdr");
-    }
-    if (hdr._version != QLS_JRNL_VERSION)
-    {
-        std::ostringstream oss;
-        oss << std::hex << std::setfill('0');
-        oss << "version: rid=0x" << hdr._rid;
-        oss << ": expected=0x" << std::setw(2) << (int)QLS_JRNL_VERSION;
-        oss << " read=0x" << std::setw(2) << (int)hdr._version;
-        throw jexception(jerrno::JERR_JREC_BADRECHDR, oss.str(), "jrec", "chk_hdr");
-    }
-//#if defined (JRNL_LITTLE_ENDIAN)
-//    uint8_t endian_flag = RHM_LENDIAN_FLAG;
-//#else
-//    uint8_t endian_flag = RHM_BENDIAN_FLAG;
-//#endif
-//    if (hdr._eflag != endian_flag)
-//    {
-//        std::ostringstream oss;
-//        oss << std::hex << std::setfill('0');
-//        oss << "endian_flag: rid=" << hdr._rid;
-//        oss << ": expected=0x" << std::setw(2) << (int)endian_flag;
-//        oss << " read=0x" << std::setw(2) << (int)hdr._eflag;
-//        throw jexception(jerrno::JERR_JREC_BADRECHDR, oss.str(), "jrec", "chk_hdr");
-//    }
-}
-
-void
-jrec::chk_rid(const rec_hdr_t& hdr, const uint64_t rid)
-{
-    if (hdr._rid != rid)
-    {
-        std::ostringstream oss;
-        oss << std::hex << std::setfill('0');
-        oss << "rid mismatch: expected=0x" << rid;
-        oss << " read=0x" << hdr._rid;
-        throw jexception(jerrno::JERR_JREC_BADRECHDR, oss.str(), "jrec", "chk_hdr");
-    }
-}
-
-void
-jrec::chk_tail(const rec_tail_t& tail, const rec_hdr_t& hdr)
-{
-    if (tail._xmagic != ~hdr._magic)
-    {
-        std::ostringstream oss;
-        oss << std::hex << std::setfill('0');
-        oss << "magic: rid=0x" << hdr._rid;
-        oss << ": expected=0x" << ~hdr._magic;
-        oss << " read=0x" << tail._xmagic;
-        throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "jrec", "chk_tail");
-    }
-    if (tail._rid != hdr._rid)
-    {
-        std::ostringstream oss;
-        oss << std::hex << std::setfill('0');
-        oss << "rid: rid=0x" << hdr._rid;
-        oss << ": read=0x" << tail._rid;
-        throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "jrec", "chk_tail");
-    }
-}
-
-}}
+}}}



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org