You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/11/14 21:39:34 UTC
svn commit: r1542066 [2/3] - in /qpid/trunk/qpid/cpp/src/qpid: legacystore/
linearstore/ linearstore/jrnl/ linearstore/jrnl/utils/
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp Thu Nov 14 20:39:32 2013
@@ -44,8 +44,8 @@
#include <vector>
namespace qpid {
-namespace qls_jrnl
-{
+namespace linearstore {
+namespace journal {
RecoveryManager::RecoveryManager(const std::string& journalDirectory,
const std::string& queuename,
@@ -63,6 +63,7 @@ RecoveryManager::RecoveryManager(const s
highestRecordId_(0ULL),
highestFileNumber_(0ULL),
lastFileFullFlag_(false),
+ currentSerial_(0),
efpFileSize_kib_(0)
{}
@@ -154,9 +155,10 @@ bool RecoveryManager::readNextRemainingR
}
enq_map::emap_data_struct_t eds;
enqueueMapRef_.get_data(*recordIdListConstItr_, eds);
- uint64_t fileNumber = eds._pfid;
- currentJournalFileConstItr_ = fileNumberMap_.find(fileNumber);
- getNextFile(false);
+ if (!inFileStream_.is_open() || currentJournalFileConstItr_->first != eds._pfid) {
+ getFile(eds._pfid, false);
+ }
+//std::cout << " " << eds._pfid << std::hex << ",0x" << eds._file_posn << std::flush; // DEBUG
inFileStream_.seekg(eds._file_posn, std::ifstream::beg);
if (!inFileStream_.good()) {
@@ -174,7 +176,10 @@ bool RecoveryManager::readNextRemainingR
// check flags
transient = ::is_enq_transient(&enqueueHeader);
external = ::is_enq_external(&enqueueHeader);
-
+//char magicBuff[5]; // DEBUG
+//::memcpy(magicBuff, &enqueueHeader, 4); // DEBUG
+//magicBuff[4] = 0; // DEBUG
+//std::cout << std::hex << ":" << (char*)magicBuff << ",rid=0x" << enqueueHeader._rhdr._rid << ",xs=0x" << enqueueHeader._xidsize << ",ds=0x" << enqueueHeader._dsize << std::dec << std::flush; // DEBUG
// read xid
xidSize = enqueueHeader._xidsize;
*xidPtrPtr = ::malloc(xidSize);
@@ -217,57 +222,75 @@ void RecoveryManager::setLinearFileContr
}
}
-std::string RecoveryManager::toString(const std::string& jid,
- bool compact) {
+std::string RecoveryManager::toString(const std::string& jid) {
std::ostringstream oss;
- if (compact) {
- oss << "Recovery journal analysis (jid=\"" << jid << "\"):";
- oss << " jfl=[";
- for (fileNumberMapConstItr_t i=fileNumberMap_.begin(); i!=fileNumberMap_.end(); ++i) {
- if (i!=fileNumberMap_.begin()) {
- oss << " ";
- }
- std::string fqFileName = i->second->getFqFileName();
- oss << i->first << ":" << fqFileName.substr(fqFileName.rfind('/')+1);
- }
- oss << "] ecl=[ ";
- for (fileNumberMapConstItr_t j=fileNumberMap_.begin(); j!=fileNumberMap_.end(); ++j) {
- if (j!=fileNumberMap_.begin()) {
- oss << " ";
- }
- oss << j->second->getEnqueuedRecordCount();
- }
- oss << " ] empty=" << (journalEmptyFlag_ ? "T" : "F");
- oss << " fro=0x" << std::hex << firstRecordOffset_ << std::dec << " (" << (firstRecordOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)";
- oss << " eo=0x" << std::hex << endOffset_ << std::dec << " (" << (endOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)";
- oss << " hrid=0x" << std::hex << highestRecordId_ << std::dec;
- oss << " hfnum=0x" << std::hex << highestFileNumber_ << std::dec;
- oss << " lffull=" << (lastFileFullFlag_ ? "T" : "F");
+ oss << "Recovery journal analysis (jid=\"" << jid << "\"):" << std::endl;
+ oss << " Number of journal files = " << fileNumberMap_.size() << std::endl;
+ oss << " Journal File List:" << std::endl;
+ for (fileNumberMapConstItr_t k=fileNumberMap_.begin(); k!=fileNumberMap_.end(); ++k) {
+ std::string fqFileName = k->second->getFqFileName();
+ oss << " " << k->first << ": " << fqFileName.substr(fqFileName.rfind('/')+1) << std::endl;
+ }
+ oss << " Enqueue Counts: [ ";
+ for (fileNumberMapConstItr_t l=fileNumberMap_.begin(); l!=fileNumberMap_.end(); ++l) {
+ if (l != fileNumberMap_.begin()) {
+ oss << ", ";
+ }
+ oss << l->second->getEnqueuedRecordCount();
+ }
+ oss << " ]" << std::endl;
+ oss << " Journal empty = " << (journalEmptyFlag_ ? "TRUE" : "FALSE") << std::endl;
+ oss << " First record offset in first file = 0x" << std::hex << firstRecordOffset_ <<
+ std::dec << " (" << (firstRecordOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
+ oss << " End offset = 0x" << std::hex << endOffset_ << std::dec << " (" <<
+ (endOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
+ oss << " Highest rid = 0x" << std::hex << highestRecordId_ << std::dec << std::endl;
+ oss << " Highest file number = 0x" << std::hex << highestFileNumber_ << std::dec << std::endl;
+ oss << " Last file full = " << (lastFileFullFlag_ ? "TRUE" : "FALSE") << std::endl;
+ oss << " Enqueued records (txn & non-txn):" << std::endl;
+ return oss.str();
+}
+
+std::string RecoveryManager::toLog(const std::string& jid, const int indent) {
+ std::string indentStr(indent, ' ');
+ std::ostringstream oss;
+ oss << std::endl << indentStr << "Journal recovery analysis (jid=\"" << jid << "\"):" << std::endl;
+ if (journalEmptyFlag_) {
+ oss << indentStr << "<Journal empty, no journal files found>" << std::endl;
} else {
- oss << "Recovery journal analysis (jid=\"" << jid << "\"):" << std::endl;
- oss << " Number of journal files = " << fileNumberMap_.size() << std::endl;
- oss << " Journal File List:" << std::endl;
+ oss << indentStr << std::setw(7) << "file_id"
+ << std::setw(43) << "file_name"
+ << std::setw(16) << "fro"
+ << std::setw(12) << "record_cnt"
+ << std::setw(5) << "ptn"
+ << std::setw(10) << "efp"
+ << std::endl;
+ oss << indentStr << std::setw(7) << "-------"
+ << std::setw(43) << "-----------------------------------------"
+ << std::setw(16) << "--------------"
+ << std::setw(12) << "----------"
+ << std::setw(5) << "---"
+ << std::setw(10) << "--------"
+ << std::endl;
for (fileNumberMapConstItr_t k=fileNumberMap_.begin(); k!=fileNumberMap_.end(); ++k) {
std::string fqFileName = k->second->getFqFileName();
- oss << " " << k->first << ": " << fqFileName.substr(fqFileName.rfind('/')+1) << std::endl;
+ std::ostringstream fro;
+ fro << std::hex << "0x" << k->second->getFirstRecordOffset();
+ oss << indentStr << std::setw(7) << k->first
+ << std::setw(43) << fqFileName.substr(fqFileName.rfind('/')+1)
+ << std::setw(16) << fro.str()
+ << std::setw(12) << k->second->getEnqueuedRecordCount()
+ << std::setw(5) << k->second->getEfpIdentity().pn_
+ << std::setw(9) << k->second->getEfpIdentity().ds_ << "k"
+ << std::endl;
}
- oss << " Enqueue Counts: [ " << std::endl;
- for (fileNumberMapConstItr_t l=fileNumberMap_.begin(); l!=fileNumberMap_.end(); ++l) {
- if (l != fileNumberMap_.begin()) {
- oss << ", ";
- }
- oss << l->second->getEnqueuedRecordCount();
- }
- oss << " ]" << std::endl;
- oss << " Journal empty = " << (journalEmptyFlag_ ? "TRUE" : "FALSE") << std::endl;
- oss << " First record offset in first file = 0x" << std::hex << firstRecordOffset_ <<
+ oss << indentStr << "First record offset in first file = 0x" << std::hex << firstRecordOffset_ <<
std::dec << " (" << (firstRecordOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
- oss << " End offset = 0x" << std::hex << endOffset_ << std::dec << " (" <<
+ oss << indentStr << "End offset in last file = 0x" << std::hex << endOffset_ << std::dec << " (" <<
(endOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
- oss << " Highest rid = 0x" << std::hex << highestRecordId_ << std::dec << std::endl;
- oss << " Highest file number = 0x" << std::hex << highestFileNumber_ << std::dec << std::endl;
- oss << " Last file full = " << (lastFileFullFlag_ ? "TRUE" : "FALSE") << std::endl;
- oss << " Enqueued records (txn & non-txn):" << std::endl;
+ oss << indentStr << "Highest rid found = 0x" << std::hex << highestRecordId_ << std::dec << std::endl;
+ oss << indentStr << "Last file full = " << (lastFileFullFlag_ ? "TRUE" : "FALSE") << std::endl;
+ oss << indentStr << "Enqueued records (txn & non-txn):";
}
return oss.str();
}
@@ -355,9 +378,9 @@ void RecoveryManager::checkJournalAlignm
}
bool RecoveryManager::decodeRecord(jrec& record,
- std::size_t& cumulativeSizeRead,
- ::rec_hdr_t& headerRecord,
- std::streampos& fileOffset)
+ std::size_t& cumulativeSizeRead,
+ ::rec_hdr_t& headerRecord,
+ std::streampos& fileOffset)
{
std::streampos start_file_offs = fileOffset;
@@ -370,15 +393,18 @@ bool RecoveryManager::decodeRecord(jrec&
bool done = false;
while (!done) {
try {
- done = record.rcv_decode(headerRecord, &inFileStream_, cumulativeSizeRead);
+ done = record.decode(headerRecord, &inFileStream_, cumulativeSizeRead);
}
catch (const jexception& e) {
+ journalLogRef_.log(JournalLog::LOG_INFO, queueName_, e.what());
checkJournalAlignment(start_file_offs);
return false;
}
- if (!done && !getNextFile(false)) {
- checkJournalAlignment(start_file_offs);
- return false;
+ if (!done && needNextFile()) {
+ if (!getNextFile(false)) {
+ checkJournalAlignment(start_file_offs);
+ return false;
+ }
}
}
return true;
@@ -392,45 +418,50 @@ uint64_t RecoveryManager::getCurrentFile
return currentJournalFileConstItr_->first;
}
-bool RecoveryManager::getNextFile(bool jumpToFirstRecordOffsetFlag) {
+bool RecoveryManager::getFile(const uint64_t fileNumber, bool jumpToFirstRecordOffsetFlag) {
if (inFileStream_.is_open()) {
- if (inFileStream_.eof() || !inFileStream_.good()) {
- inFileStream_.clear();
- endOffset_ = inFileStream_.tellg(); // remember file offset before closing
- if (endOffset_ == -1) {
- std::ostringstream oss;
- oss << "tellg() failure: fail=" << (inFileStream_.fail()?"T":"F") << " bad=" << (inFileStream_.bad()?"T":"F");
- throw jexception(jerrno::JERR_RCVM_STREAMBAD, oss.str(), "RecoveryManager", "getNextFile");
- }
- inFileStream_.close();
- if (++currentJournalFileConstItr_ == fileNumberMap_.end()) {
- return false;
- }
- }
- }
- if (!inFileStream_.is_open()) {
+ inFileStream_.close();
+//std::cout << " f=" << getCurrentFileName() << "]" << std::flush; // DEBUG
inFileStream_.clear(); // clear eof flag, req'd for older versions of c++
- inFileStream_.open(getCurrentFileName().c_str(), std::ios_base::in | std::ios_base::binary);
- if (!inFileStream_.good()) {
- throw jexception(jerrno::JERR__FILEIO, getCurrentFileName(), "RecoveryManager", "getNextFile");
- }
+ }
+ currentJournalFileConstItr_ = fileNumberMap_.find(fileNumber);
+ if (currentJournalFileConstItr_ == fileNumberMap_.end()) {
+ return false;
+ }
+ inFileStream_.open(getCurrentFileName().c_str(), std::ios_base::in | std::ios_base::binary);
+ if (!inFileStream_.good()) {
+ throw jexception(jerrno::JERR__FILEIO, getCurrentFileName(), "RecoveryManager", "getFile");
+ }
+//std::cout << " [F=" << getCurrentFileName() << std::flush; // DEBUG
- // Read file header
- file_hdr_t fhdr;
- inFileStream_.read((char*)&fhdr, sizeof(fhdr));
- checkFileStreamOk(true);
- if (fhdr._rhdr._magic == QLS_FILE_MAGIC) {
- firstRecordOffset_ = fhdr._fro;
- std::streamoff foffs = jumpToFirstRecordOffsetFlag ? firstRecordOffset_ : QLS_SBLK_SIZE_BYTES;
- inFileStream_.seekg(foffs);
- } else {
- inFileStream_.close();
- if (currentJournalFileConstItr_ == fileNumberMap_.begin()) {
- journalEmptyFlag_ = true;
- }
+ if (!readFileHeader()) {
+ return false;
+ }
+ std::streamoff foffs = jumpToFirstRecordOffsetFlag ? firstRecordOffset_ : QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES;
+ inFileStream_.seekg(foffs);
+ return true;
+}
+
+bool RecoveryManager::getNextFile(bool jumpToFirstRecordOffsetFlag) {
+ if (inFileStream_.is_open()) {
+ inFileStream_.close();
+//std::cout << " .f=" << getCurrentFileName() << "]" << std::flush; // DEBUG
+ if (++currentJournalFileConstItr_ == fileNumberMap_.end()) {
return false;
}
+ inFileStream_.clear(); // clear eof flag, req'd for older versions of c++
+ }
+ inFileStream_.open(getCurrentFileName().c_str(), std::ios_base::in | std::ios_base::binary);
+ if (!inFileStream_.good()) {
+ throw jexception(jerrno::JERR__FILEIO, getCurrentFileName(), "RecoveryManager", "getNextFile");
}
+//std::cout << " [.F=" << getCurrentFileName() << std::flush; // DEBUG
+
+ if (!readFileHeader()) {
+ return false;
+ }
+ std::streamoff foffs = jumpToFirstRecordOffsetFlag ? firstRecordOffset_ : QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES;
+ inFileStream_.seekg(foffs);
return true;
}
@@ -443,7 +474,7 @@ bool RecoveryManager::getNextRecordHeade
bool hdr_ok = false;
std::streampos file_pos;
while (!hdr_ok) {
- if (!inFileStream_.is_open()) {
+ if (needNextFile()) {
if (!getNextFile(true)) {
return false;
}
@@ -458,8 +489,10 @@ bool RecoveryManager::getNextRecordHeade
if (inFileStream_.gcount() == sizeof(rec_hdr_t)) {
hdr_ok = true;
} else {
- if (!getNextFile(true)) {
- return false;
+ if (needNextFile()) {
+ if (!getNextFile(true)) {
+ return false;
+ }
}
}
}
@@ -468,6 +501,10 @@ bool RecoveryManager::getNextRecordHeade
case QLS_ENQ_MAGIC:
{
//std::cout << " 0x" << std::hex << file_pos << ".e.0x" << h._rid << std::dec << std::flush; // DEBUG
+ if (::rec_hdr_check(&h, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) {
+ endOffset_ = file_pos;
+ return false;
+ }
enq_rec er;
uint64_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary
if (!decodeRecord(er, cum_size_read, h, file_pos)) {
@@ -502,6 +539,10 @@ bool RecoveryManager::getNextRecordHeade
case QLS_DEQ_MAGIC:
{
//std::cout << " 0x" << std::hex << file_pos << ".d.0x" << h._rid << std::dec << std::flush; // DEBUG
+ if (::rec_hdr_check(&h, QLS_DEQ_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) {
+ endOffset_ = file_pos;
+ return false;
+ }
deq_rec dr;
uint16_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary
if (!decodeRecord(dr, cum_size_read, h, file_pos)) {
@@ -534,6 +575,10 @@ bool RecoveryManager::getNextRecordHeade
case QLS_TXA_MAGIC:
{
//std::cout << " 0x" << std::hex << file_pos << ".a.0x" << h._rid << std::dec << std::flush; // DEBUG
+ if (::rec_hdr_check(&h, QLS_TXA_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) {
+ endOffset_ = file_pos;
+ return false;
+ }
txn_rec ar;
if (!decodeRecord(ar, cum_size_read, h, file_pos)) {
return false;
@@ -558,6 +603,10 @@ bool RecoveryManager::getNextRecordHeade
case QLS_TXC_MAGIC:
{
//std::cout << " 0x" << std::hex << file_pos << ".c.0x" << h._rid << std::dec << std::flush; // DEBUG
+ if (::rec_hdr_check(&h, QLS_TXC_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) {
+ endOffset_ = file_pos;
+ return false;
+ }
txn_rec cr;
if (!decodeRecord(cr, cum_size_read, h, file_pos)) {
return false;
@@ -593,8 +642,10 @@ bool RecoveryManager::getNextRecordHeade
uint32_t rec_dblks = jrec::size_dblks(sizeof(::rec_hdr_t));
inFileStream_.ignore(rec_dblks * QLS_DBLK_SIZE_BYTES - sizeof(::rec_hdr_t));
checkFileStreamOk(false);
- if (!getNextFile(false)) {
- return false;
+ if (needNextFile()) {
+ if (!getNextFile(false)) {
+ return false;
+ }
}
}
break;
@@ -611,6 +662,13 @@ bool RecoveryManager::getNextRecordHeade
return true;
}
+bool RecoveryManager::needNextFile() {
+ if (inFileStream_.is_open()) {
+ return inFileStream_.eof() || inFileStream_.tellg() >= std::streampos(efpFileSize_kib_ * 1024);
+ }
+ return true;
+}
+
void RecoveryManager::readJournalData(char* target,
const std::streamsize readSize) {
std::streamoff bytesRead = 0;
@@ -624,7 +682,9 @@ void RecoveryManager::readJournalData(ch
inFileStream_.read(target + bytesRead, readSize - bytesRead);
std::streamoff thisReadSize = inFileStream_.gcount();
if (thisReadSize < readSize) {
- getNextFile(false);
+ if (needNextFile()) {
+ getNextFile(false);
+ }
file_pos = inFileStream_.tellg();
if (file_pos == std::streampos(-1)) {
std::ostringstream oss;
@@ -636,6 +696,23 @@ void RecoveryManager::readJournalData(ch
}
}
+bool RecoveryManager::readFileHeader() {
+ file_hdr_t fhdr;
+ inFileStream_.read((char*)&fhdr, sizeof(fhdr));
+ checkFileStreamOk(true);
+ if (::file_hdr_check(&fhdr, QLS_FILE_MAGIC, QLS_JRNL_VERSION, efpFileSize_kib_) != 0) {
+ firstRecordOffset_ = fhdr._fro;
+ currentSerial_ = fhdr._rhdr._serial;
+ } else {
+ inFileStream_.close();
+ if (currentJournalFileConstItr_ == fileNumberMap_.begin()) {
+ journalEmptyFlag_ = true;
+ }
+ return false;
+ }
+ return true;
+}
+
// static private
void RecoveryManager::readJournalFileHeader(const std::string& journalFileName,
::file_hdr_t& fileHeaderRef,
@@ -670,4 +747,4 @@ void RecoveryManager::removeEmptyFiles(E
}
}
-}} // namespace qpid::qls_jrnl
+}}}
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h Thu Nov 14 20:39:32 2013
@@ -19,8 +19,8 @@
*
*/
-#ifndef QPID_LINEARSTORE_RECOVERYSTATE_H_
-#define QPID_LINEARSTORE_RECOVERYSTATE_H_
+#ifndef QPID_LINEARSTORE_JOURNAL_RECOVERYSTATE_H_
+#define QPID_LINEARSTORE_JOURNAL_RECOVERYSTATE_H_
#include <deque>
#include <fstream>
@@ -33,7 +33,8 @@ struct file_hdr_t;
struct rec_hdr_t;
namespace qpid {
-namespace qls_jrnl {
+namespace linearstore {
+namespace journal {
class data_tok;
class enq_map;
@@ -72,6 +73,7 @@ protected:
bool lastFileFullFlag_; ///< Last file is full
// State for recovery of individual enqueued records
+ uint64_t currentSerial_;
uint32_t efpFileSize_kib_;
fileNumberMapConstItr_t currentJournalFileConstItr_;
std::string currentFileName_;
@@ -104,8 +106,8 @@ public:
bool ignore_pending_txns);
void setLinearFileControllerJournals(lfcAddJournalFileFn fnPtr,
LinearFileController* lfcPtr);
- std::string toString(const std::string& jid,
- bool compact = true);
+ std::string toString(const std::string& jid);
+ std::string toLog(const std::string& jid, const int indent);
protected:
void analyzeJournalFileHeaders(efpIdentity_t& efpIdentity);
void checkFileStreamOk(bool checkEof);
@@ -116,8 +118,11 @@ protected:
std::streampos& fileOffset);
std::string getCurrentFileName() const;
uint64_t getCurrentFileNumber() const;
+ bool getFile(const uint64_t fileNumber, bool jumpToFirstRecordOffsetFlag);
bool getNextFile(bool jumpToFirstRecordOffsetFlag);
bool getNextRecordHeader();
+ bool needNextFile();
+ bool readFileHeader();
void readJournalData(char* target, const std::streamsize size);
void removeEmptyFiles(EmptyFilePool* emptyFilePoolPtr);
@@ -126,6 +131,6 @@ protected:
std::string& queueName);
};
-}} // namespace qpid::qls_jrnl
+}}}
-#endif // QPID_LINEARSTORE_RECOVERYSTATE_H_
+#endif // QPID_LINEARSTORE_JOURNAL_RECOVERYSTATE_H_
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/aio.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/aio.h?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/aio.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/aio.h Thu Nov 14 20:39:32 2013
@@ -19,17 +19,16 @@
*
*/
-#ifndef QPID_LEGACYSTORE_JRNL_AIO_H
-#define QPID_LEGACYSTORE_JRNL_AIO_H
+#ifndef QPID_LINEARSTORE_JOURNAL_AIO_H
+#define QPID_LINEARSTORE_JOURNAL_AIO_H
#include <libaio.h>
#include <cstring>
#include <string.h>
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
typedef iocb aio_cb;
typedef io_event aio_event;
@@ -135,6 +134,6 @@ public:
}
};
-}}
+}}}
-#endif // ifndef QPID_LEGACYSTORE_JRNL_AIO_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_AIO_H
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/aio_callback.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/aio_callback.h?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/aio_callback.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/aio_callback.h Thu Nov 14 20:39:32 2013
@@ -19,27 +19,26 @@
*
*/
-#ifndef QPID_LEGACYSTORE_JRNL_AIO_CALLBACK_H
-#define QPID_LEGACYSTORE_JRNL_AIO_CALLBACK_H
+#ifndef QPID_LINEARSTORE_JOURNAL_AIO_CALLBACK_H
+#define QPID_LINEARSTORE_JOURNAL_AIO_CALLBACK_H
#include <stdint.h>
#include <vector>
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
- class data_tok;
+class data_tok;
- class aio_callback
- {
- public:
- virtual ~aio_callback() {}
- virtual void wr_aio_cb(std::vector<data_tok*>& dtokl) = 0;
- virtual void rd_aio_cb(std::vector<uint16_t>& pil) = 0;
- };
+class aio_callback
+{
+public:
+ virtual ~aio_callback() {}
+ virtual void wr_aio_cb(std::vector<data_tok*>& dtokl) = 0;
+ virtual void rd_aio_cb(std::vector<uint16_t>& pil) = 0;
+};
-}}
+}}}
-#endif // ifndef QPID_LEGACYSTORE_JRNL_AIO_CALLBACK_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_AIO_CALLBACK_H
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/cvar.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/cvar.h?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/cvar.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/cvar.h Thu Nov 14 20:39:32 2013
@@ -19,8 +19,8 @@
*
*/
-#ifndef QPID_LEGACYSTORE_JRNL_CVAR_H
-#define QPID_LEGACYSTORE_JRNL_CVAR_H
+#ifndef QPID_LINEARSTORE_JOURNAL_CVAR_H
+#define QPID_LINEARSTORE_JOURNAL_CVAR_H
#include <cstring>
#include "qpid/linearstore/jrnl/jerrno.h"
@@ -30,10 +30,9 @@
#include <pthread.h>
#include <sstream>
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
// Ultra-simple thread condition variable class
class cvar
@@ -71,6 +70,6 @@ namespace qls_jrnl
}
};
-}}
+}}}
-#endif // ifndef QPID_LEGACYSTORE_JRNL_CVAR_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_CVAR_H
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.cpp?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.cpp Thu Nov 14 20:39:32 2013
@@ -27,10 +27,9 @@
#include "qpid/linearstore/jrnl/slock.h"
#include <sstream>
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
// Static members
@@ -39,10 +38,8 @@ smutex data_tok::_mutex;
data_tok::data_tok():
_wstate(NONE),
-// _rstate(UNREAD),
_dsize(0),
_dblks_written(0),
-// _dblks_read(0),
_pg_cnt(0),
_fid(0),
_rid(0),
@@ -106,58 +103,12 @@ data_tok::wstate_str(write_state wstate)
return "<wstate unknown>";
}
-/*
-const char*
-data_tok::rstate_str() const
-{
- return rstate_str(_rstate);
-}
-*/
-
-/*
-const char*
-data_tok::rstate_str(read_state rstate)
-{
- switch (rstate)
- {
- case NONE:
- return "NONE";
- case READ_PART:
- return "READ_PART";
- case SKIP_PART:
- return "SKIP_PART";
- case READ:
- return "READ";
- // Not using default: forces compiler to ensure all cases are covered.
- }
- return "<rstate unknown>";
-}
-*/
-
-/*
-void
-data_tok::set_rstate(const read_state rstate)
-{
- if (_wstate != ENQ && rstate != UNREAD)
- {
- std::ostringstream oss;
- oss << "Attempted to change read state to " << rstate_str(rstate);
- oss << " while write state is not enqueued (wstate ENQ); wstate=" << wstate_str() << ".";
- throw jexception(jerrno::JERR_DTOK_ILLEGALSTATE, oss.str(), "data_tok",
- "set_rstate");
- }
- _rstate = rstate;
-}
-*/
-
void
data_tok::reset()
{
_wstate = NONE;
-// _rstate = UNREAD;
_dsize = 0;
_dblks_written = 0;
-// _dblks_read = 0;
_pg_cnt = 0;
_fid = 0;
_rid = 0;
@@ -185,4 +136,4 @@ data_tok::status_str() const
return oss.str();
}
-}}
+}}}
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.h?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.h Thu Nov 14 20:39:32 2013
@@ -19,15 +19,14 @@
*
*/
-#ifndef QPID_LEGACYSTORE_JRNL_DATA_TOK_H
-#define QPID_LEGACYSTORE_JRNL_DATA_TOK_H
+#ifndef QPID_LINEARSTORE_JOURNAL_DATA_TOK_H
+#define QPID_LINEARSTORE_JOURNAL_DATA_TOK_H
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
class data_tok;
-}}
+}}}
#include <cassert>
#include <cstddef>
@@ -35,10 +34,9 @@ class data_tok;
#include <pthread.h>
#include <string>
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
/**
* \class data_tok
@@ -72,25 +70,13 @@ namespace qls_jrnl
COMMITTED
};
-/*
- enum read_state
- {
- UNREAD, ///< Data block not read
- READ_PART, ///< Data block is part-read; waiting for page buffer to fill
- SKIP_PART, ///< Prev. dequeued dblock is part-skipped; waiting for page buffer to fill
- READ ///< Data block is fully read
- };
-*/
-
protected:
static smutex _mutex;
static uint64_t _cnt;
uint64_t _icnt;
write_state _wstate; ///< Enqueued / dequeued state of data
-// read_state _rstate; ///< Read state of data
std::size_t _dsize; ///< Data size in bytes
uint32_t _dblks_written; ///< Data blocks read/written
-// uint32_t _dblks_read; ///< Data blocks read/written
uint32_t _pg_cnt; ///< Page counter - incr for each page containing part of data
uint64_t _fid; ///< FID containing header of enqueue record
uint64_t _rid; ///< RID of data set by enqueue operation
@@ -106,16 +92,11 @@ namespace qls_jrnl
inline write_state wstate() const { return _wstate; }
const char* wstate_str() const;
static const char* wstate_str(write_state wstate);
-// inline read_state rstate() const { return _rstate; }
-// const char* rstate_str() const;
-// static const char* rstate_str(read_state rstate);
inline bool is_writable() const { return _wstate == NONE || _wstate == ENQ_PART; }
inline bool is_enqueued() const { return _wstate == ENQ; }
inline bool is_readable() const { return _wstate == ENQ; }
-// inline bool is_read() const { return _rstate == READ; }
inline bool is_dequeueable() const { return _wstate == ENQ || _wstate == DEQ_PART; }
inline void set_wstate(const write_state wstate) { _wstate = wstate; }
-// void set_rstate(const read_state rstate);
inline std::size_t dsize() const { return _dsize; }
inline void set_dsize(std::size_t dsize) { _dsize = dsize; }
@@ -124,10 +105,6 @@ namespace qls_jrnl
{ _dblks_written += dblks_written; }
inline void set_dblocks_written(uint32_t dblks_written) { _dblks_written = dblks_written; }
-// inline uint32_t dblocks_read() const { return _dblks_read; }
-// inline void incr_dblocks_read(uint32_t dblks_read) { _dblks_read += dblks_read; }
-// inline void set_dblocks_read(uint32_t dblks_read) { _dblks_read = dblks_read; }
-
inline uint32_t pg_cnt() const { return _pg_cnt; }
inline uint32_t incr_pg_cnt() { return ++_pg_cnt; }
inline uint32_t decr_pg_cnt() { assert(_pg_cnt != 0); return --_pg_cnt; }
@@ -154,7 +131,6 @@ namespace qls_jrnl
std::string status_str() const;
};
-} // namespace qls_jrnl
-} // namespace jrnl
+}}}
-#endif // ifndef QPID_LEGACYSTORE_JRNL_DATA_TOK_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_DATA_TOK_H
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp Thu Nov 14 20:39:32 2013
@@ -19,87 +19,43 @@
*
*/
-/**
- * \file deq_rec.cpp
- *
- * Qpid asynchronous store plugin library
- *
- * This file contains the code for the mrg::journal::deq_rec (journal dequeue
- * record) class. See comments in file deq_rec.h for details.
- *
- * \author Kim van der Riet
- */
-
#include "qpid/linearstore/jrnl/deq_rec.h"
-#include "qpid/linearstore/jrnl/utils/deq_hdr.h"
-#include "qpid/linearstore/jrnl/utils/rec_tail.h"
#include <cassert>
-#include <cerrno>
-#include <cstdlib>
#include <cstring>
#include <iomanip>
-#include "qpid/linearstore/jrnl/jerrno.h"
#include "qpid/linearstore/jrnl/jexception.h"
-#include <sstream>
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
deq_rec::deq_rec():
-// _deq_hdr(QLS_DEQ_MAGIC, QLS_JRNL_VERSION, 0, 0, 0, false),
_xidp(0),
_buff(0)
-// _deq_tail(_deq_hdr)
{
- ::deq_hdr_init(&_deq_hdr, QLS_DEQ_MAGIC, QLS_JRNL_VERSION, 0, 0, 0, 0);
+ ::deq_hdr_init(&_deq_hdr, QLS_DEQ_MAGIC, QLS_JRNL_VERSION, 0, 0, 0, 0, 0);
::rec_tail_copy(&_deq_tail, &_deq_hdr._rhdr, 0);
}
-deq_rec::deq_rec(const uint64_t rid, const uint64_t drid, const void* const xidp,
- const std::size_t xidlen, const bool txn_coml_commit):
-// _deq_hdr(QLS_DEQ_MAGIC, QLS_JRNL_VERSION, rid, drid, xidlen, owi, txn_coml_commit),
- _xidp(xidp),
- _buff(0)
-// _deq_tail(_deq_hdr)
-{
- ::deq_hdr_init(&_deq_hdr, QLS_DEQ_MAGIC, QLS_JRNL_VERSION, 0, rid, drid, xidlen);
- ::rec_tail_copy(&_deq_tail, &_deq_hdr._rhdr, 0);
- ::set_txn_coml_commit(&_deq_hdr, txn_coml_commit);
-}
-
deq_rec::~deq_rec()
{
clean();
}
void
-deq_rec::reset()
-{
- _deq_hdr._rhdr._rid = 0;
-// _deq_hdr.set_owi(false);
- ::set_txn_coml_commit(&_deq_hdr, false);
- _deq_hdr._deq_rid = 0;
- _deq_hdr._xidsize = 0;
- _deq_tail._checksum = 0;
- _deq_tail._rid = 0;
- _xidp = 0;
- _buff = 0;
-}
-
-void
-deq_rec::reset(const uint64_t rid, const uint64_t drid, const void* const xidp,
- const std::size_t xidlen, const bool txn_coml_commit)
+deq_rec::reset(const uint64_t serial, const uint64_t rid, const uint64_t drid, const void* const xidp,
+ const std::size_t xidlen, const bool txn_coml_commit)
{
+ _deq_hdr._rhdr._serial = serial;
_deq_hdr._rhdr._rid = rid;
::set_txn_coml_commit(&_deq_hdr, txn_coml_commit);
_deq_hdr._deq_rid = drid;
_deq_hdr._xidsize = xidlen;
- _deq_tail._rid = rid;
_xidp = xidp;
_buff = 0;
+ _deq_tail._serial = serial;
+ _deq_tail._rid = rid;
}
uint32_t
@@ -214,132 +170,16 @@ deq_rec::encode(void* wptr, uint32_t rec
return size_dblks(wr_cnt);
}
-uint32_t
-deq_rec::decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks)
-{
- assert(rptr != 0);
- assert(max_size_dblks > 0);
-
- std::size_t rd_cnt = 0;
- if (rec_offs_dblks) // Continuation of record on new page
- {
- const uint32_t hdr_xid_dblks = size_dblks(sizeof(deq_hdr_t) + _deq_hdr._xidsize);
- const uint32_t hdr_xid_tail_dblks = size_dblks(sizeof(deq_hdr_t) + _deq_hdr._xidsize +
- sizeof(rec_tail_t));
- const std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES;
-
- if (hdr_xid_tail_dblks - rec_offs_dblks <= max_size_dblks)
- {
- // Remainder of xid fits within this page
- if (rec_offs - sizeof(deq_hdr_t) < _deq_hdr._xidsize)
- {
- // Part of xid still outstanding, copy remainder of xid and tail
- const std::size_t xid_offs = rec_offs - sizeof(deq_hdr_t);
- const std::size_t xid_rem = _deq_hdr._xidsize - xid_offs;
- std::memcpy((char*)_buff + xid_offs, rptr, xid_rem);
- rd_cnt = xid_rem;
- std::memcpy((void*)&_deq_tail, ((char*)rptr + rd_cnt), sizeof(_deq_tail));
- chk_tail();
- rd_cnt += sizeof(_deq_tail);
- }
- else
- {
- // Tail or part of tail only outstanding, complete tail
- const std::size_t tail_offs = rec_offs - sizeof(deq_hdr_t) - _deq_hdr._xidsize;
- const std::size_t tail_rem = sizeof(rec_tail_t) - tail_offs;
- std::memcpy((char*)&_deq_tail + tail_offs, rptr, tail_rem);
- chk_tail();
- rd_cnt = tail_rem;
- }
- }
- else if (hdr_xid_dblks - rec_offs_dblks <= max_size_dblks)
- {
- // Remainder of xid fits within this page, tail split
- const std::size_t xid_offs = rec_offs - sizeof(deq_hdr_t);
- const std::size_t xid_rem = _deq_hdr._xidsize - xid_offs;
- std::memcpy((char*)_buff + xid_offs, rptr, xid_rem);
- rd_cnt += xid_rem;
- const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
- if (tail_rem)
- {
- std::memcpy((void*)&_deq_tail, ((char*)rptr + xid_rem), tail_rem);
- rd_cnt += tail_rem;
- }
- }
- else
- {
- // Remainder of xid split
- const std::size_t xid_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES);
- std::memcpy((char*)_buff + rec_offs - sizeof(deq_hdr_t), rptr, xid_cp_size);
- rd_cnt += xid_cp_size;
- }
- }
- else // Start of record
- {
- // Get and check header
- //_deq_hdr.hdr_copy(h);
- ::rec_hdr_copy(&_deq_hdr._rhdr, &h);
- rd_cnt = sizeof(rec_hdr_t);
- _deq_hdr._deq_rid = *(uint64_t*)((char*)rptr + rd_cnt);
- rd_cnt += sizeof(uint64_t);
- _deq_hdr._xidsize = *(std::size_t*)((char*)rptr + rd_cnt);
- rd_cnt = sizeof(deq_hdr_t);
- chk_hdr();
- if (_deq_hdr._xidsize)
- {
- _buff = std::malloc(_deq_hdr._xidsize);
- MALLOC_CHK(_buff, "_buff", "deq_rec", "decode");
- const uint32_t hdr_xid_dblks = size_dblks(sizeof(deq_hdr_t) + _deq_hdr._xidsize);
- const uint32_t hdr_xid_tail_dblks = size_dblks(sizeof(deq_hdr_t) + _deq_hdr._xidsize +
- sizeof(rec_tail_t));
-
- // Check if record (header + xid + tail) fits within this page, we can check the
- // tail before the expense of copying data to memory
- if (hdr_xid_tail_dblks <= max_size_dblks)
- {
- // Entire header, xid and tail fits within this page
- std::memcpy(_buff, (char*)rptr + rd_cnt, _deq_hdr._xidsize);
- rd_cnt += _deq_hdr._xidsize;
- std::memcpy((void*)&_deq_tail, (char*)rptr + rd_cnt, sizeof(_deq_tail));
- rd_cnt += sizeof(_deq_tail);
- chk_tail();
- }
- else if (hdr_xid_dblks <= max_size_dblks)
- {
- // Entire header and xid fit within this page, tail split
- std::memcpy(_buff, (char*)rptr + rd_cnt, _deq_hdr._xidsize);
- rd_cnt += _deq_hdr._xidsize;
- const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
- if (tail_rem)
- {
- std::memcpy((void*)&_deq_tail, (char*)rptr + rd_cnt, tail_rem);
- rd_cnt += tail_rem;
- }
- }
- else
- {
- // Header fits within this page, xid split
- const std::size_t xid_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
- std::memcpy(_buff, (char*)rptr + rd_cnt, xid_cp_size);
- rd_cnt += xid_cp_size;
- }
- }
- }
- return size_dblks(rd_cnt);
-}
-
bool
-deq_rec::rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs)
+deq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs)
{
+ uint32_t checksum = 0UL; // TODO: Add checksum math
if (rec_offs == 0)
{
//_deq_hdr.hdr_copy(h);
::rec_hdr_copy(&_deq_hdr._rhdr, &h);
- ifsp->read((char*)&_deq_hdr._deq_rid, sizeof(uint64_t));
- ifsp->read((char*)&_deq_hdr._xidsize, sizeof(std::size_t));
-#if defined(JRNL_32_BIT)
- ifsp->ignore(sizeof(uint32_t)); // _filler0
-#endif
+ ifsp->read((char*)&_deq_hdr._deq_rid, sizeof(_deq_hdr._deq_rid));
+ ifsp->read((char*)&_deq_hdr._xidsize, sizeof(_deq_hdr._xidsize));
rec_offs = sizeof(_deq_hdr);
// Read header, allocate (if req'd) for xid
if (_deq_hdr._xidsize)
@@ -382,9 +222,21 @@ deq_rec::rcv_decode(rec_hdr_t h, std::if
}
}
ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size());
- if (_deq_hdr._xidsize)
- chk_tail(); // Throws if tail invalid or record incomplete
assert(!ifsp->fail() && !ifsp->bad());
+ if (_deq_hdr._xidsize) {
+ int res = ::rec_tail_check(&_deq_tail, &_deq_hdr._rhdr, checksum);
+ if (res != 0) {
+ std::stringstream oss;
+ switch (res) {
+ case 1: oss << std::hex << "Magic: expected 0x" << ~_deq_hdr._rhdr._magic << "; found 0x" << _deq_tail._xmagic; break;
+ case 2: oss << std::hex << "Serial: expected 0x" << _deq_hdr._rhdr._serial << "; found 0x" << _deq_tail._serial; break;
+ case 3: oss << std::hex << "Record Id: expected 0x" << _deq_hdr._rhdr._rid << "; found 0x" << _deq_tail._rid; break;
+ case 4: oss << std::hex << "Checksum: expected 0x" << checksum << "; found 0x" << _deq_tail._checksum; break;
+ default: oss << "Unknown error " << res;
+ }
+ throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "deq_rec", "decode"); // TODO: Don't throw exception, log info
+ }
+ }
return true;
}
@@ -427,38 +279,9 @@ deq_rec::rec_size() const
}
void
-deq_rec::chk_hdr() const
-{
- jrec::chk_hdr(_deq_hdr._rhdr);
- if (_deq_hdr._rhdr._magic != QLS_DEQ_MAGIC)
- {
- std::ostringstream oss;
- oss << std::hex << std::setfill('0');
- oss << "deq magic: rid=0x" << std::setw(16) << _deq_hdr._rhdr._rid;
- oss << ": expected=0x" << std::setw(8) << QLS_DEQ_MAGIC;
- oss << " read=0x" << std::setw(2) << (int)_deq_hdr._rhdr._magic;
- throw jexception(jerrno::JERR_JREC_BADRECHDR, oss.str(), "deq_rec", "chk_hdr");
- }
-}
-
-void
-deq_rec::chk_hdr(uint64_t rid) const
-{
- chk_hdr();
- jrec::chk_rid(_deq_hdr._rhdr, rid);
-}
-
-void
-deq_rec::chk_tail() const
-{
- jrec::chk_tail(_deq_tail, _deq_hdr._rhdr);
-}
-
-void
deq_rec::clean()
{
// clean up allocated memory here
}
-} // namespace journal
-} // namespace mrg
+}}}
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.h?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.h Thu Nov 14 20:39:32 2013
@@ -19,73 +19,51 @@
*
*/
-#ifndef QPID_LEGACYSTORE_JRNL_DEQ_REQ_H
-#define QPID_LEGACYSTORE_JRNL_DEQ_REQ_H
+#ifndef QPID_LINEARSTORE_JOURNAL_DEQ_REQ_H
+#define QPID_LINEARSTORE_JOURNAL_DEQ_REQ_H
-namespace qpid
-{
-namespace qls_jrnl
-{
-class deq_rec;
-}}
-
-#include <cstddef>
-#include "qpid/linearstore/jrnl/utils/deq_hdr.h"
#include "qpid/linearstore/jrnl/jrec.h"
+#include "qpid/linearstore/jrnl/utils/deq_hdr.h"
+#include "qpid/linearstore/jrnl/utils/rec_tail.h"
-namespace qpid
-{
-namespace qls_jrnl
+namespace qpid {
+namespace linearstore {
+namespace journal {
+
+/**
+* \class deq_rec
+* \brief Class to handle a single journal dequeue record.
+*/
+class deq_rec : public jrec
{
+private:
+ ::deq_hdr_t _deq_hdr; ///< Local instance of dequeue header struct
+ const void* _xidp; ///< xid pointer for encoding (writing to disk)
+ void* _buff; ///< Pointer to buffer to receive data read from disk
+ ::rec_tail_t _deq_tail; ///< Local instance of enqueue tail struct, only encoded if XID is present
+
+public:
+ deq_rec();
+ virtual ~deq_rec();
+
+ void reset(const uint64_t serial, const uint64_t rid, const uint64_t drid, const void* const xidp,
+ const std::size_t xidlen, const bool txn_coml_commit);
+ uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks);
+ bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs);
+
+ inline bool is_txn_coml_commit() const { return ::is_txn_coml_commit(&_deq_hdr); }
+ inline uint64_t rid() const { return _deq_hdr._rhdr._rid; }
+ inline uint64_t deq_rid() const { return _deq_hdr._deq_rid; }
+ std::size_t get_xid(void** const xidpp);
+ std::string& str(std::string& str) const;
+ inline std::size_t data_size() const { return 0; } // This record never carries data
+ std::size_t xid_size() const;
+ std::size_t rec_size() const;
+
+private:
+ virtual void clean();
+};
- /**
- * \class deq_rec
- * \brief Class to handle a single journal dequeue record.
- */
- class deq_rec : public jrec
- {
- private:
- deq_hdr_t _deq_hdr; ///< Dequeue header
- const void* _xidp; ///< xid pointer for encoding (writing to disk)
- void* _buff; ///< Pointer to buffer to receive data read from disk
- rec_tail_t _deq_tail; ///< Record tail, only encoded if XID is present
-
- public:
- // constructor used for read operations and xid will have memory allocated
- deq_rec();
- // constructor used for write operations, where xid already exists
- deq_rec(const uint64_t rid, const uint64_t drid, const void* const xidp,
- const std::size_t xidlen, const bool txn_coml_commit);
- virtual ~deq_rec();
-
- // Prepare instance for use in reading data from journal
- void reset();
- // Prepare instance for use in writing data to journal
- void reset(const uint64_t rid, const uint64_t drid, const void* const xidp,
- const std::size_t xidlen, const bool txn_coml_commit);
- uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks);
- uint32_t decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks,
- uint32_t max_size_dblks);
- // Decode used for recover
- bool rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs);
-
- inline bool is_txn_coml_commit() const { return ::is_txn_coml_commit(&_deq_hdr); }
- inline uint64_t rid() const { return _deq_hdr._rhdr._rid; }
- inline uint64_t deq_rid() const { return _deq_hdr._deq_rid; }
- std::size_t get_xid(void** const xidpp);
- std::string& str(std::string& str) const;
- inline std::size_t data_size() const { return 0; } // This record never carries data
- std::size_t xid_size() const;
- std::size_t rec_size() const;
-
- private:
- virtual void chk_hdr() const;
- virtual void chk_hdr(uint64_t rid) const;
- virtual void chk_tail() const;
- virtual void clean();
- }; // class deq_rec
-
-} // namespace journal
-} // namespace mrg
+}}}
-#endif // ifndef QPID_LEGACYSTORE_JRNL_DEQ_REQ_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_DEQ_REQ_H
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp Thu Nov 14 20:39:32 2013
@@ -27,10 +27,9 @@
#include <sstream>
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
// static return/error codes
int16_t enq_map::EMAP_DUP_RID = -3;
@@ -183,4 +182,4 @@ enq_map::pfid_list(std::vector<uint64_t>
}
}
-}}
+}}}
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h Thu Nov 14 20:39:32 2013
@@ -19,15 +19,14 @@
*
*/
-#ifndef QPID_LEGACYSTORE_JRNL_ENQ_MAP_H
-#define QPID_LEGACYSTORE_JRNL_ENQ_MAP_H
+#ifndef QPID_LINEARSTORE_JOURNAL_ENQ_MAP_H
+#define QPID_LINEARSTORE_JOURNAL_ENQ_MAP_H
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
class enq_map;
-}}
+}}}
#include "qpid/linearstore/jrnl/jexception.h"
#include "qpid/linearstore/jrnl/smutex.h"
@@ -35,10 +34,9 @@ class enq_map;
#include <pthread.h>
#include <vector>
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
/**
* \class enq_map
@@ -107,6 +105,6 @@ namespace qls_jrnl
void pfid_list(std::vector<uint64_t>& fv);
};
-}}
+}}}
-#endif // ifndef QPID_LEGACYSTORE_JRNL_ENQ_MAP_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_ENQ_MAP_H
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp Thu Nov 14 20:39:32 2013
@@ -22,45 +22,22 @@
#include "qpid/linearstore/jrnl/enq_rec.h"
#include <cassert>
-#include <cerrno>
-#include <cstdlib>
#include <cstring>
#include <iomanip>
-#include "qpid/linearstore/jrnl/jerrno.h"
#include "qpid/linearstore/jrnl/jexception.h"
-#include <sstream>
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
-// Constructor used for read operations, where buf contains preallocated space to receive data.
enq_rec::enq_rec():
jrec(), // superclass
- //_enq_hdr(QLS_ENQ_MAGIC, QLS_JRNL_VERSION, 0, 0, 0, false, false),
_xidp(0),
_data(0),
_buff(0)
- //_enq_tail(_enq_hdr)
-{
- ::enq_hdr_init(&_enq_hdr, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, 0, 0, 0, false);
- ::rec_tail_copy(&_enq_tail, &_enq_hdr._rhdr, 0);
-}
-
-// Constructor used for transactional write operations, where dbuf contains data to be written.
-enq_rec::enq_rec(const uint64_t rid, const void* const dbuf, const std::size_t dlen,
- const void* const xidp, const std::size_t xidlen, const bool transient):
- jrec(), // superclass
- //_enq_hdr(QLS_ENQ_MAGIC, QLS_JRNL_VERSION, rid, xidlen, dlen, owi, transient),
- _xidp(xidp),
- _data(dbuf),
- _buff(0)
- //_enq_tail(_enq_hdr)
{
- ::enq_hdr_init(&_enq_hdr, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, 0, rid, xidlen, dlen);
+ ::enq_hdr_init(&_enq_hdr, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, 0, 0, 0, 0, false);
::rec_tail_copy(&_enq_tail, &_enq_hdr._rhdr, 0);
- ::set_enq_transient(&_enq_hdr, transient);
}
enq_rec::~enq_rec()
@@ -68,28 +45,11 @@ enq_rec::~enq_rec()
clean();
}
-// Prepare instance for use in reading data from journal, where buf contains preallocated space
-// to receive data.
void
-enq_rec::reset()
-{
- _enq_hdr._rhdr._rid = 0;
- ::set_enq_transient(&_enq_hdr, false);
- _enq_hdr._xidsize = 0;
- _enq_hdr._dsize = 0;
- _xidp = 0;
- _data = 0;
- _buff = 0;
- _enq_tail._rid = 0;
-}
-
-// Prepare instance for use in writing transactional data to journal, where dbuf contains data to
-// be written.
-void
-enq_rec::reset(const uint64_t rid, const void* const dbuf, const std::size_t dlen,
- const void* const xidp, const std::size_t xidlen, const bool transient,
- const bool external)
+enq_rec::reset(const uint64_t serial, const uint64_t rid, const void* const dbuf, const std::size_t dlen,
+ const void* const xidp, const std::size_t xidlen, const bool transient, const bool external)
{
+ _enq_hdr._rhdr._serial = serial;
_enq_hdr._rhdr._rid = rid;
::set_enq_transient(&_enq_hdr, transient);
::set_enq_external(&_enq_hdr, external);
@@ -98,6 +58,7 @@ enq_rec::reset(const uint64_t rid, const
_xidp = xidp;
_data = dbuf;
_buff = 0;
+ _enq_tail._serial = serial;
_enq_tail._rid = rid;
}
@@ -246,217 +207,19 @@ enq_rec::encode(void* wptr, uint32_t rec
return size_dblks(wr_cnt);
}
-uint32_t
-enq_rec::decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks)
-{
- assert(rptr != 0);
- assert(max_size_dblks > 0);
-
- std::size_t rd_cnt = 0;
- if (rec_offs_dblks) // Continuation of record on new page
- {
- const uint32_t hdr_xid_data_size = sizeof(enq_hdr_t) + _enq_hdr._xidsize +
- (::is_enq_external(&_enq_hdr) ? 0 : _enq_hdr._dsize);
- const uint32_t hdr_xid_data_tail_size = hdr_xid_data_size + sizeof(rec_tail_t);
- const uint32_t hdr_data_dblks = size_dblks(hdr_xid_data_size);
- const uint32_t hdr_tail_dblks = size_dblks(hdr_xid_data_tail_size);
- const std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES;
- const std::size_t offs = rec_offs - sizeof(enq_hdr_t);
-
- if (hdr_tail_dblks - rec_offs_dblks <= max_size_dblks)
- {
- // Remainder of record fits within this page
- if (offs < _enq_hdr._xidsize)
- {
- // some XID still outstanding, copy remainder of XID, data and tail
- const std::size_t rem = _enq_hdr._xidsize + _enq_hdr._dsize - offs;
- std::memcpy((char*)_buff + offs, rptr, rem);
- rd_cnt += rem;
- std::memcpy((void*)&_enq_tail, ((char*)rptr + rd_cnt), sizeof(_enq_tail));
- chk_tail();
- rd_cnt += sizeof(_enq_tail);
- }
- else if (offs < _enq_hdr._xidsize + _enq_hdr._dsize && !::is_enq_external(&_enq_hdr))
- {
- // some data still outstanding, copy remainder of data and tail
- const std::size_t data_offs = offs - _enq_hdr._xidsize;
- const std::size_t data_rem = _enq_hdr._dsize - data_offs;
- std::memcpy((char*)_buff + offs, rptr, data_rem);
- rd_cnt += data_rem;
- std::memcpy((void*)&_enq_tail, ((char*)rptr + rd_cnt), sizeof(_enq_tail));
- chk_tail();
- rd_cnt += sizeof(_enq_tail);
- }
- else
- {
- // Tail or part of tail only outstanding, complete tail
- const std::size_t tail_offs = rec_offs - sizeof(enq_hdr_t) - _enq_hdr._xidsize -
- _enq_hdr._dsize;
- const std::size_t tail_rem = sizeof(rec_tail_t) - tail_offs;
- std::memcpy((char*)&_enq_tail + tail_offs, rptr, tail_rem);
- chk_tail();
- rd_cnt = tail_rem;
- }
- }
- else if (hdr_data_dblks - rec_offs_dblks <= max_size_dblks)
- {
- // Remainder of xid & data fits within this page; tail split
-
- /*
- * TODO: This section needs revision. Since it is known that the end of the page falls within the
- * tail record, it is only necessary to write from the current offset to the end of the page under
- * all circumstances. The multiple if/else combinations may be eliminated, as well as one memcpy()
- * operation.
- *
- * Also note that Coverity has detected a possible memory overwrite in this block. It occurs if
- * both the following two if() stmsts (numbered) are false. With rd_cnt = 0, this would result in
- * the value of tail_rem > sizeof(tail_rec). Practically, this could only happen if the start and
- * end of a page both fall within the same tail record, in which case the tail would have to be
- * (much!) larger. However, the logic here does not account for this possibility.
- *
- * If the optimization above is undertaken, this code would probably be removed.
- */
- if (offs < _enq_hdr._xidsize) // 1
- {
- // some XID still outstanding, copy remainder of XID and data
- const std::size_t rem = _enq_hdr._xidsize + _enq_hdr._dsize - offs;
- std::memcpy((char*)_buff + offs, rptr, rem);
- rd_cnt += rem;
- }
- else if (offs < _enq_hdr._xidsize + _enq_hdr._dsize && !::is_enq_external(&_enq_hdr)) // 2
- {
- // some data still outstanding, copy remainder of data
- const std::size_t data_offs = offs - _enq_hdr._xidsize;
- const std::size_t data_rem = _enq_hdr._dsize - data_offs;
- std::memcpy((char*)_buff + offs, rptr, data_rem);
- rd_cnt += data_rem;
- }
- const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
- if (tail_rem)
- {
- std::memcpy((void*)&_enq_tail, ((char*)rptr + rd_cnt), tail_rem);
- rd_cnt += tail_rem;
- }
- }
- else
- {
- // Since xid and data are contiguous, both fit within current page - copy whole page
- const std::size_t data_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES);
- std::memcpy((char*)_buff + offs, rptr, data_cp_size);
- rd_cnt += data_cp_size;
- }
- }
- else // Start of record
- {
- // Get and check header
- //_enq_hdr.hdr_copy(h);
- ::rec_hdr_copy(&_enq_hdr._rhdr, &h);
- rd_cnt = sizeof(rec_hdr_t);
- _enq_hdr._xidsize = *(std::size_t*)((char*)rptr + rd_cnt);
- rd_cnt += sizeof(std::size_t);
-#if defined(JRNL_32_BIT)
- rd_cnt += sizeof(uint32_t); // Filler 0
-#endif
- _enq_hdr._dsize = *(std::size_t*)((char*)rptr + rd_cnt);
- rd_cnt = sizeof(enq_hdr_t);
- chk_hdr();
- if (_enq_hdr._xidsize + (::is_enq_external(&_enq_hdr) ? 0 : _enq_hdr._dsize))
- {
- _buff = std::malloc(_enq_hdr._xidsize + (::is_enq_external(&_enq_hdr) ? 0 : _enq_hdr._dsize));
- MALLOC_CHK(_buff, "_buff", "enq_rec", "decode");
-
- const uint32_t hdr_xid_size = sizeof(enq_hdr_t) + _enq_hdr._xidsize;
- const uint32_t hdr_xid_data_size = hdr_xid_size + (::is_enq_external(&_enq_hdr) ? 0 : _enq_hdr._dsize);
- const uint32_t hdr_xid_data_tail_size = hdr_xid_data_size + sizeof(rec_tail_t);
- const uint32_t hdr_xid_dblks = size_dblks(hdr_xid_size);
- const uint32_t hdr_data_dblks = size_dblks(hdr_xid_data_size);
- const uint32_t hdr_tail_dblks = size_dblks(hdr_xid_data_tail_size);
- // Check if record (header + data + tail) fits within this page, we can check the
- // tail before the expense of copying data to memory
- if (hdr_tail_dblks <= max_size_dblks)
- {
- // Header, xid, data and tail fits within this page
- if (_enq_hdr._xidsize)
- {
- std::memcpy(_buff, (char*)rptr + rd_cnt, _enq_hdr._xidsize);
- rd_cnt += _enq_hdr._xidsize;
- }
- if (_enq_hdr._dsize && !::is_enq_external(&_enq_hdr))
- {
- std::memcpy((char*)_buff + _enq_hdr._xidsize, (char*)rptr + rd_cnt,
- _enq_hdr._dsize);
- rd_cnt += _enq_hdr._dsize;
- }
- std::memcpy((void*)&_enq_tail, (char*)rptr + rd_cnt, sizeof(_enq_tail));
- chk_tail();
- rd_cnt += sizeof(_enq_tail);
- }
- else if (hdr_data_dblks <= max_size_dblks)
- {
- // Header, xid and data fit within this page, tail split or separated
- if (_enq_hdr._xidsize)
- {
- std::memcpy(_buff, (char*)rptr + rd_cnt, _enq_hdr._xidsize);
- rd_cnt += _enq_hdr._xidsize;
- }
- if (_enq_hdr._dsize && !::is_enq_external(&_enq_hdr))
- {
- std::memcpy((char*)_buff + _enq_hdr._xidsize, (char*)rptr + rd_cnt,
- _enq_hdr._dsize);
- rd_cnt += _enq_hdr._dsize;
- }
- const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
- if (tail_rem)
- {
- std::memcpy((void*)&_enq_tail, (char*)rptr + rd_cnt, tail_rem);
- rd_cnt += tail_rem;
- }
- }
- else if (hdr_xid_dblks <= max_size_dblks)
- {
- // Header and xid fits within this page, data split or separated
- if (_enq_hdr._xidsize)
- {
- std::memcpy(_buff, (char*)rptr + rd_cnt, _enq_hdr._xidsize);
- rd_cnt += _enq_hdr._xidsize;
- }
- if (_enq_hdr._dsize && !::is_enq_external(&_enq_hdr))
- {
- const std::size_t data_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
- std::memcpy((char*)_buff + _enq_hdr._xidsize, (char*)rptr + rd_cnt, data_cp_size);
- rd_cnt += data_cp_size;
- }
- }
- else
- {
- // Header fits within this page, xid split or separated
- const std::size_t data_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
- std::memcpy(_buff, (char*)rptr + rd_cnt, data_cp_size);
- rd_cnt += data_cp_size;
- }
- }
- }
- return size_dblks(rd_cnt);
-}
-
bool
-enq_rec::rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs)
+enq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs)
{
+ uint32_t checksum = 0UL; // TODO: Add checksum math
if (rec_offs == 0)
{
// Read header, allocate (if req'd) for xid
//_enq_hdr.hdr_copy(h);
::rec_hdr_copy(&_enq_hdr._rhdr, &h);
- ifsp->read((char*)&_enq_hdr._xidsize, sizeof(std::size_t));
-#if defined(JRNL_32_BIT)
- ifsp->ignore(sizeof(uint32_t)); // _filler0
-#endif
- ifsp->read((char*)&_enq_hdr._dsize, sizeof(std::size_t));
-#if defined(JRNL_32_BIT)
- ifsp->ignore(sizeof(uint32_t)); // _filler1
-#endif
+ ifsp->read((char*)&_enq_hdr._xidsize, sizeof(_enq_hdr._xidsize));
+ ifsp->read((char*)&_enq_hdr._dsize, sizeof(_enq_hdr._dsize));
rec_offs = sizeof(_enq_hdr);
- if (_enq_hdr._xidsize)
+ if (_enq_hdr._xidsize > 0)
{
_buff = std::malloc(_enq_hdr._xidsize);
MALLOC_CHK(_buff, "_buff", "enq_rec", "rcv_decode");
@@ -517,8 +280,19 @@ enq_rec::rcv_decode(rec_hdr_t h, std::if
}
}
ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size());
- chk_tail(); // Throws if tail invalid or record incomplete
assert(!ifsp->fail() && !ifsp->bad());
+ int res = ::rec_tail_check(&_enq_tail, &_enq_hdr._rhdr, checksum);
+ if (res != 0) {
+ std::stringstream oss;
+ switch (res) {
+ case 1: oss << std::hex << "Magic: expected 0x" << ~_enq_hdr._rhdr._magic << "; found 0x" << _enq_tail._xmagic; break;
+ case 2: oss << std::hex << "Serial: expected 0x" << _enq_hdr._rhdr._serial << "; found 0x" << _enq_tail._serial; break;
+ case 3: oss << std::hex << "Record Id: expected 0x" << _enq_hdr._rhdr._rid << "; found 0x" << _enq_tail._rid; break;
+ case 4: oss << std::hex << "Checksum: expected 0x" << checksum << "; found 0x" << _enq_tail._checksum; break;
+ default: oss << "Unknown error " << res;
+ }
+ throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "enq_rec", "decode"); // TODO: Don't throw exception, log info
+ }
return true;
}
@@ -578,44 +352,9 @@ enq_rec::rec_size(const std::size_t xids
}
void
-enq_rec::set_rid(const uint64_t rid)
-{
- _enq_hdr._rhdr._rid = rid;
- _enq_tail._rid = rid;
-}
-
-void
-enq_rec::chk_hdr() const
-{
- jrec::chk_hdr(_enq_hdr._rhdr);
- if (_enq_hdr._rhdr._magic != QLS_ENQ_MAGIC)
- {
- std::ostringstream oss;
- oss << std::hex << std::setfill('0');
- oss << "enq magic: rid=0x" << std::setw(16) << _enq_hdr._rhdr._rid;
- oss << ": expected=0x" << std::setw(8) << QLS_ENQ_MAGIC;
- oss << " read=0x" << std::setw(2) << (int)_enq_hdr._rhdr._magic;
- throw jexception(jerrno::JERR_JREC_BADRECHDR, oss.str(), "enq_rec", "chk_hdr");
- }
-}
-
-void
-enq_rec::chk_hdr(uint64_t rid) const
-{
- chk_hdr();
- jrec::chk_rid(_enq_hdr._rhdr, rid);
-}
-
-void
-enq_rec::chk_tail() const
-{
- jrec::chk_tail(_enq_tail, _enq_hdr._rhdr);
-}
-
-void
enq_rec::clean()
{
// clean up allocated memory here
}
-}}
+}}}
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.h?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.h Thu Nov 14 20:39:32 2013
@@ -19,86 +19,54 @@
*
*/
-#ifndef QPID_LEGACYSTORE_JRNL_ENQ_REC_H
-#define QPID_LEGACYSTORE_JRNL_ENQ_REC_H
+#ifndef QPID_LINEARSTORE_JOURNAL_ENQ_REC_H
+#define QPID_LINEARSTORE_JOURNAL_ENQ_REC_H
-namespace qpid
-{
-namespace qls_jrnl
-{
-class enq_rec;
-}}
-
-#include <cstddef>
-#include "qpid/linearstore/jrnl/utils/enq_hdr.h"
#include "qpid/linearstore/jrnl/jrec.h"
+#include "qpid/linearstore/jrnl/utils/enq_hdr.h"
+#include "qpid/linearstore/jrnl/utils/rec_tail.h"
-namespace qpid
-{
-namespace qls_jrnl
+namespace qpid {
+namespace linearstore {
+namespace journal {
+
+/**
+* \class enq_rec
+* \brief Class to handle a single journal enqueue record.
+*/
+class enq_rec : public jrec
{
+private:
+ ::enq_hdr_t _enq_hdr; ///< Local instance of enqueue header struct
+ const void* _xidp; ///< xid pointer for encoding (for writing to disk)
+ const void* _data; ///< Pointer to data to be written to disk
+ void* _buff; ///< Pointer to buffer to receive data read from disk
+ ::rec_tail_t _enq_tail; ///< Local instance of enqueue tail struct
+
+public:
+ enq_rec();
+ virtual ~enq_rec();
+
+ void reset(const uint64_t serial, const uint64_t rid, const void* const dbuf, const std::size_t dlen,
+ const void* const xidp, const std::size_t xidlen, const bool transient, const bool external);
+ uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks);
+ bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs);
+
+ std::size_t get_xid(void** const xidpp);
+ std::size_t get_data(void** const datapp);
+ inline bool is_transient() const { return ::is_enq_transient(&_enq_hdr); }
+ inline bool is_external() const { return ::is_enq_external(&_enq_hdr); }
+ std::string& str(std::string& str) const;
+ inline std::size_t data_size() const { return _enq_hdr._dsize; }
+ inline std::size_t xid_size() const { return _enq_hdr._xidsize; }
+ std::size_t rec_size() const;
+ static std::size_t rec_size(const std::size_t xidsize, const std::size_t dsize, const bool external);
+ inline uint64_t rid() const { return _enq_hdr._rhdr._rid; }
+
+private:
+ virtual void clean();
+};
- /**
- * \class enq_rec
- * \brief Class to handle a single journal enqueue record.
- */
- class enq_rec : public jrec
- {
- private:
- enq_hdr_t _enq_hdr;
- const void* _xidp; ///< xid pointer for encoding (for writing to disk)
- const void* _data; ///< Pointer to data to be written to disk
- void* _buff; ///< Pointer to buffer to receive data read from disk
- rec_tail_t _enq_tail;
-
- public:
- /**
- * \brief Constructor used for read operations.
- */
- enq_rec();
-
- /**
- * \brief Constructor used for write operations, where mbuf contains data to be written.
- */
- enq_rec(const uint64_t rid, const void* const dbuf, const std::size_t dlen,
- const void* const xidp, const std::size_t xidlen, const bool transient);
-
- /**
- * \brief Destructor
- */
- virtual ~enq_rec();
-
- // Prepare instance for use in reading data from journal, xid and data will be allocated
- void reset();
- // Prepare instance for use in writing data to journal
- void reset(const uint64_t rid, const void* const dbuf, const std::size_t dlen,
- const void* const xidp, const std::size_t xidlen, const bool transient,
- const bool external);
-
- uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks);
- uint32_t decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks);
- // Decode used for recover
- bool rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs);
-
- std::size_t get_xid(void** const xidpp);
- std::size_t get_data(void** const datapp);
- inline bool is_transient() const { return ::is_enq_transient(&_enq_hdr); }
- inline bool is_external() const { return ::is_enq_external(&_enq_hdr); }
- std::string& str(std::string& str) const;
- inline std::size_t data_size() const { return _enq_hdr._dsize; }
- inline std::size_t xid_size() const { return _enq_hdr._xidsize; }
- std::size_t rec_size() const;
- static std::size_t rec_size(const std::size_t xidsize, const std::size_t dsize, const bool external);
- inline uint64_t rid() const { return _enq_hdr._rhdr._rid; }
- void set_rid(const uint64_t rid);
-
- private:
- void chk_hdr() const;
- void chk_hdr(uint64_t rid) const;
- void chk_tail() const;
- virtual void clean();
- }; // class enq_rec
-
-}}
+}}}
-#endif // ifndef QPID_LEGACYSTORE_JRNL_ENQ_REC_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_ENQ_REC_H
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enums.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enums.h?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enums.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enums.h Thu Nov 14 20:39:32 2013
@@ -19,13 +19,12 @@
*
*/
-#ifndef QPID_LINEARSTORE_JRNL_ENUMS_H
-#define QPID_LINEARSTORE_JRNL_ENUMS_H
+#ifndef QPID_LINEARSTORE_JOURNAL_ENUMS_H
+#define QPID_LINEARSTORE_JOURNAL_ENUMS_H
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
// TODO: Change this to flags, as multiple of these conditions may exist simultaneously
/**
@@ -37,12 +36,7 @@ namespace qls_jrnl
RHM_IORES_PAGE_AIOWAIT, ///< IO operation suspended - next page is waiting for AIO.
RHM_IORES_FILE_AIOWAIT, ///< IO operation suspended - next file is waiting for AIO.
RHM_IORES_EMPTY, ///< During read operations, nothing further is available to read.
-// RHM_IORES_RCINVALID, ///< Read page cache is invalid (ie obsolete or uninitialized)
-// RHM_IORES_ENQCAPTHRESH, ///< Enqueue capacity threshold (limit) reached.
-// RHM_IORES_FULL, ///< During write operations, the journal files are full.
-// RHM_IORES_BUSY, ///< Another blocking operation is in progress.
RHM_IORES_TXPENDING ///< Operation blocked by pending transaction.
-// RHM_IORES_NOTIMPL ///< Function is not implemented.
};
typedef _iores iores;
@@ -54,46 +48,11 @@ namespace qls_jrnl
case RHM_IORES_PAGE_AIOWAIT: return "RHM_IORES_PAGE_AIOWAIT";
case RHM_IORES_FILE_AIOWAIT: return "RHM_IORES_FILE_AIOWAIT";
case RHM_IORES_EMPTY: return "RHM_IORES_EMPTY";
-// case RHM_IORES_RCINVALID: return "RHM_IORES_RCINVALID";
-// case RHM_IORES_ENQCAPTHRESH: return "RHM_IORES_ENQCAPTHRESH";
-// case RHM_IORES_FULL: return "RHM_IORES_FULL";
-// case RHM_IORES_BUSY: return "RHM_IORES_BUSY";
case RHM_IORES_TXPENDING: return "RHM_IORES_TXPENDING";
-// case RHM_IORES_NOTIMPL: return "RHM_IORES_NOTIMPL";
}
return "<iores unknown>";
}
-/*
- enum _log_level
- {
- LOG_TRACE = 0,
- LOG_DEBUG,
- LOG_INFO,
- LOG_NOTICE,
- LOG_WARN,
- LOG_ERROR,
- LOG_CRITICAL
- };
- typedef _log_level log_level_t;
-
- static inline const char* log_level_str(log_level_t ll)
- {
- switch (ll)
- {
- case LOG_TRACE: return "TRACE";
- case LOG_DEBUG: return "DEBUG";
- case LOG_INFO: return "INFO";
- case LOG_NOTICE: return "NOTICE";
- case LOG_WARN: return "WARN";
- case LOG_ERROR: return "ERROR";
- case LOG_CRITICAL: return "CRITICAL";
- }
- return "<log level unknown>";
- }
-*/
-
-
-}}
+}}}
-#endif // ifndef QPID_LINEARSTORE_JRNL_ENUMS_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_ENUMS_H
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h Thu Nov 14 20:39:32 2013
@@ -19,8 +19,8 @@
*
*/
-#ifndef QPID_LEGACYSTORE_JRNL_JCFG_H
-#define QPID_LEGACYSTORE_JRNL_JCFG_H
+#ifndef QPID_QLS_JRNL_JCFG_H
+#define QPID_QLS_JRNL_JCFG_H
#define QLS_SBLK_SIZE_BYTES 4096 /**< Disk softblock size in bytes, should match size used on disk media */
#define QLS_AIO_ALIGN_BOUNDARY_BYTES QLS_SBLK_SIZE_BYTES /** Memory alignment boundary used for DMA */
@@ -55,4 +55,4 @@
#define QLS_CLEAN /**< If defined, writes QLS_CLEAN_CHAR to all filled areas on disk */
#define QLS_CLEAN_CHAR 0xff /**< Char used to clear empty space on disk */
-#endif /* ifndef QPID_LEGACYSTORE_JRNL_JCFG_H */
+#endif /* ifndef QPID_QLS_JRNL_JCFG_H */
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp Thu Nov 14 20:39:32 2013
@@ -38,10 +38,9 @@
#include <sstream>
#include <unistd.h>
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
#define AIO_CMPL_TIMEOUT_SEC 5
#define AIO_CMPL_TIMEOUT_NSEC 0
@@ -132,9 +131,9 @@ jcntl::recover(EmptyFilePoolManager* efp
_recoveryManager.analyzeJournals(prep_txn_list_ptr, efpmp, &_emptyFilePoolPtr);
highest_rid = _recoveryManager.getHighestRecordId();
- _jrnl_log.log(/*LOG_DEBUG*/JournalLog::LOG_INFO, _jid, _recoveryManager.toString(_jid, true));
+ _jrnl_log.log(/*LOG_DEBUG*/JournalLog::LOG_INFO, _jid, _recoveryManager.toLog(_jid, 5));
_linearFileController.initialize(_jdir.dirname(), _emptyFilePoolPtr, _recoveryManager.getHighestFileNumber());
- _recoveryManager.setLinearFileControllerJournals(&qpid::qls_jrnl::LinearFileController::addJournalFile, &_linearFileController);
+ _recoveryManager.setLinearFileControllerJournals(&qpid::linearstore::journal::LinearFileController::addJournalFile, &_linearFileController);
_wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, QLS_WMGR_MAXDTOKPP, QLS_WMGR_MAXWAITUS,
(_recoveryManager.isLastFileFull() ? 0 : _recoveryManager.getEndOffset()));
@@ -430,4 +429,4 @@ std::cout << "&&&&&& jcntl::handle_aio_w
return false;
}
-}}
+}}}
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h Thu Nov 14 20:39:32 2013
@@ -19,15 +19,14 @@
*
*/
-#ifndef QPID_LINEARSTORE_JRNL_JCNTL_H
-#define QPID_LINEARSTORE_JRNL_JCNTL_H
+#ifndef QPID_LINEARSTORE_JOURNAL_JCNTL_H
+#define QPID_LINEARSTORE_JOURNAL_JCNTL_H
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
class jcntl;
-}}
+}}}
#include <cstddef>
#include <deque>
@@ -38,10 +37,9 @@ namespace qls_jrnl
#include "qpid/linearstore/jrnl/smutex.h"
#include "qpid/linearstore/jrnl/wmgr.h"
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
class EmptyFilePool;
class EmptyFilePoolManager;
@@ -570,6 +568,6 @@ namespace qls_jrnl
bool handle_aio_wait(const iores res, iores& resout, const data_tok* dtp);
};
-}}
+}}}
-#endif // ifndef QPID_LINEARSTORE_JRNL_JCNTL_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_JCNTL_H
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp Thu Nov 14 20:39:32 2013
@@ -32,10 +32,9 @@
#include <sys/stat.h>
#include <unistd.h>
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
jdir::jdir(const std::string& dirname/*, const std::string& _base_filename*/):
_dirname(dirname)/*,
@@ -476,4 +475,4 @@ operator<<(std::ostream& os, const jdir*
return os;
}
-}}
+}}}
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jdir.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jdir.h?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jdir.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jdir.h Thu Nov 14 20:39:32 2013
@@ -19,25 +19,23 @@
*
*/
-#ifndef QPID_LINEARSTORE_JRNL_JDIR_H
-#define QPID_LINEARSTORE_JRNL_JDIR_H
+#ifndef QPID_LINEARSTORE_JOURNAL_JDIR_H
+#define QPID_LINEARSTORE_JOURNAL_JDIR_H
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
class jdir;
-}}
+}}}
//#include "qpid/linearstore/jrnl/jinf.h"
#include <dirent.h>
#include <string>
#include <vector>
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
/**
* \class jdir
@@ -364,6 +362,6 @@ namespace qls_jrnl
static void close_dir(DIR* dir, const std::string& dir_name, const std::string& fn_name);
};
-}}
+}}}
-#endif // ifndef QPID_LINEARSTORE_JRNL_JDIR_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_JDIR_H
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp Thu Nov 14 20:39:32 2013
@@ -21,10 +21,9 @@
#include "qpid/linearstore/jrnl/jerrno.h"
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
std::map<uint32_t, const char*> jerrno::_err_map;
std::map<uint32_t, const char*>::iterator jerrno::_err_map_itr;
@@ -214,4 +213,4 @@ jerrno::err_msg(const uint32_t err_no) t
return _err_map_itr->second;
}
-}}
+}}}
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h Thu Nov 14 20:39:32 2013
@@ -19,24 +19,22 @@
*
*/
-#ifndef QPID_LEGACYSTORE_JRNL_JERRNO_H
-#define QPID_LEGACYSTORE_JRNL_JERRNO_H
+#ifndef QPID_LINEARSTORE_JOURNAL_JERRNO_H
+#define QPID_LINEARSTORE_JOURNAL_JERRNO_H
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
class jerrno;
-}}
+}}}
#include <map>
#include <stdint.h>
#include <string>
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
/**
* \class jerrno
@@ -144,6 +142,6 @@ namespace qls_jrnl
static bool __init();
};
-}}
+}}}
-#endif // ifndef QPID_LEGACYSTORE_JRNL_JERRNO_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_JERRNO_H
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jexception.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jexception.cpp?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jexception.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jexception.cpp Thu Nov 14 20:39:32 2013
@@ -27,10 +27,9 @@
#define CATLEN(p) MAX_MSG_SIZE - std::strlen(p) - 1
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
jexception::jexception() throw ():
std::exception(),
@@ -168,4 +167,4 @@ operator<<(std::ostream& os, const jexce
return os;
}
-}}
+}}}
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jexception.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jexception.h?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jexception.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jexception.h Thu Nov 14 20:39:32 2013
@@ -19,15 +19,14 @@
*
*/
-#ifndef QPID_LEGACYSTORE_JRNL_JEXCEPTION_H
-#define QPID_LEGACYSTORE_JRNL_JEXCEPTION_H
+#ifndef QPID_LINEARSTORE_JOURNAL_JEXCEPTION_H
+#define QPID_LINEARSTORE_JOURNAL_JEXCEPTION_H
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
class jexception;
-}}
+}}}
#include <cerrno>
#include <cstdio>
@@ -70,10 +69,10 @@ class jexception;
::abort(); \
}
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
+
/**
* \class jexception
* \brief Generic journal exception class
@@ -121,6 +120,6 @@ namespace qls_jrnl
friend std::ostream& operator<<(std::ostream& os, const jexception* jePtr);
}; // class jexception
-}}
+}}}
-#endif // ifndef QPID_LEGACYSTORE_JRNL_JEXCEPTION_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_JEXCEPTION_H
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jrec.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jrec.cpp?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jrec.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jrec.cpp Thu Nov 14 20:39:32 2013
@@ -24,84 +24,15 @@
#include <iomanip>
#include "qpid/linearstore/jrnl/jerrno.h"
#include "qpid/linearstore/jrnl/jexception.h"
+#include "qpid/linearstore/jrnl/utils/rec_hdr.h"
+#include "qpid/linearstore/jrnl/utils/rec_tail.h"
#include <sstream>
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
jrec::jrec() {}
jrec::~jrec() {}
-void
-jrec::chk_hdr(const rec_hdr_t& hdr)
-{
- if (hdr._magic == 0)
- {
- std::ostringstream oss;
- oss << std::hex << std::setfill('0');
- oss << "enq magic NULL: rid=0x" << hdr._rid;
- throw jexception(jerrno::JERR_JREC_BADRECHDR, oss.str(), "jrec", "chk_hdr");
- }
- if (hdr._version != QLS_JRNL_VERSION)
- {
- std::ostringstream oss;
- oss << std::hex << std::setfill('0');
- oss << "version: rid=0x" << hdr._rid;
- oss << ": expected=0x" << std::setw(2) << (int)QLS_JRNL_VERSION;
- oss << " read=0x" << std::setw(2) << (int)hdr._version;
- throw jexception(jerrno::JERR_JREC_BADRECHDR, oss.str(), "jrec", "chk_hdr");
- }
-//#if defined (JRNL_LITTLE_ENDIAN)
-// uint8_t endian_flag = RHM_LENDIAN_FLAG;
-//#else
-// uint8_t endian_flag = RHM_BENDIAN_FLAG;
-//#endif
-// if (hdr._eflag != endian_flag)
-// {
-// std::ostringstream oss;
-// oss << std::hex << std::setfill('0');
-// oss << "endian_flag: rid=" << hdr._rid;
-// oss << ": expected=0x" << std::setw(2) << (int)endian_flag;
-// oss << " read=0x" << std::setw(2) << (int)hdr._eflag;
-// throw jexception(jerrno::JERR_JREC_BADRECHDR, oss.str(), "jrec", "chk_hdr");
-// }
-}
-
-void
-jrec::chk_rid(const rec_hdr_t& hdr, const uint64_t rid)
-{
- if (hdr._rid != rid)
- {
- std::ostringstream oss;
- oss << std::hex << std::setfill('0');
- oss << "rid mismatch: expected=0x" << rid;
- oss << " read=0x" << hdr._rid;
- throw jexception(jerrno::JERR_JREC_BADRECHDR, oss.str(), "jrec", "chk_hdr");
- }
-}
-
-void
-jrec::chk_tail(const rec_tail_t& tail, const rec_hdr_t& hdr)
-{
- if (tail._xmagic != ~hdr._magic)
- {
- std::ostringstream oss;
- oss << std::hex << std::setfill('0');
- oss << "magic: rid=0x" << hdr._rid;
- oss << ": expected=0x" << ~hdr._magic;
- oss << " read=0x" << tail._xmagic;
- throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "jrec", "chk_tail");
- }
- if (tail._rid != hdr._rid)
- {
- std::ostringstream oss;
- oss << std::hex << std::setfill('0');
- oss << "rid: rid=0x" << hdr._rid;
- oss << ": read=0x" << tail._rid;
- throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "jrec", "chk_tail");
- }
-}
-
-}}
+}}}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org