You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/10/07 20:39:25 UTC
svn commit: r1530024 [3/4] - in /qpid/branches/linearstore/qpid/cpp/src: ./
qpid/linearstore/ qpid/linearstore/jrnl/ qpid/linearstore/jrnl/utils/
tests/linearstore/
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp Mon Oct 7 18:39:24 2013
@@ -30,10 +30,12 @@
#include <iomanip>
#include <iostream>
#include <qpid/linearstore/jrnl/EmptyFilePool.h>
+#include <qpid/linearstore/jrnl/EmptyFilePoolManager.h>
//#include "qpid/linearstore/jrnl/file_hdr.h"
#include "qpid/linearstore/jrnl/jerrno.h"
//#include "qpid/linearstore/jrnl/jinf.h"
-#include "qpid/linearstore/jrnl/JournalFileController.h"
+//#include "qpid/linearstore/jrnl/JournalFileController.h"
+#include "qpid/linearstore/jrnl/utils/enq_hdr.h"
#include <limits>
#include <sstream>
#include <unistd.h>
@@ -72,15 +74,16 @@ jcntl::jcntl(const std::string& jid, con
_stop_flag(false),
_readonly_flag(false),
// _autostop(true),
- _jfcp(0),
+ _linearFileController(*this),
+ _emptyFilePoolPtr(0),
// _jfsize_sblks(0),
// _lpmgr(),
_emap(),
_tmap(),
// _rrfc(&_lpmgr),
// _wrfc(&_lpmgr),
- _rmgr(this, _emap, _tmap/*, _rrfc*/),
- _wmgr(this, _emap, _tmap/*, _wrfc*/),
+// _rmgr(this, _emap, _tmap/*, _rrfc*/),
+ _wmgr(this, _emap, _tmap, _linearFileController/*, _wrfc*/),
_rcvdat()
{}
@@ -90,11 +93,7 @@ jcntl::~jcntl()
try { stop(true); }
catch (const jexception& e) { std::cerr << e << std::endl; }
// _lpmgr.finalize();
- if (_jfcp) {
- _jfcp->finalize();
- delete _jfcp;
- _jfcp = 0;
- }
+ _linearFileController.finalize();
}
void
@@ -109,11 +108,7 @@ jcntl::initialize(/*const uint16_t num_j
_emap.clear();
_tmap.clear();
- if (_jfcp) {
- _jfcp->finalize();
- delete _jfcp;
- _jfcp = 0;
- }
+ _linearFileController.finalize();
// _lpmgr.finalize();
@@ -129,14 +124,15 @@ jcntl::initialize(/*const uint16_t num_j
// Clear any existing journal files
_jdir.clear_dir();
-// _lpmgr.initialize(num_jfiles, ae, ae_max_jfiles, this, &new_fcntl);
+// _lpmgr.initialize(num_jfiles, ae, ae_max_jfiles, this, &new_fcntl); // Creates new journal files
- _jfcp = new JournalFileController(_jdir.dirname(), efpp);
- _jfcp->pullEmptyFileFromEfp(1, 4096, _jid);
+ _linearFileController.initialize(_jdir.dirname(), efpp);
+ _linearFileController.pullEmptyFileFromEfp();
+ std::cout << _linearFileController.status(2);
// _wrfc.initialize(_jfsize_sblks);
// _rrfc.initialize();
// _rrfc.set_findex(0);
- _rmgr.initialize(cbp);
+// _rmgr.initialize(cbp);
_wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, JRNL_WMGR_MAXDTOKPP, JRNL_WMGR_MAXWAITUS);
// Write info file (<basename>.jinf) to disk
@@ -146,11 +142,12 @@ jcntl::initialize(/*const uint16_t num_j
}
void
-jcntl::recover(/*const uint16_t num_jfiles, const bool ae, const uint16_t ae_max_jfiles,
- const uint32_t jfsize_sblks,*/ const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks,
-// const rd_aio_cb rd_cb, const wr_aio_cb wr_cb, const std::vector<std::string>* prep_txn_list_ptr,
- aio_callback* const cbp, const std::vector<std::string>* prep_txn_list_ptr,
- uint64_t& highest_rid)
+jcntl::recover(EmptyFilePoolManager* efpm,
+ const uint16_t wcache_num_pages,
+ const uint32_t wcache_pgsize_sblks,
+ aio_callback* const cbp,
+ const std::vector<std::string>* prep_txn_list_ptr,
+ uint64_t& highest_rid)
{
_init_flag = false;
_stop_flag = false;
@@ -159,6 +156,8 @@ jcntl::recover(/*const uint16_t num_jfil
_emap.clear();
_tmap.clear();
+ _linearFileController.finalize();
+
// _lpmgr.finalize();
// assert(num_jfiles >= JRNL_MIN_NUM_FILES);
@@ -171,18 +170,19 @@ jcntl::recover(/*const uint16_t num_jfil
_jdir.verify_dir();
// _rcvdat.reset(num_jfiles/*, ae, ae_max_jfiles*/);
- rcvr_janalyze(_rcvdat, prep_txn_list_ptr);
+ rcvr_janalyze(prep_txn_list_ptr, efpm);
highest_rid = _rcvdat._h_rid;
- if (_rcvdat._jfull)
- throw jexception(jerrno::JERR_JCNTL_RECOVERJFULL, "jcntl", "recover");
- this->log(LOG_DEBUG, _jid, _rcvdat.to_log(_jid));
+// if (_rcvdat._jfull)
+// throw jexception(jerrno::JERR_JCNTL_RECOVERJFULL, "jcntl", "recover");
+ this->log(/*LOG_DEBUG*/LOG_INFO, _jid, _rcvdat.to_log(_jid));
// _lpmgr.recover(_rcvdat, this, &new_fcntl);
+ _linearFileController.initialize(_jdir.dirname(), _emptyFilePoolPtr);
// _wrfc.initialize(_jfsize_sblks, &_rcvdat);
// _rrfc.initialize();
// _rrfc.set_findex(_rcvdat.ffid());
- _rmgr.initialize(cbp);
+// _rmgr.initialize(cbp);
_wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, JRNL_WMGR_MAXDTOKPP, JRNL_WMGR_MAXWAITUS,
(_rcvdat._lffull ? 0 : _rcvdat._eo));
@@ -200,7 +200,7 @@ jcntl::recover_complete()
// _wrfc.initialize(_jfsize_sblks, &_rcvdat);
// _rrfc.initialize();
// _rrfc.set_findex(_rcvdat.ffid());
- _rmgr.recover_complete();
+// _rmgr.recover_complete();
_readonly_flag = false;
}
@@ -208,7 +208,7 @@ void
jcntl::delete_jrnl_files()
{
stop(true); // wait for AIO to complete
- _jfcp->purgeFilesToEfp();
+ _linearFileController.purgeFilesToEfp();
_jdir.delete_dir();
}
@@ -287,8 +287,55 @@ jcntl::discard_data_record(data_tok* con
iores
jcntl::read_data_record(void** const datapp, std::size_t& dsize, void** const xidpp, std::size_t& xidsize,
- bool& transient, bool& external, data_tok* const dtokp, bool ignore_pending_txns)
+ bool& transient, bool& external, data_tok* const dtokp, bool /*ignore_pending_txns*/)
{
+ if (!dtokp->is_readable()) {
+ std::ostringstream oss;
+ oss << std::hex << std::setfill('0');
+ oss << "dtok_id=0x" << std::setw(8) << dtokp->id();
+ oss << "; dtok_rid=0x" << std::setw(16) << dtokp->rid();
+ oss << "; dtok_wstate=" << dtokp->wstate_str();
+ throw jexception(jerrno::JERR_JCNTL_ENQSTATE, oss.str(), "jcntl", "read_data_record");
+ }
+ std::vector<uint64_t> ridl;
+ _emap.rid_list(ridl);
+ enq_map::emap_data_struct_t eds;
+ for (std::vector<uint64_t>::const_iterator i=ridl.begin(); i!=ridl.end(); ++i) {
+ short res = _emap.get_data(*i, eds);
+ if (res == enq_map::EMAP_OK) {
+ std::ifstream ifs(_rcvdat._fm[eds._pfid].c_str(), std::ifstream::in | std::ifstream::binary);
+ if (!ifs.good()) {
+ std::ostringstream oss;
+ oss << "rid=" << (*i) << " pfid=" << eds._pfid << " file=" << _rcvdat._fm[eds._pfid] << " file_posn=" << eds._file_posn;
+ throw jexception(jerrno::JERR_JCNTL_OPENRD, oss.str(), "jcntl", "read_data_record");
+ }
+ ifs.seekg(eds._file_posn, std::ifstream::beg);
+ ::enq_hdr_t eh;
+ ifs.read((char*)&eh, sizeof(::enq_hdr_t));
+ if (!::validate_enq_hdr(&eh, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, *i)) {
+ std::ostringstream oss;
+ oss << "rid=" << (*i) << " pfid=" << eds._pfid << " file=" << _rcvdat._fm[eds._pfid] << " file_posn=" << eds._file_posn;
+ throw jexception(jerrno::JERR_JCNTL_INVALIDENQHDR, oss.str(), "jcntl", "read_data_record");
+ }
+ dsize = eh._dsize;
+ xidsize = eh._xidsize;
+ transient = ::is_enq_transient(&eh);
+ external = ::is_enq_external(&eh);
+ if (xidsize) {
+ *xidpp = ::malloc(xidsize);
+ ifs.read((char*)(*xidpp), xidsize);
+ } else {
+ *xidpp = 0;
+ }
+ if (dsize) {
+ *datapp = ::malloc(dsize);
+ ifs.read((char*)(*datapp), dsize);
+ } else {
+ *datapp = 0;
+ }
+ }
+ }
+/*
check_rstatus("read_data");
iores res = _rmgr.read(datapp, dsize, xidpp, xidsize, transient, external, dtokp, ignore_pending_txns);
if (res == RHM_IORES_RCINVALID)
@@ -302,6 +349,8 @@ jcntl::read_data_record(void** const dat
res = _rmgr.read(datapp, dsize, xidpp, xidsize, transient, external, dtokp, ignore_pending_txns);
}
return res;
+*/
+ return RHM_IORES_SUCCESS;
}
iores
@@ -370,11 +419,13 @@ jcntl::get_wr_events(timespec* const tim
return res;
}
+/*
int32_t
jcntl::get_rd_events(timespec* const timeout)
{
return _rmgr.get_events(pmgr::AIO_COMPLETE, timeout);
}
+*/
void
jcntl::stop(const bool block_till_aio_cmpl)
@@ -386,26 +437,13 @@ jcntl::stop(const bool block_till_aio_cm
_stop_flag = true;
if (!_readonly_flag)
flush(block_till_aio_cmpl);
-// _rrfc.finalize();
-// _lpmgr.finalize();
+ _linearFileController.finalize();
}
-/*
-uint16_t
-jcntl::get_earliest_fid()
-{
- uint16_t ffid = _wrfc.earliest_index();
- uint16_t fid = _wrfc.index();
- while ( _emap.get_enq_cnt(ffid) == 0 && _tmap.get_txn_pfid_cnt(ffid) == 0 && ffid != fid)
- {
- if (++ffid >= _lpmgr.num_jfiles())
- ffid = 0;
- }
- if (!_rrfc.is_active())
- _rrfc.set_findex(ffid);
- return ffid;
+LinearFileController&
+jcntl::getLinearFileControllerRef() {
+ return _linearFileController;
}
-*/
iores
jcntl::flush(const bool block_till_aio_cmpl)
@@ -496,22 +534,6 @@ jcntl::check_rstatus(const char* fn_name
throw jexception(jerrno::JERR_JCNTL_STOPPED, "jcntl", fn_name);
}
-/*
-void
-jcntl::write_infofile() const
-{
- timespec ts;
- if (::clock_gettime(CLOCK_REALTIME, &ts))
- {
- std::ostringstream oss;
- oss << FORMAT_SYSERR(errno);
- throw jexception(jerrno::JERR__RTCLOCK, oss.str(), "jcntl", "write_infofile");
- }
- jinf ji(_jid, _jdir.dirname(), _base_filename, _lpmgr.num_jfiles(), _lpmgr.is_ae(), _lpmgr.ae_max_jfiles(),
- _jfsize_sblks, _wmgr.cache_pgsize_sblks(), _wmgr.cache_num_pages(), ts);
- ji.write();
-}
-*/
void
jcntl::aio_cmpl_wait()
@@ -530,6 +552,7 @@ jcntl::aio_cmpl_wait()
}
}
+
bool
jcntl::handle_aio_wait(const iores res, iores& resout, const data_tok* dtp)
{
@@ -569,64 +592,71 @@ jcntl::handle_aio_wait(const iores res,
return false;
}
-void
-jcntl::rcvr_janalyze(rcvdat& /*rd*/, const std::vector<std::string>* /*prep_txn_list_ptr*/)
-{
-/*
- jinf ji(_jdir.dirname() + "/" + _base_filename + "." + JRNL_INFO_EXTENSION, true);
- // If the number of files does not tie up with the jinf file from the journal being recovered,
- // use the jinf data.
- if (rd._njf != ji.num_jfiles())
- {
- std::ostringstream oss;
- oss << "Recovery found " << ji.num_jfiles() <<
- " files (different from --num-jfiles value of " << rd._njf << ").";
- this->log(LOG_INFO, oss.str());
- rd._njf = ji.num_jfiles();
- _rcvdat._enq_cnt_list.resize(rd._njf);
- }
- _emap.set_num_jfiles(rd._njf);
- _tmap.set_num_jfiles(rd._njf);
- if (_jfsize_sblks != ji.jfsize_sblks())
- {
+// static
+void
+jcntl::rcvr_read_jfile(const std::string& jfn, ::file_hdr_t* fh, std::string& queueName) {
+ const std::size_t headerBlockSize = QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_KIB * 1024;
+ char buffer[headerBlockSize];
+ std::ifstream ifs(jfn.c_str(), std::ifstream::in | std::ifstream::binary);
+ if (!ifs.good()) {
std::ostringstream oss;
- oss << "Recovery found file size = " << (ji.jfsize_sblks() / JRNL_RMGR_PAGE_SIZE) <<
- " (different from --jfile-size-pgs value of " <<
- (_jfsize_sblks / JRNL_RMGR_PAGE_SIZE) << ").";
- this->log(LOG_INFO, oss.str());
- _jfsize_sblks = ji.jfsize_sblks();
+ oss << "File=" << jfn;
+ throw jexception(jerrno::JERR_JCNTL_OPENRD, oss.str(), "jcntl", "rcvr_read_jfile");
}
- if (_jdir.dirname().compare(ji.jdir()))
- {
+ ifs.read(buffer, headerBlockSize);
+ if (!ifs) {
+ std::streamsize s = ifs.gcount();
+ ifs.close();
std::ostringstream oss;
- oss << "Journal file location change: original = \"" << ji.jdir() <<
- "\"; current = \"" << _jdir.dirname() << "\"";
- this->log(LOG_WARN, oss.str());
- ji.set_jdir(_jdir.dirname());
+ oss << "File=" << jfn << "; attempted_read_size=" << headerBlockSize << "; actual_read_size=" << s;
+ throw jexception(jerrno::JERR_JCNTL_READ, oss.str(), "jcntl", "rcvr_read_jfile");
}
+ ifs.close();
+ ::memcpy(fh, buffer, sizeof(::file_hdr_t));
+ queueName.assign(buffer + sizeof(::file_hdr_t), fh->_queue_name_len);
+}
- try
- {
- rd._ffid = ji.get_first_pfid();
- rd._lfid = ji.get_last_pfid();
- rd._owi = ji.get_initial_owi();
- rd._frot = ji.get_frot();
- rd._jempty = false;
- ji.get_normalized_pfid_list(rd._fid_list); // _pfid_list
+
+void jcntl::rcvr_analyze_fhdrs(EmptyFilePoolManager* efpmp) {
+ std::string headerQueueName;
+ ::file_hdr_t fh;
+ efpIdentity_t efpid;
+// std::map<uint64_t, std::string> fileMap;
+ std::vector<std::string> dirList;
+ jdir::read_dir(_jdir.dirname(), dirList, false, true, false, true);
+ for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
+ rcvr_read_jfile(*i, &fh, headerQueueName);
+ if (headerQueueName.compare(_jid) != 0) {
+ std::ostringstream oss;
+ oss << "Journal file " << (*i) << " belongs to queue \"" << headerQueueName << "\": ignoring";
+ log(LOG_WARN, _jid, oss.str());
+ } else {
+ _rcvdat._fm[fh._file_number] = *i;
+ efpid.first = fh._efp_partition;
+ efpid.second = fh._file_size_kib;
+ }
}
- catch (const jexception& e)
- {
- if (e.err_code() != jerrno::JERR_JINF_JDATEMPTY) throw;
+ _rcvdat._jfl.clear();
+ for (std::map<uint64_t, std::string>::iterator i=_rcvdat._fm.begin(); i!=_rcvdat._fm.end(); ++i) {
+ _rcvdat._jfl.push_back(i->second);
}
+ _rcvdat._enq_cnt_list.resize(_rcvdat._jfl.size(), 0);
+ _emptyFilePoolPtr = efpmp->getEmptyFilePool(efpid);
+}
+
+
+void jcntl::rcvr_janalyze(const std::vector<std::string>* prep_txn_list_ptr, EmptyFilePoolManager* efpmp) {
+ // Analyze file headers of existing journal files
+ rcvr_analyze_fhdrs(efpmp);
// Restore all read and write pointers and transactions
- if (!rd._jempty)
+ if (!_rcvdat._jempty)
{
- uint16_t fid = rd._ffid;
+ uint16_t fid = 0;
std::ifstream ifs;
- bool lowi = rd._owi; // local copy of owi to be used during analysis
- while (rcvr_get_next_record(fid, &ifs, lowi, rd)) ;
+ //bool lowi = rd._owi; // local copy of owi to be used during analysis
+ while (rcvr_get_next_record(fid, &ifs)) ;
if (ifs.is_open()) ifs.close();
// Remove all txns from tmap that are not in the prepared list
@@ -645,7 +675,7 @@ jcntl::rcvr_janalyze(rcvdat& /*rd*/, con
for (tdl_itr i=tdl.begin(); i<tdl.end(); i++)
{
if (i->_enq_flag) // enq op - decrement enqueue count
- rd._enq_cnt_list[i->_pfid]--;
+ _rcvdat._enq_cnt_list[i->_pfid]--;
else if (_emap.is_enqueued(i->_drid, true)) // deq op - unlock enq record
{
int16_t ret = _emap.unlock(i->_drid);
@@ -662,18 +692,14 @@ jcntl::rcvr_janalyze(rcvdat& /*rd*/, con
}
}
- // Check for file full condition - add one to _jfsize_sblks to account for file header
- rd._lffull = rd._eo == (1 + _jfsize_sblks) * JRNL_SBLK_SIZE;
-
- // Check for journal full condition
- uint16_t next_wr_fid = (rd._lfid + 1) % rd._njf;
- rd._jfull = rd._ffid == next_wr_fid && rd._enq_cnt_list[next_wr_fid] && rd._lffull;
+ // Check for file full condition
+ _rcvdat._lffull = _rcvdat._eo == _emptyFilePoolPtr->fileSize_kib() * 1024;
}
-*/
}
+
bool
-jcntl::rcvr_get_next_record(uint16_t& fid, std::ifstream* ifsp/*, bool& lowi*/, rcvdat& rd)
+jcntl::rcvr_get_next_record(uint16_t& fid, std::ifstream* ifsp)
{
std::size_t cum_size_read = 0;
void* xidp = 0;
@@ -685,7 +711,7 @@ jcntl::rcvr_get_next_record(uint16_t& fi
{
if (!ifsp->is_open())
{
- if (!jfile_cycle(fid, ifsp/*, lowi*/, rd, true))
+ if (!jfile_cycle(fid, ifsp, true))
return false;
}
file_pos = ifsp->tellg();
@@ -694,7 +720,7 @@ jcntl::rcvr_get_next_record(uint16_t& fi
hdr_ok = true;
else
{
- if (!jfile_cycle(fid, ifsp/*, lowi*/, rd, true))
+ if (!jfile_cycle(fid, ifsp, true))
return false;
}
}
@@ -703,13 +729,14 @@ jcntl::rcvr_get_next_record(uint16_t& fi
{
case QLS_ENQ_MAGIC:
{
+ std::cout << " e" << std::flush;
enq_rec er;
uint16_t start_fid = fid; // fid may increment in decode() if record folds over file boundary
- if (!decode(er, fid, ifsp, cum_size_read, h/*, lowi*/, rd, file_pos))
+ if (!decode(er, fid, ifsp, cum_size_read, h, file_pos))
return false;
if (!er.is_transient()) // Ignore transient msgs
{
- rd._enq_cnt_list[start_fid]++;
+ _rcvdat._enq_cnt_list[start_fid]++;
if (er.xid_size())
{
er.get_xid(&xidp);
@@ -726,7 +753,7 @@ jcntl::rcvr_get_next_record(uint16_t& fi
}
else
{
- if (_emap.insert_pfid(h._rid, start_fid) < enq_map::EMAP_OK) // fail
+ if (_emap.insert_pfid(h._rid, start_fid, file_pos) < enq_map::EMAP_OK) // fail
{
// The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID.
std::ostringstream oss;
@@ -739,9 +766,10 @@ jcntl::rcvr_get_next_record(uint16_t& fi
break;
case QLS_DEQ_MAGIC:
{
+ std::cout << " d" << std::flush;
deq_rec dr;
uint16_t start_fid = fid; // fid may increment in decode() if record folds over file boundary
- if (!decode(dr, fid, ifsp, cum_size_read, h/*, lowi*/, rd, file_pos))
+ if (!decode(dr, fid, ifsp, cum_size_read, h, file_pos))
return false;
if (dr.xid_size())
{
@@ -762,16 +790,17 @@ jcntl::rcvr_get_next_record(uint16_t& fi
}
else
{
- int16_t enq_fid = _emap.get_remove_pfid(dr.deq_rid(), true);
- if (enq_fid >= enq_map::EMAP_OK) // ignore not found error
- rd._enq_cnt_list[enq_fid]--;
+ int16_t enq_fid;
+ if (_emap.get_remove_pfid(dr.deq_rid(), enq_fid, true) == enq_map::EMAP_OK) // ignore not found error
+ _rcvdat._enq_cnt_list[enq_fid]--;
}
}
break;
case QLS_TXA_MAGIC:
{
+ std::cout << " a" << std::flush;
txn_rec ar;
- if (!decode(ar, fid, ifsp, cum_size_read, h/*, lowi*/, rd, file_pos))
+ if (!decode(ar, fid, ifsp, cum_size_read, h, file_pos))
return false;
// Delete this txn from tmap, unlock any locked records in emap
ar.get_xid(&xidp);
@@ -781,7 +810,7 @@ jcntl::rcvr_get_next_record(uint16_t& fi
for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
{
if (itr->_enq_flag)
- rd._enq_cnt_list[itr->_pfid]--;
+ _rcvdat._enq_cnt_list[itr->_pfid]--;
else
_emap.unlock(itr->_drid); // ignore not found error
}
@@ -790,8 +819,9 @@ jcntl::rcvr_get_next_record(uint16_t& fi
break;
case QLS_TXC_MAGIC:
{
+ std::cout << " t" << std::flush;
txn_rec cr;
- if (!decode(cr, fid, ifsp, cum_size_read, h/*, lowi*/, rd, file_pos))
+ if (!decode(cr, fid, ifsp, cum_size_read, h, file_pos))
return false;
// Delete this txn from tmap, process records into emap
cr.get_xid(&xidp);
@@ -802,7 +832,7 @@ jcntl::rcvr_get_next_record(uint16_t& fi
{
if (itr->_enq_flag) // txn enqueue
{
- if (_emap.insert_pfid(itr->_rid, itr->_pfid) < enq_map::EMAP_OK) // fail
+ if (_emap.insert_pfid(itr->_rid, itr->_pfid, file_pos) < enq_map::EMAP_OK) // fail
{
// The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID.
std::ostringstream oss;
@@ -812,9 +842,9 @@ jcntl::rcvr_get_next_record(uint16_t& fi
}
else // txn dequeue
{
- int16_t enq_fid = _emap.get_remove_pfid(itr->_drid, true);
- if (enq_fid >= enq_map::EMAP_OK)
- rd._enq_cnt_list[enq_fid]--;
+ int16_t enq_fid;
+ if (_emap.get_remove_pfid(itr->_drid, enq_fid, true) == enq_map::EMAP_OK) // ignore not found error
+ _rcvdat._enq_cnt_list[enq_fid]--;
}
}
std::free(xidp);
@@ -822,32 +852,40 @@ jcntl::rcvr_get_next_record(uint16_t& fi
break;
case QLS_EMPTY_MAGIC:
{
+ std::cout << " x" << std::flush;
uint32_t rec_dblks = jrec::size_dblks(sizeof(rec_hdr_t));
- ifsp->ignore(rec_dblks * JRNL_DBLK_SIZE - sizeof(rec_hdr_t));
+ ifsp->ignore(rec_dblks * JRNL_DBLK_SIZE_BYTES - sizeof(rec_hdr_t));
assert(!ifsp->fail() && !ifsp->bad());
- if (!jfile_cycle(fid, ifsp/*, lowi*/, rd, false))
+ if (!jfile_cycle(fid, ifsp, false))
return false;
}
break;
case 0:
- check_journal_alignment(fid, file_pos, rd);
+ std::cout << " 0" << std::endl << std::flush;
+ check_journal_alignment(fid, file_pos);
return false;
default:
+ std::cout << " ?" << std::endl << std::flush;
// Stop as this is the overwrite boundary.
- check_journal_alignment(fid, file_pos, rd);
+ check_journal_alignment(fid, file_pos);
return false;
}
return true;
}
+
bool
jcntl::decode(jrec& rec, uint16_t& fid, std::ifstream* ifsp, std::size_t& cum_size_read,
- rec_hdr_t& h/*, bool& lowi*/, rcvdat& rd, std::streampos& file_offs)
+ rec_hdr_t& h, std::streampos& file_offs)
{
uint16_t start_fid = fid;
std::streampos start_file_offs = file_offs;
-// if (!check_owi(fid, h, lowi, rd, file_offs))
-// return false;
+
+ if (_rcvdat._h_rid == 0)
+ _rcvdat._h_rid = h._rid;
+ else if (h._rid - _rcvdat._h_rid < 0x8000000000000000ULL) // RFC 1982 comparison for unsigned 64-bit
+ _rcvdat._h_rid = h._rid;
+
bool done = false;
while (!done)
{
@@ -860,64 +898,60 @@ jcntl::decode(jrec& rec, uint16_t& fid,
// fid != (rd._ffid ? rd._ffid - 1 : _num_jfiles - 1)) throw;
// Tried this, but did not work
// if (e.err_code() != jerrno::JERR_JREC_BADRECTAIL || h._magic != 0) throw;
- check_journal_alignment(start_fid, start_file_offs, rd);
+ check_journal_alignment(start_fid, start_file_offs);
// rd._lfid = start_fid;
return false;
}
- if (!done && !jfile_cycle(fid, ifsp/*, lowi*/, rd, false))
+ if (!done && !jfile_cycle(fid, ifsp, /*lowi, rd,*/ false))
{
- check_journal_alignment(start_fid, start_file_offs, rd);
+ check_journal_alignment(start_fid, start_file_offs);
return false;
}
}
return true;
}
+
bool
-jcntl::jfile_cycle(uint16_t& fid, std::ifstream* ifsp/*, bool& lowi*/, rcvdat& rd, const bool jump_fro)
+jcntl::jfile_cycle(uint16_t& fid, std::ifstream* ifsp, const bool jump_fro)
{
if (ifsp->is_open())
{
if (ifsp->eof() || !ifsp->good())
{
ifsp->clear();
- rd._eo = ifsp->tellg(); // remember file offset before closing
- assert(rd._eo != std::numeric_limits<std::size_t>::max()); // Check for error code -1
+ _rcvdat._eo = ifsp->tellg(); // remember file offset before closing
+ assert(_rcvdat._eo != std::numeric_limits<std::size_t>::max()); // Check for error code -1
ifsp->close();
- if (++fid >= rd._njf)
- {
- fid = 0;
-// lowi = !lowi; // Flip local owi
- }
- if (fid == rd._ffid) // used up all journal files
+ if (++fid == _rcvdat._jfl.size()) // used up all known journal files
return false;
}
}
if (!ifsp->is_open())
{
- std::ostringstream oss;
- oss << _jdir.dirname() << "/" /*<< _base_filename*/ << "."; // TODO - linear journal name
- oss << std::hex << std::setfill('0') << std::setw(4) << fid << QLS_JRNL_FILE_EXTENSION;
ifsp->clear(); // clear eof flag, req'd for older versions of c++
- ifsp->open(oss.str().c_str(), std::ios_base::in | std::ios_base::binary);
+ ifsp->open(_rcvdat._jfl[fid].c_str(), std::ios_base::in | std::ios_base::binary);
if (!ifsp->good())
- throw jexception(jerrno::JERR__FILEIO, oss.str(), "jcntl", "jfile_cycle");
+ throw jexception(jerrno::JERR__FILEIO, _rcvdat._jfl[fid], "jcntl", "jfile_cycle");
// Read file header
+ std::cout << " F" << fid << std::flush;
file_hdr_t fhdr;
ifsp->read((char*)&fhdr, sizeof(fhdr));
assert(ifsp->good());
if (fhdr._rhdr._magic == QLS_FILE_MAGIC)
{
-// assert(fhdr._lfid == fid);
- if (!rd._fro)
- rd._fro = fhdr._fro;
- std::streamoff foffs = jump_fro ? fhdr._fro : JRNL_SBLK_SIZE;
+ if (!_rcvdat._fro)
+ _rcvdat._fro = fhdr._fro;
+ std::streamoff foffs = jump_fro ? fhdr._fro : JRNL_SBLK_SIZE_BYTES;
ifsp->seekg(foffs);
}
else
{
ifsp->close();
+ if (fid == 0) {
+ _rcvdat._jempty = true;
+ }
return false;
}
}
@@ -925,46 +959,17 @@ jcntl::jfile_cycle(uint16_t& fid, std::i
}
-/*
-bool
-jcntl::check_owi(const uint16_t fid, rec_hdr& h, bool& lowi, rcvdat& rd, std::streampos& file_pos)
-{
- if (rd._ffid ? h.get_owi() == lowi : h.get_owi() != lowi) // Overwrite indicator changed
- {
- uint16_t expected_fid = rd._ffid ? rd._ffid - 1 : rd._njf - 1;
- if (fid == expected_fid)
- {
- check_journal_alignment(fid, file_pos, rd);
- return false;
- }
- std::ostringstream oss;
- oss << std::hex << std::setfill('0') << "Magic=0x" << std::setw(8) << h._magic;
- oss << " fid=0x" << std::setw(4) << fid << " rid=0x" << std::setw(8) << h._rid;
- oss << " foffs=0x" << std::setw(8) << file_pos;
- oss << " expected_fid=0x" << std::setw(4) << expected_fid;
- throw jexception(jerrno::JERR_JCNTL_OWIMISMATCH, oss.str(), "jcntl",
- "check_owi");
- }
- if (rd._h_rid == 0)
- rd._h_rid = h._rid;
- else if (h._rid - rd._h_rid < 0x8000000000000000ULL) // RFC 1982 comparison for unsigned 64-bit
- rd._h_rid = h._rid;
- return true;
-}
-*/
-
-
void
-jcntl::check_journal_alignment(const uint16_t fid, std::streampos& file_pos, rcvdat& rd)
+jcntl::check_journal_alignment(const uint16_t fid, std::streampos& file_pos/*, rcvdat& rd*/)
{
- unsigned sblk_offs = file_pos % JRNL_SBLK_SIZE;
+ unsigned sblk_offs = file_pos % JRNL_SBLK_SIZE_BYTES;
if (sblk_offs)
{
{
std::ostringstream oss;
oss << std::hex << "Bad record alignment found at fid=0x" << fid;
oss << " offs=0x" << file_pos << " (likely journal overwrite boundary); " << std::dec;
- oss << (JRNL_SBLK_SIZE_DBLKS - (sblk_offs/JRNL_DBLK_SIZE)) << " filler record(s) required.";
+ oss << (JRNL_SBLK_SIZE_DBLKS - (sblk_offs/JRNL_DBLK_SIZE_BYTES)) << " filler record(s) required.";
this->log(LOG_WARN, _jid, oss.str());
}
const uint32_t xmagic = QLS_EMPTY_MAGIC;
@@ -976,17 +981,17 @@ jcntl::check_journal_alignment(const uin
if (!ofsp.good())
throw jexception(jerrno::JERR__FILEIO, oss.str(), "jcntl", "check_journal_alignment");
ofsp.seekp(file_pos);
- void* buff = std::malloc(JRNL_DBLK_SIZE);
+ void* buff = std::malloc(JRNL_DBLK_SIZE_BYTES);
assert(buff != 0);
std::memcpy(buff, (const void*)&xmagic, sizeof(xmagic));
// Normally, RHM_CLEAN must be set before these fills are done, but this is a recover
// situation (i.e. performance is not an issue), and it makes the location of the write
// clear should inspection of the file be required.
- std::memset((char*)buff + sizeof(xmagic), QLS_CLEAN_CHAR, JRNL_DBLK_SIZE - sizeof(xmagic));
+ std::memset((char*)buff + sizeof(xmagic), QLS_CLEAN_CHAR, JRNL_DBLK_SIZE_BYTES - sizeof(xmagic));
- while (file_pos % JRNL_SBLK_SIZE)
+ while (file_pos % JRNL_SBLK_SIZE_BYTES)
{
- ofsp.write((const char*)buff, JRNL_DBLK_SIZE);
+ ofsp.write((const char*)buff, JRNL_DBLK_SIZE_BYTES);
assert(!ofsp.fail());
std::ostringstream oss;
oss << std::hex << "Recover phase write: Wrote filler record: fid=0x" << fid << " offs=0x" << file_pos;
@@ -995,12 +1000,9 @@ jcntl::check_journal_alignment(const uin
}
ofsp.close();
std::free(buff);
- rd._lfid = fid;
-// if (!rd._frot)
-// rd._ffid = (fid + 1) % rd._njf;
this->log(LOG_INFO, _jid, "Bad record alignment fixed.");
}
- rd._eo = file_pos;
+ _rcvdat._eo = file_pos;
}
}}
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h Mon Oct 7 18:39:24 2013
@@ -19,8 +19,8 @@
*
*/
-#ifndef QPID_LEGACYSTORE_JRNL_JCNTL_H
-#define QPID_LEGACYSTORE_JRNL_JCNTL_H
+#ifndef QPID_LINEARSTORE_JRNL_JCNTL_H
+#define QPID_LINEARSTORE_JRNL_JCNTL_H
namespace qpid
{
@@ -31,6 +31,7 @@ namespace qls_jrnl
#include <cstddef>
#include <deque>
+#include <qpid/linearstore/jrnl/LinearFileController.h>
#include <qpid/linearstore/jrnl/JournalLog.h>
#include "qpid/linearstore/jrnl/jdir.h"
//#include "qpid/linearstore/jrnl/fcntl.h"
@@ -46,8 +47,8 @@ namespace qpid
{
namespace qls_jrnl
{
-class EmptyFilePool;
-class JournalFileController;
+ class EmptyFilePool;
+ class EmptyFilePoolManager;
/**
* \brief Access and control interface for the journal. This is the top-level class for the
@@ -127,17 +128,14 @@ class JournalFileController;
//bool _autostop; ///< Autostop flag - stops journal when overrun occurs
// Journal control structures
- JournalFileController* _jfcp;///< Journal File Controller
- //uint32_t _jfsize_sblks; ///< Journal file size in sblks
- //lpmgr _lpmgr; ///< LFID-PFID manager tracks inserted journal files
- enq_map _emap; ///< Enqueue map for low water mark management
- txn_map _tmap; ///< Transaction map open transactions
- //rrfc _rrfc; ///< Read journal rotating file controller
- //wrfc _wrfc; ///< Write journal rotating file controller
- rmgr _rmgr; ///< Read page manager which manages AIO
- wmgr _wmgr; ///< Write page manager which manages AIO
- rcvdat _rcvdat; ///< Recovery data used for recovery
- smutex _wr_mutex; ///< Mutex for journal writes
+ LinearFileController _linearFileController; ///< Linear File Controller
+ EmptyFilePool* _emptyFilePoolPtr; ///< Pointer to Empty File Pool for this queue
+ enq_map _emap; ///< Enqueue map for low water mark management
+ txn_map _tmap; ///< Transaction map open transactions
+ //rmgr _rmgr; ///< Read page manager which manages AIO
+ wmgr _wmgr; ///< Write page manager which manages AIO
+ rcvdat _rcvdat; ///< Recovery data used for recovery
+ smutex _wr_mutex; ///< Mutex for journal writes
public:
static timespec _aio_cmpl_timeout; ///< Timeout for blocking libaio returns
@@ -230,9 +228,12 @@ class JournalFileController;
*
* \exception TODO
*/
- void recover(/*const uint16_t num_jfiles, const bool auto_expand, const uint16_t ae_max_jfiles,
- const uint32_t jfsize_sblks,*/ const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks,
- aio_callback* const cbp, const std::vector<std::string>* prep_txn_list_ptr, uint64_t& highest_rid);
+ void recover(EmptyFilePoolManager* efpm,
+ const uint16_t wcache_num_pages,
+ const uint32_t wcache_pgsize_sblks,
+ aio_callback* const cbp,
+ const std::vector<std::string>* prep_txn_list_ptr,
+ uint64_t& highest_rid);
/**
* \brief Notification to the journal that recovery is complete and that normal operation
@@ -532,7 +533,7 @@ class JournalFileController;
* operations, but if these operations cease, then this call needs to be made to force the
* processing of any outstanding AIO operations.
*/
- int32_t get_rd_events(timespec* const timeout);
+// int32_t get_rd_events(timespec* const timeout);
/**
* \brief Stop the journal from accepting any further requests to read or write data.
@@ -554,11 +555,11 @@ class JournalFileController;
*/
iores flush(const bool block_till_aio_cmpl = false);
- inline uint32_t get_enq_cnt() const { return _emap.size(); }
+ inline uint32_t get_enq_cnt() const { return _emap.size(); } // TODO: Thread safe?
inline uint32_t get_wr_aio_evt_rem() const { slock l(_wr_mutex); return _wmgr.get_aio_evt_rem(); }
- inline uint32_t get_rd_aio_evt_rem() const { return _rmgr.get_aio_evt_rem(); }
+// inline uint32_t get_rd_aio_evt_rem() const { return _rmgr.get_aio_evt_rem(); }
inline uint32_t get_wr_outstanding_aio_dblks() const;
/*{ return _wrfc.aio_outstanding_dblks(); }*/
@@ -575,6 +576,7 @@ class JournalFileController;
// inline uint16_t get_rd_fid() const { return _rrfc.index(); }
// inline uint16_t get_wr_fid() const { return _wrfc.index(); }
// uint16_t get_earliest_fid();
+ LinearFileController& getLinearFileControllerRef();
/**
* \brief Check if a particular rid is enqueued. Note that this function will return
@@ -692,22 +694,25 @@ class JournalFileController;
/**
* \brief Analyze journal for recovery.
*/
- void rcvr_janalyze(rcvdat& rd, const std::vector<std::string>* prep_txn_list_ptr);
+ static void rcvr_read_jfile(const std::string& jfn, ::file_hdr_t* fh, std::string& queueName);
- bool rcvr_get_next_record(uint16_t& fid, std::ifstream* ifsp/*, bool& lowi*/, rcvdat& rd);
+ void rcvr_analyze_fhdrs(EmptyFilePoolManager* efpmp);
+
+ void rcvr_janalyze(const std::vector<std::string>* prep_txn_list_ptr, EmptyFilePoolManager* efpmp);
+
+ bool rcvr_get_next_record(uint16_t& fid, std::ifstream* ifsp/*, bool& lowi, rcvdat& rd*/);
bool decode(jrec& rec, uint16_t& fid, std::ifstream* ifsp, std::size_t& cum_size_read,
- rec_hdr_t& h/*, bool& lowi*/, rcvdat& rd, std::streampos& rec_offset);
+ rec_hdr_t& h, /*bool& lowi, rcvdat& rd,*/ std::streampos& rec_offset);
- bool jfile_cycle(uint16_t& fid, std::ifstream* ifsp/*, bool& lowi*/, rcvdat& rd,
- const bool jump_fro);
+ bool jfile_cycle(uint16_t& fid, std::ifstream* ifsp, /*bool& lowi, rcvdat& rd,*/ const bool jump_fro);
//bool check_owi(const uint16_t fid, rec_hdr_t& h, bool& lowi, rcvdat& rd,
// std::streampos& read_pos);
- void check_journal_alignment(const uint16_t fid, std::streampos& rec_offset, rcvdat& rd);
+ void check_journal_alignment(const uint16_t fid, std::streampos& rec_offset/*, rcvdat& rd*/);
};
}}
-#endif // ifndef QPID_LEGACYSTORE_JRNL_JCNTL_H
+#endif // ifndef QPID_LINEARSTORE_JRNL_JCNTL_H
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp Mon Oct 7 18:39:24 2013
@@ -409,7 +409,7 @@ jdir::exists(const std::string& name)
}
void
-jdir::read_dir(const std::string& name, std::vector<std::string>& dir_list, const bool incl_dirs, const bool incl_files, const bool incl_links) {
+jdir::read_dir(const std::string& name, std::vector<std::string>& dir_list, const bool incl_dirs, const bool incl_files, const bool incl_links, const bool return_fqfn) {
struct stat s;
if (is_dir(name)) {
DIR* dir = ::opendir(name.c_str());
@@ -425,8 +425,13 @@ jdir::read_dir(const std::string& name,
oss << "stat: file=\"" << full_name << "\"" << FORMAT_SYSERR(errno);
throw jexception(jerrno::JERR_JDIR_STAT, oss.str(), "jdir", "read_dir");
}
- if ((S_ISREG(s.st_mode) && incl_files) || (S_ISDIR(s.st_mode) && incl_dirs) || (S_ISLNK(s.st_mode) && incl_links))
- dir_list.push_back(entry->d_name);
+ if ((S_ISREG(s.st_mode) && incl_files) || (S_ISDIR(s.st_mode) && incl_dirs) || (S_ISLNK(s.st_mode) && incl_links)) {
+ if (return_fqfn) {
+ dir_list.push_back(name + "/" + entry->d_name);
+ } else {
+ dir_list.push_back(entry->d_name);
+ }
+ }
}
}
}
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jdir.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jdir.h?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jdir.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jdir.h Mon Oct 7 18:39:24 2013
@@ -336,7 +336,7 @@ namespace qls_jrnl
*/
static bool exists(const std::string& name);
- static void read_dir(const std::string& name, std::vector<std::string>& dir_list, const bool incl_dirs, const bool incl_files, const bool incl_links);
+ static void read_dir(const std::string& name, std::vector<std::string>& dir_list, const bool incl_dirs, const bool incl_files, const bool incl_links, const bool return_fqfn);
/**
* \brief Stream operator
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp Mon Oct 7 18:39:24 2013
@@ -42,6 +42,7 @@ const uint32_t jerrno::JERR__TIMEOUT
const uint32_t jerrno::JERR__UNEXPRESPONSE = 0x0108;
const uint32_t jerrno::JERR__RECNFOUND = 0x0109;
const uint32_t jerrno::JERR__NOTIMPL = 0x010a;
+const uint32_t jerrno::JERR__NULL = 0x010b;
// class jcntl
const uint32_t jerrno::JERR_JCNTL_STOPPED = 0x0200;
@@ -49,8 +50,11 @@ const uint32_t jerrno::JERR_JCNTL_READON
const uint32_t jerrno::JERR_JCNTL_AIOCMPLWAIT = 0x0202;
const uint32_t jerrno::JERR_JCNTL_UNKNOWNMAGIC = 0x0203;
const uint32_t jerrno::JERR_JCNTL_NOTRECOVERED = 0x0204;
-const uint32_t jerrno::JERR_JCNTL_RECOVERJFULL = 0x0205;
-const uint32_t jerrno::JERR_JCNTL_OWIMISMATCH = 0x0206;
+const uint32_t jerrno::JERR_JCNTL_OPENRD = 0x0205;
+const uint32_t jerrno::JERR_JCNTL_READ = 0x0206;
+const uint32_t jerrno::JERR_JCNTL_ENQSTATE = 0x0207;
+const uint32_t jerrno::JERR_JCNTL_INVALIDENQHDR = 0x0208;
+
// class jdir
const uint32_t jerrno::JERR_JDIR_NOTDIR = 0x0300;
@@ -65,21 +69,14 @@ const uint32_t jerrno::JERR_JDIR_STAT
const uint32_t jerrno::JERR_JDIR_UNLINK = 0x0309;
const uint32_t jerrno::JERR_JDIR_BADFTYPE = 0x030a;
-// class fcntl
-//const uint32_t jerrno::JERR_FCNTL_OPENWR = 0x0400;
-//const uint32_t jerrno::JERR_FCNTL_WRITE = 0x0401;
-//const uint32_t jerrno::JERR_FCNTL_CLOSE = 0x0402;
-//const uint32_t jerrno::JERR_FCNTL_FILEOFFSOVFL = 0x0403;
-//const uint32_t jerrno::JERR_FCNTL_CMPLOFFSOVFL = 0x0404;
-//const uint32_t jerrno::JERR_FCNTL_RDOFFSOVFL = 0x0405;
-
-// class lfmgr
-//const uint32_t jerrno::JERR_LFMGR_BADAEFNUMLIM = 0x0500;
-//const uint32_t jerrno::JERR_LFMGR_AEFNUMLIMIT = 0x0501;
-//const uint32_t jerrno::JERR_LFMGR_AEDISABLED = 0x0502;
+// class JournalFile
+const uint32_t jerrno::JERR_JNLF_OPEN = 0x0400;
+const uint32_t jerrno::JERR_JNLF_CLOSE = 0x0401;
+const uint32_t jerrno::JERR_JNLF_FILEOFFSOVFL = 0x0402;
+const uint32_t jerrno::JERR_JNLF_CMPLOFFSOVFL = 0x0403;
-// class rrfc
-//const uint32_t jerrno::JERR_RRFC_OPENRD = 0x0600;
+// class LinearFileController
+const uint32_t jerrno::JERR_LFCR_SEQNUMNOTFOUND = 0x0500;
// class jrec, enq_rec, deq_rec, txn_rec
const uint32_t jerrno::JERR_JREC_BADRECHDR = 0x0700;
@@ -91,13 +88,14 @@ const uint32_t jerrno::JERR_WMGR_BADDTOK
const uint32_t jerrno::JERR_WMGR_ENQDISCONT = 0x0803;
const uint32_t jerrno::JERR_WMGR_DEQDISCONT = 0x0804;
const uint32_t jerrno::JERR_WMGR_DEQRIDNOTENQ = 0x0805;
+const uint32_t jerrno::JERR_WMGR_BADFH = 0x0806;
-// class rmgr
-const uint32_t jerrno::JERR_RMGR_UNKNOWNMAGIC = 0x0900;
-const uint32_t jerrno::JERR_RMGR_RIDMISMATCH = 0x0901;
-//const uint32_t jerrno::JERR_RMGR_FIDMISMATCH = 0x0902;
-const uint32_t jerrno::JERR_RMGR_ENQSTATE = 0x0903;
-const uint32_t jerrno::JERR_RMGR_BADRECTYPE = 0x0904;
+//// class rmgr
+//const uint32_t jerrno::JERR_RMGR_UNKNOWNMAGIC = 0x0900;
+//const uint32_t jerrno::JERR_RMGR_RIDMISMATCH = 0x0901;
+////const uint32_t jerrno::JERR_RMGR_FIDMISMATCH = 0x0902;
+//const uint32_t jerrno::JERR_RMGR_ENQSTATE = 0x0903;
+//const uint32_t jerrno::JERR_RMGR_BADRECTYPE = 0x0904;
// class data_tok
const uint32_t jerrno::JERR_DTOK_ILLEGALSTATE = 0x0a00;
@@ -108,19 +106,6 @@ const uint32_t jerrno::JERR_MAP_DUPLICAT
const uint32_t jerrno::JERR_MAP_NOTFOUND = 0x0b01;
const uint32_t jerrno::JERR_MAP_LOCKED = 0x0b02;
-// class jinf
-//const uint32_t jerrno::JERR_JINF_CVALIDFAIL = 0x0c00;
-//const uint32_t jerrno::JERR_JINF_NOVALUESTR = 0x0c01;
-//const uint32_t jerrno::JERR_JINF_BADVALUESTR = 0x0c02;
-//const uint32_t jerrno::JERR_JINF_JDATEMPTY = 0x0c03;
-//const uint32_t jerrno::JERR_JINF_TOOMANYFILES = 0x0c04;
-//const uint32_t jerrno::JERR_JINF_INVALIDFHDR = 0x0c05;
-//const uint32_t jerrno::JERR_JINF_STAT = 0x0c06;
-//const uint32_t jerrno::JERR_JINF_NOTREGFILE = 0x0c07;
-//const uint32_t jerrno::JERR_JINF_BADFILESIZE = 0x0c08;
-//const uint32_t jerrno::JERR_JINF_OWIBAD = 0x0c09;
-//const uint32_t jerrno::JERR_JINF_ZEROLENFILE = 0x0c0a;
-
// EFP errors
const uint32_t jerrno::JERR_EFP_BADPARTITIONNAME = 0x0d01;
const uint32_t jerrno::JERR_EFP_BADPARTITIONDIR = 0x0d02;
@@ -150,6 +135,7 @@ jerrno::__init()
_err_map[JERR__UNEXPRESPONSE] = "JERR__UNEXPRESPONSE: Unexpected response to call or event.";
_err_map[JERR__RECNFOUND] = "JERR__RECNFOUND: Record not found.";
_err_map[JERR__NOTIMPL] = "JERR__NOTIMPL: Not implemented";
+ _err_map[JERR__NULL] = "JERR__NULL: Operation on null pointer";
// class jcntl
_err_map[JERR_JCNTL_STOPPED] = "JERR_JCNTL_STOPPED: Operation on stopped journal.";
@@ -157,8 +143,10 @@ jerrno::__init()
_err_map[JERR_JCNTL_AIOCMPLWAIT] = "JERR_JCNTL_AIOCMPLWAIT: Timeout waiting for AIOs to complete.";
_err_map[JERR_JCNTL_UNKNOWNMAGIC] = "JERR_JCNTL_UNKNOWNMAGIC: Found record with unknown magic.";
_err_map[JERR_JCNTL_NOTRECOVERED] = "JERR_JCNTL_NOTRECOVERED: Operation requires recover() to be run first.";
- _err_map[JERR_JCNTL_RECOVERJFULL] = "JERR_JCNTL_RECOVERJFULL: Journal data files full, cannot write.";
- _err_map[JERR_JCNTL_OWIMISMATCH] = "JERR_JCNTL_OWIMISMATCH: Overwrite Indicator (OWI) change found in unexpected location.";
+ _err_map[JERR_JCNTL_OPENRD] = "JERR_JCNTL_OPENRD: Unable to open file for read";
+ _err_map[JERR_JCNTL_READ] = "JERR_JCNTL_READ: Read error: no or insufficient data to read";
+ _err_map[JERR_JCNTL_ENQSTATE] = "JERR_JCNTL_ENQSTATE: Read error: Record not in ENQ state";
+ _err_map[JERR_JCNTL_INVALIDENQHDR] = "JERR_JCNTL_INVALIDENQHDR: Invalid ENQ header";
// class jdir
_err_map[JERR_JDIR_NOTDIR] = "JERR_JDIR_NOTDIR: Directory name exists but is not a directory.";
@@ -173,21 +161,14 @@ jerrno::__init()
_err_map[JERR_JDIR_UNLINK] = "JERR_JDIR_UNLINK: File delete failed.";
_err_map[JERR_JDIR_BADFTYPE] = "JERR_JDIR_BADFTYPE: Bad or unknown file type (stat mode).";
- // class fcntl
-// _err_map[JERR_FCNTL_OPENWR] = "JERR_FCNTL_OPENWR: Unable to open file for write.";
-// _err_map[JERR_FCNTL_WRITE] = "JERR_FCNTL_WRITE: Unable to write to file.";
-// _err_map[JERR_FCNTL_CLOSE] = "JERR_FCNTL_CLOSE: File close failed.";
-// _err_map[JERR_FCNTL_FILEOFFSOVFL] = "JERR_FCNTL_FILEOFFSOVFL: Attempted increase file offset past file size.";
-// _err_map[JERR_FCNTL_CMPLOFFSOVFL] = "JERR_FCNTL_CMPLOFFSOVFL: Attempted increase completed file offset past submitted offset.";
-// _err_map[JERR_FCNTL_RDOFFSOVFL] = "JERR_FCNTL_RDOFFSOVFL: Attempted increase read offset past write offset.";
-
- // class lfmgr
-// _err_map[JERR_LFMGR_BADAEFNUMLIM] = "JERR_LFMGR_BADAEFNUMLIM: Auto-expand file number limit lower than initial number of journal files.";
-// _err_map[JERR_LFMGR_AEFNUMLIMIT] = "JERR_LFMGR_AEFNUMLIMIT: Exceeded auto-expand file number limit.";
-// _err_map[JERR_LFMGR_AEDISABLED] = "JERR_LFMGR_AEDISABLED: Attempted to expand with auto-expand disabled.";
+ // class JournalFile
+ _err_map[JERR_JNLF_OPEN] = "JERR_JNLF_OPEN: Unable to open file for write";
+ _err_map[JERR_JNLF_CLOSE] = "JERR_JNLF_CLOSE: Unable to close file";
+ _err_map[JERR_JNLF_FILEOFFSOVFL] = "JERR_JNLF_FILEOFFSOVFL: Attempted to increase submitted offset past file size.";
+ _err_map[JERR_JNLF_CMPLOFFSOVFL] = "JERR_JNLF_CMPLOFFSOVFL: Attempted to increase completed file offset past submitted offset.";
- // class rrfc
-// _err_map[JERR_RRFC_OPENRD] = "JERR_RRFC_OPENRD: Unable to open file for read.";
+ // class LinearFileController
+ _err_map[JERR_LFCR_SEQNUMNOTFOUND] = "JERR_LFCR_SEQNUMNOTFOUND: File sequence number not found";
// class jrec, enq_rec, deq_rec, txn_rec
_err_map[JERR_JREC_BADRECHDR] = "JERR_JREC_BADRECHDR: Invalid data record header.";
@@ -199,13 +180,14 @@ jerrno::__init()
_err_map[JERR_WMGR_ENQDISCONT] = "JERR_WMGR_ENQDISCONT: Enqueued new dtok when previous enqueue returned partly completed (state ENQ_PART).";
_err_map[JERR_WMGR_DEQDISCONT] = "JERR_WMGR_DEQDISCONT: Dequeued new dtok when previous dequeue returned partly completed (state DEQ_PART).";
_err_map[JERR_WMGR_DEQRIDNOTENQ] = "JERR_WMGR_DEQRIDNOTENQ: Dequeue rid is not enqueued.";
+ _err_map[JERR_WMGR_BADFH] = "JERR_WMGR_BADFH: Bad file handle.";
- // class rmgr
- _err_map[JERR_RMGR_UNKNOWNMAGIC] = "JERR_RMGR_UNKNOWNMAGIC: Found record with unknown magic.";
- _err_map[JERR_RMGR_RIDMISMATCH] = "JERR_RMGR_RIDMISMATCH: RID mismatch between current record and dtok RID";
- //_err_map[JERR_RMGR_FIDMISMATCH] = "JERR_RMGR_FIDMISMATCH: FID mismatch between emap and rrfc";
- _err_map[JERR_RMGR_ENQSTATE] = "JERR_RMGR_ENQSTATE: Attempted read when data token wstate was not ENQ";
- _err_map[JERR_RMGR_BADRECTYPE] = "JERR_RMGR_BADRECTYPE: Attempted operation on inappropriate record type";
+// // class rmgr
+// _err_map[JERR_RMGR_UNKNOWNMAGIC] = "JERR_RMGR_UNKNOWNMAGIC: Found record with unknown magic.";
+// _err_map[JERR_RMGR_RIDMISMATCH] = "JERR_RMGR_RIDMISMATCH: RID mismatch between current record and dtok RID";
+// //_err_map[JERR_RMGR_FIDMISMATCH] = "JERR_RMGR_FIDMISMATCH: FID mismatch between emap and rrfc";
+// _err_map[JERR_RMGR_ENQSTATE] = "JERR_RMGR_ENQSTATE: Attempted read when data token wstate was not ENQ";
+// _err_map[JERR_RMGR_BADRECTYPE] = "JERR_RMGR_BADRECTYPE: Attempted operation on inappropriate record type";
// class data_tok
_err_map[JERR_DTOK_ILLEGALSTATE] = "JERR_DTOK_ILLEGALSTATE: Attempted to change to illegal state.";
@@ -216,19 +198,6 @@ jerrno::__init()
_err_map[JERR_MAP_NOTFOUND] = "JERR_MAP_NOTFOUND: Key not found in map.";
_err_map[JERR_MAP_LOCKED] = "JERR_MAP_LOCKED: Record ID locked by a pending transaction.";
- // class jinf
-// _err_map[JERR_JINF_CVALIDFAIL] = "JERR_JINF_CVALIDFAIL: Journal compatibility validation failure.";
-// _err_map[JERR_JINF_NOVALUESTR] = "JERR_JINF_NOVALUESTR: No value attribute found in jinf file.";
-// _err_map[JERR_JINF_BADVALUESTR] = "JERR_JINF_BADVALUESTR: Bad format for value attribute in jinf file";
-// _err_map[JERR_JINF_JDATEMPTY] = "JERR_JINF_JDATEMPTY: Journal data files empty.";
-// _err_map[JERR_JINF_TOOMANYFILES] = "JERR_JINF_TOOMANYFILES: Too many journal data files.";
-// _err_map[JERR_JINF_INVALIDFHDR] = "JERR_JINF_INVALIDFHDR: Invalid journal data file header";
-// _err_map[JERR_JINF_STAT] = "JERR_JINF_STAT: Error while trying to stat a journal data file";
-// _err_map[JERR_JINF_NOTREGFILE] = "JERR_JINF_NOTREGFILE: Target journal data file is not a regular file";
-// _err_map[JERR_JINF_BADFILESIZE] = "JERR_JINF_BADFILESIZE: Journal data file is of incorrect or unexpected size";
-// _err_map[JERR_JINF_OWIBAD] = "JERR_JINF_OWIBAD: Journal data files have inconsistent OWI flags; >1 transition found in non-auto-expand or min-size journal";
-// _err_map[JERR_JINF_ZEROLENFILE] = "JERR_JINF_ZEROLENFILE: Journal info file zero length";
-
// EFP errors
_err_map[JERR_EFP_BADPARTITIONNAME] = "JERR_EFP_BADPARTITIONNAME: Invalid partition name (must be \'pNNN\' where NNN is a non-zero number)";
_err_map[JERR_EFP_BADEFPDIRNAME] = "JERR_EFP_BADEFPDIRNAME: Bad Empty File Pool directory name (must be \'NNNk\', where NNN is a number which is a multiple of 4)";
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h Mon Oct 7 18:39:24 2013
@@ -61,6 +61,7 @@ namespace qls_jrnl
static const uint32_t JERR__UNEXPRESPONSE; ///< Unexpected response to call or event
static const uint32_t JERR__RECNFOUND; ///< Record not found
static const uint32_t JERR__NOTIMPL; ///< Not implemented
+ static const uint32_t JERR__NULL; ///< Operation on null pointer
// class jcntl
static const uint32_t JERR_JCNTL_STOPPED; ///< Operation on stopped journal
@@ -68,8 +69,10 @@ namespace qls_jrnl
static const uint32_t JERR_JCNTL_AIOCMPLWAIT; ///< Timeout waiting for AIOs to complete
static const uint32_t JERR_JCNTL_UNKNOWNMAGIC; ///< Found record with unknown magic
static const uint32_t JERR_JCNTL_NOTRECOVERED; ///< Req' recover() to be called first
- static const uint32_t JERR_JCNTL_RECOVERJFULL; ///< Journal data files full, cannot write
- static const uint32_t JERR_JCNTL_OWIMISMATCH; ///< OWI change found in unexpected location
+ static const uint32_t JERR_JCNTL_OPENRD; ///< Unable to open file for read
+ static const uint32_t JERR_JCNTL_READ; ///< Read error: no or insufficient data to read
+ static const uint32_t JERR_JCNTL_ENQSTATE; ///< Read error: Record not in ENQ state
+ static const uint32_t JERR_JCNTL_INVALIDENQHDR;///< Invalid ENQ header
// class jdir
static const uint32_t JERR_JDIR_NOTDIR; ///< Exists but is not a directory
@@ -84,21 +87,14 @@ namespace qls_jrnl
static const uint32_t JERR_JDIR_UNLINK; ///< File delete failed
static const uint32_t JERR_JDIR_BADFTYPE; ///< Bad or unknown file type (stat mode)
- // class fcntl
-// static const uint32_t JERR_FCNTL_OPENWR; ///< Unable to open file for write
-// static const uint32_t JERR_FCNTL_WRITE; ///< Unable to write to file
-// static const uint32_t JERR_FCNTL_CLOSE; ///< File close failed
-// static const uint32_t JERR_FCNTL_FILEOFFSOVFL; ///< Increased offset past file size
-// static const uint32_t JERR_FCNTL_CMPLOFFSOVFL; ///< Increased cmpl offs past subm offs
-// static const uint32_t JERR_FCNTL_RDOFFSOVFL; ///< Increased read offs past write offs
-
- // class lfmgr
-// static const uint32_t JERR_LFMGR_BADAEFNUMLIM; ///< Bad auto-expand file number limit
-// static const uint32_t JERR_LFMGR_AEFNUMLIMIT; ///< Exceeded auto-expand file number limit
-// static const uint32_t JERR_LFMGR_AEDISABLED; ///< Attempted to expand with auto-expand disabled
+ // class JournalFile
+ static const uint32_t JERR_JNLF_OPEN; ///< Unable to open file for write
+ static const uint32_t JERR_JNLF_CLOSE; ///< Unable to close file
+ static const uint32_t JERR_JNLF_FILEOFFSOVFL; ///< Increased offset past file size
+ static const uint32_t JERR_JNLF_CMPLOFFSOVFL; ///< Increased cmpl offs past subm offs
- // class rrfc
-// static const uint32_t JERR_RRFC_OPENRD; ///< Unable to open file for read
+ // class LinearFileController
+ static const uint32_t JERR_LFCR_SEQNUMNOTFOUND;///< File sequence number not found
// class jrec, enq_rec, deq_rec, txn_rec
static const uint32_t JERR_JREC_BADRECHDR; ///< Invalid data record header
@@ -110,13 +106,14 @@ namespace qls_jrnl
static const uint32_t JERR_WMGR_ENQDISCONT; ///< Enq. new dtok when previous part compl.
static const uint32_t JERR_WMGR_DEQDISCONT; ///< Deq. new dtok when previous part compl.
static const uint32_t JERR_WMGR_DEQRIDNOTENQ; ///< Deq. rid not enqueued
+ static const uint32_t JERR_WMGR_BADFH; ///< Bad file handle
- // class rmgr
- static const uint32_t JERR_RMGR_UNKNOWNMAGIC; ///< Found record with unknown magic
- static const uint32_t JERR_RMGR_RIDMISMATCH; ///< RID mismatch between rec and dtok
- //static const uint32_t JERR_RMGR_FIDMISMATCH; ///< FID mismatch between emap and rrfc
- static const uint32_t JERR_RMGR_ENQSTATE; ///< Attempted read when wstate not ENQ
- static const uint32_t JERR_RMGR_BADRECTYPE; ///< Attempted op on incorrect rec type
+// // class rmgr
+// static const uint32_t JERR_RMGR_UNKNOWNMAGIC; ///< Found record with unknown magic
+// static const uint32_t JERR_RMGR_RIDMISMATCH; ///< RID mismatch between rec and dtok
+// //static const uint32_t JERR_RMGR_FIDMISMATCH; ///< FID mismatch between emap and rrfc
+// static const uint32_t JERR_RMGR_ENQSTATE; ///< Attempted read when wstate not ENQ
+// static const uint32_t JERR_RMGR_BADRECTYPE; ///< Attempted op on incorrect rec type
// class data_tok
static const uint32_t JERR_DTOK_ILLEGALSTATE; ///< Attempted to change to illegal state
@@ -127,19 +124,6 @@ namespace qls_jrnl
static const uint32_t JERR_MAP_NOTFOUND; ///< Key not found in map
static const uint32_t JERR_MAP_LOCKED; ///< rid locked by pending txn
- // class jinf
-// static const uint32_t JERR_JINF_CVALIDFAIL; ///< Compatibility validation failure
-// static const uint32_t JERR_JINF_NOVALUESTR; ///< No value attr found in jinf file
-// static const uint32_t JERR_JINF_BADVALUESTR; ///< Bad format for value attr in jinf file
-// static const uint32_t JERR_JINF_JDATEMPTY; ///< Journal data files empty
-// static const uint32_t JERR_JINF_TOOMANYFILES; ///< Too many journal data files
-// static const uint32_t JERR_JINF_INVALIDFHDR; ///< Invalid file header
-// static const uint32_t JERR_JINF_STAT; ///< Error while trying to stat a file
-// static const uint32_t JERR_JINF_NOTREGFILE; ///< Target file is not a regular file
-// static const uint32_t JERR_JINF_BADFILESIZE; ///< File is of incorrect or unexpected size
-// static const uint32_t JERR_JINF_OWIBAD; ///< OWI inconsistent (>1 transition in non-ae journal)
-// static const uint32_t JERR_JINF_ZEROLENFILE; ///< Journal info file is zero length (empty).
-
// EFP errors
static const uint32_t JERR_EFP_BADPARTITIONNAME; ///< Partition name invalid or of value 0
static const uint32_t JERR_EFP_BADEFPDIRNAME; ///< Empty File Pool directory name invalid
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h Mon Oct 7 18:39:24 2013
@@ -148,9 +148,9 @@ namespace qls_jrnl
virtual std::size_t rec_size() const = 0;
inline virtual uint32_t rec_size_dblks() const { return size_dblks(rec_size()); }
static inline uint32_t size_dblks(const std::size_t size)
- { return size_blks(size, JRNL_DBLK_SIZE); }
+ { return size_blks(size, JRNL_DBLK_SIZE_BYTES); }
static inline uint32_t size_sblks(const std::size_t size)
- { return size_blks(size, JRNL_SBLK_SIZE); }
+ { return size_blks(size, JRNL_SBLK_SIZE_BYTES); }
static inline uint32_t size_blks(const std::size_t size, const std::size_t blksize)
{ return (size + blksize - 1)/blksize; }
virtual uint64_t rid() const = 0;
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp Mon Oct 7 18:39:24 2013
@@ -29,7 +29,6 @@
#include "qpid/linearstore/jrnl/jerrno.h"
#include <sstream>
-
namespace qpid
{
namespace qls_jrnl
@@ -44,6 +43,7 @@ pmgr::page_cb::page_cb(uint16_t index):
_pdtokl(0),
// _wfh(0),
// _rfh(0),
+ _jfp(0),
_pbuff(0)
{}
@@ -64,7 +64,8 @@ pmgr::page_cb::state_str() const
return "<unknown>";
}
-const uint32_t pmgr::_sblksize = JRNL_SBLK_SIZE;
+// static
+const uint32_t pmgr::_sblkSizeBytes = JRNL_SBLK_SIZE_BYTES;
pmgr::pmgr(jcntl* jc, enq_map& emap, txn_map& tmap):
_cache_pgsize_sblks(0),
@@ -107,32 +108,33 @@ pmgr::initialize(aio_callback* const cbp
_cbp = cbp;
// 1. Allocate page memory (as a single block)
- std::size_t cache_pgsize = _cache_num_pages * _cache_pgsize_sblks * _sblksize;
- if (::posix_memalign(&_page_base_ptr, _sblksize, cache_pgsize))
+ std::size_t cache_pgsize = _cache_num_pages * _cache_pgsize_sblks * _sblkSizeBytes;
+ if (::posix_memalign(&_page_base_ptr, QLS_AIO_ALIGN_BOUNDARY, cache_pgsize))
{
clean();
std::ostringstream oss;
- oss << "posix_memalign(): blksize=" << _sblksize << " size=" << cache_pgsize;
+ oss << "posix_memalign(): alignment=" << QLS_AIO_ALIGN_BOUNDARY << " size=" << cache_pgsize;
oss << FORMAT_SYSERR(errno);
throw jexception(jerrno::JERR__MALLOC, oss.str(), "pmgr", "initialize");
}
+
// 2. Allocate array of page pointers
_page_ptr_arr = (void**)std::malloc(_cache_num_pages * sizeof(void*));
MALLOC_CHK(_page_ptr_arr, "_page_ptr_arr", "pmgr", "initialize");
- // 3. Allocate and initilaize page control block (page_cb) array
+ // 3. Allocate and initialize page control block (page_cb) array
_page_cb_arr = (page_cb*)std::malloc(_cache_num_pages * sizeof(page_cb));
MALLOC_CHK(_page_cb_arr, "_page_cb_arr", "pmgr", "initialize");
std::memset(_page_cb_arr, 0, _cache_num_pages * sizeof(page_cb));
- // 5. Allocate IO control block (iocb) array
+ // 4. Allocate IO control block (iocb) array
_aio_cb_arr = (aio_cb*)std::malloc(_cache_num_pages * sizeof(aio_cb));
MALLOC_CHK(_aio_cb_arr, "_aio_cb_arr", "pmgr", "initialize");
- // 6. Set page pointers in _page_ptr_arr, _page_cb_arr and iocbs to pages within page block
+ // 5. Set page pointers in _page_ptr_arr, _page_cb_arr and iocbs to pages within page block
for (uint16_t i=0; i<_cache_num_pages; i++)
{
- _page_ptr_arr[i] = (void*)((char*)_page_base_ptr + _cache_pgsize_sblks * _sblksize * i);
+ _page_ptr_arr[i] = (void*)((char*)_page_base_ptr + _cache_pgsize_sblks * _sblkSizeBytes * i);
_page_cb_arr[i]._index = i;
_page_cb_arr[i]._state = UNUSED;
_page_cb_arr[i]._pbuff = _page_ptr_arr[i];
@@ -141,12 +143,12 @@ pmgr::initialize(aio_callback* const cbp
_aio_cb_arr[i].data = (void*)&_page_cb_arr[i];
}
- // 7. Allocate io_event array, max one event per cache page plus one for each file
- const uint16_t max_aio_evts = _cache_num_pages /*+ _jc->num_jfiles()*/; // TODO find replacement here for linear store
+ // 6. Allocate io_event array, max one event per cache page plus one for file header writes
+ const uint16_t max_aio_evts = _cache_num_pages + 1; // One additional event for file header writes
_aio_event_arr = (aio_event*)std::malloc(max_aio_evts * sizeof(aio_event));
MALLOC_CHK(_aio_event_arr, "_aio_event_arr", "pmgr", "initialize");
- // 8. Initialize AIO context
+ // 7. Initialize AIO context
if (int ret = aio::queue_init(max_aio_evts, &_ioctx))
{
std::ostringstream oss;
@@ -158,7 +160,7 @@ pmgr::initialize(aio_callback* const cbp
void
pmgr::clean()
{
- // clean up allocated memory here
+ // Clean up allocated memory here
if (_ioctx)
aio::queue_release(_ioctx);
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h Mon Oct 7 18:39:24 2013
@@ -45,6 +45,7 @@ namespace qpid
{
namespace qls_jrnl
{
+class JournalFile;
/**
* \brief Abstract class for managing either read or write page cache of arbitrary size and
@@ -64,7 +65,6 @@ namespace qls_jrnl
AIO_COMPLETE ///< An AIO request is complete.
};
- protected:
/**
* \brief Page control block, carries control and state information for each page in the
* cache.
@@ -79,13 +79,15 @@ namespace qls_jrnl
std::deque<data_tok*>* _pdtokl; ///< Page message tokens list
//fcntl* _wfh; ///< File handle for incrementing write compl counts
//fcntl* _rfh; ///< File handle for incrementing read compl counts
+ JournalFile* _jfp; ///< Journal file for incrementing compl counts
void* _pbuff; ///< Page buffer
page_cb(uint16_t index); ///< Convenience constructor
const char* state_str() const; ///< Return state as string for this pcb
};
- static const uint32_t _sblksize; ///< Disk softblock size
+ protected:
+ static const uint32_t _sblkSizeBytes; ///< Disk softblock size
uint32_t _cache_pgsize_sblks; ///< Size of each cache page, in softblocks
uint16_t _cache_num_pages; ///< Number of pages in the page cache
jcntl* _jc; ///< Pointer to journal controller
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rcvdat.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rcvdat.h?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rcvdat.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rcvdat.h Mon Oct 7 18:39:24 2013
@@ -37,98 +37,42 @@ namespace qls_jrnl
struct rcvdat
{
- uint16_t _njf; ///< Number of journal files
-// bool _ae; ///< Auto-expand mode
-// uint16_t _aemjf; ///< Auto-expand mode max journal files
-// bool _owi; ///< Overwrite indicator
-// bool _frot; ///< First rotation flag
- bool _jempty; ///< Journal data files empty
- uint16_t _ffid; ///< First file id
- std::size_t _fro; ///< First record offset in ffid
- uint16_t _lfid; ///< Last file id
- std::size_t _eo; ///< End offset (first byte past last record)
- uint64_t _h_rid; ///< Highest rid found
- bool _lffull; ///< Last file is full
- bool _jfull; ///< Journal is full
-// std::vector<uint16_t> _fid_list; ///< Fid-lid mapping - list of fids in order of lid
- std::vector<uint32_t> _enq_cnt_list; ///< Number enqueued records found for each file
+ std::vector<std::string> _jfl; ///< Journal file list
+ std::map<uint64_t, std::string> _fm; ///< File number - name map
+ std::vector<uint32_t> _enq_cnt_list; ///< Number enqueued records found for each file
+ bool _jempty; ///< Journal data files empty
+ std::size_t _fro; ///< First record offset in first journal file
+ std::size_t _eo; ///< End offset (first byte past last record)
+ uint64_t _h_rid; ///< Highest rid found
+ bool _lffull; ///< Last file is full
rcvdat():
- _njf(0),
-// _ae(false),
-// _aemjf(0),
-// _owi(false),
-// _frot(false),
- _jempty(true),
- _ffid(0),
+ _jfl(),
+ _fm(),
+ _enq_cnt_list(),
+ _jempty(false),
_fro(0),
- _lfid(0),
_eo(0),
_h_rid(0),
- _lffull(false),
- _jfull(false),
-// _fid_list(),
- _enq_cnt_list()
+ _lffull(false)
{}
- void reset(const uint16_t num_jfiles/*, const bool auto_expand, const uint16_t ae_max_jfiles*/)
- {
- _njf = num_jfiles;
-// _ae = auto_expand;
-// _aemjf = ae_max_jfiles;
-// _owi = false;
-// _frot = false;
- _jempty = true;
- _ffid = 0;
- _fro = 0;
- _lfid = 0;
- _eo = 0;
- _h_rid = 0;
- _lffull = false;
- _jfull = false;
-// _fid_list.clear();
- _enq_cnt_list.clear();
- _enq_cnt_list.resize(num_jfiles, 0);
- }
-
- // Find first fid with enqueued records
- uint16_t ffid()
- {
- uint16_t index = _ffid;
- while (index != _lfid && _enq_cnt_list[index] == 0)
- {
- if (++index >= _njf)
- index = 0;
- }
- return index;
- }
-
std::string to_string(const std::string& jid)
{
std::ostringstream oss;
oss << "Recover file analysis (jid=\"" << jid << "\"):" << std::endl;
- oss << " Number of journal files (_njf) = " << _njf << std::endl;
-// oss << " Auto-expand mode (_ae) = " << (_ae ? "TRUE" : "FALSE") << std::endl;
-// if (_ae) oss << " Auto-expand mode max journal files (_aemjf) = " << _aemjf << std::endl;
-// oss << " Overwrite indicator (_owi) = " << (_owi ? "TRUE" : "FALSE") << std::endl;
-// oss << " First rotation (_frot) = " << (_frot ? "TRUE" : "FALSE") << std::endl;
+ oss << " Number of journal files = " << _fm.size() << std::endl;
+ oss << " Journal File List (_jfl):";
+ for (std::map<uint64_t, std::string>::const_iterator i=_fm.begin(); i!=_fm.end(); ++i) {
+ oss << " " << i->first << ": " << i->second.substr(i->second.rfind('/')+1) << std::endl;
+ }
oss << " Journal empty (_jempty) = " << (_jempty ? "TRUE" : "FALSE") << std::endl;
- oss << " First (earliest) fid (_ffid) = " << _ffid << std::endl;
oss << " First record offset in first fid (_fro) = 0x" << std::hex << _fro <<
- std::dec << " (" << (_fro/JRNL_DBLK_SIZE) << " dblks)" << std::endl;
- oss << " Last (most recent) fid (_lfid) = " << _lfid << std::endl;
+ std::dec << " (" << (_fro/JRNL_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
oss << " End offset (_eo) = 0x" << std::hex << _eo << std::dec << " (" <<
- (_eo/JRNL_DBLK_SIZE) << " dblks)" << std::endl;
+ (_eo/JRNL_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
oss << " Highest rid (_h_rid) = 0x" << std::hex << _h_rid << std::dec << std::endl;
oss << " Last file full (_lffull) = " << (_lffull ? "TRUE" : "FALSE") << std::endl;
- oss << " Journal full (_jfull) = " << (_jfull ? "TRUE" : "FALSE") << std::endl;
- oss << " Normalized fid list (_fid_list) = [";
-// for (std::vector<uint16_t>::const_iterator i = _fid_list.begin(); i < _fid_list.end(); i++)
-// {
-// if (i != _fid_list.begin()) oss << ", ";
-// oss << *i;
-// }
- oss << "]" << std::endl;
oss << " Enqueued records (txn & non-txn):" << std::endl;
for (unsigned i=0; i<_enq_cnt_list.size(); i++)
oss << " File " << std::setw(2) << i << ": " << _enq_cnt_list[i] <<
@@ -140,28 +84,25 @@ namespace qls_jrnl
{
std::ostringstream oss;
oss << "Recover file analysis (jid=\"" << jid << "\"):";
- oss << " njf=" << _njf;
-// oss << " ae=" << (_owi ? "T" : "F");
-// oss << " aemjf=" << _aemjf;
-// oss << " owi=" << (_ae ? "T" : "F");
-// oss << " frot=" << (_frot ? "T" : "F");
+ oss << " jfl=[";
+ for (std::map<uint64_t, std::string>::const_iterator i=_fm.begin(); i!=_fm.end(); ++i) {
+ if (i!=_fm.begin()) oss << " ";
+ oss << i->first << ":" << i->second.substr(i->second.rfind('/')+1);
+ }
+ oss << "]";
+ oss << " _enq_cnt_list: [ ";
+ for (unsigned i=0; i<_enq_cnt_list.size(); i++) {
+ if (i) oss << " ";
+ oss << _enq_cnt_list[i];
+ }
+ oss << " ]";
oss << " jempty=" << (_jempty ? "T" : "F");
- oss << " ffid=" << _ffid;
oss << " fro=0x" << std::hex << _fro << std::dec << " (" <<
- (_fro/JRNL_DBLK_SIZE) << " dblks)";
- oss << " lfid=" << _lfid;
+ (_fro/JRNL_DBLK_SIZE_BYTES) << " dblks)";
oss << " eo=0x" << std::hex << _eo << std::dec << " (" <<
- (_eo/JRNL_DBLK_SIZE) << " dblks)";
+ (_eo/JRNL_DBLK_SIZE_BYTES) << " dblks)";
oss << " h_rid=0x" << std::hex << _h_rid << std::dec;
oss << " lffull=" << (_lffull ? "T" : "F");
- oss << " jfull=" << (_jfull ? "T" : "F");
- oss << " Enqueued records (txn & non-txn): [ ";
- for (unsigned i=0; i<_enq_cnt_list.size(); i++)
- {
- if (i) oss << " ";
- oss << "fid_" << std::setw(2) << std::setfill('0') << i << "=" << _enq_cnt_list[i];
- }
- oss << " ]";
return oss.str();
}
};
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp Mon Oct 7 18:39:24 2013
@@ -53,10 +53,10 @@ rmgr::initialize(aio_callback* const cbp
pmgr::initialize(cbp, JRNL_RMGR_PAGE_SIZE, JRNL_RMGR_PAGES);
clean();
// Allocate memory for reading file header
- if (::posix_memalign(&_fhdr_buffer, _sblksize, _sblksize))
+ if (::posix_memalign(&_fhdr_buffer, _sblkSizeBytes, _sblkSizeBytes))
{
std::ostringstream oss;
- oss << "posix_memalign(): blksize=" << _sblksize << " size=" << _sblksize;
+ oss << "posix_memalign(): blksize=" << _sblkSizeBytes << " size=" << _sblkSizeBytes;
oss << FORMAT_SYSERR(errno);
throw jexception(jerrno::JERR__MALLOC, oss.str(), "rmgr", "initialize");
}
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp Mon Oct 7 18:39:24 2013
@@ -50,23 +50,27 @@ txn_data_struct::txn_data_struct(const u
{}
txn_map::txn_map():
- _map(),
- _pfid_txn_cnt()
+ _map()/*,
+ _pfid_txn_cnt()*/
{}
txn_map::~txn_map() {}
+/*
void
txn_map::set_num_jfiles(const uint16_t num_jfiles)
{
_pfid_txn_cnt.resize(num_jfiles, 0);
}
+*/
+/*
uint32_t
txn_map::get_txn_pfid_cnt(const uint16_t pfid) const
{
return _pfid_txn_cnt.at(pfid);
}
+*/
bool
txn_map::insert_txn_data(const std::string& xid, const txn_data& td)
@@ -84,7 +88,7 @@ txn_map::insert_txn_data(const std::stri
}
else
itr->second.push_back(td);
- _pfid_txn_cnt.at(td._pfid)++;
+// _pfid_txn_cnt.at(td._pfid)++;
return ok;
}
@@ -113,8 +117,8 @@ txn_map::get_remove_tdata_list(const std
return _empty_data_list;
txn_data_list list = itr->second;
_map.erase(itr);
- for (tdl_itr i=list.begin(); i!=list.end(); i++)
- _pfid_txn_cnt.at(i->_pfid)--;
+// for (tdl_itr i=list.begin(); i!=list.end(); i++)
+// _pfid_txn_cnt.at(i->_pfid)--;
return list;
}
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h Mon Oct 7 18:39:24 2013
@@ -112,15 +112,15 @@ namespace qls_jrnl
xmap _map;
smutex _mutex;
- std::vector<uint32_t> _pfid_txn_cnt;
+// std::vector<uint32_t> _pfid_txn_cnt;
const txn_data_list _empty_data_list;
public:
txn_map();
virtual ~txn_map();
- void set_num_jfiles(const uint16_t num_jfiles);
- uint32_t get_txn_pfid_cnt(const uint16_t pfid) const;
+// void set_num_jfiles(const uint16_t num_jfiles);
+// uint32_t get_txn_pfid_cnt(const uint16_t pfid) const;
bool insert_txn_data(const std::string& xid, const txn_data& td);
const txn_data_list get_tdata_list(const std::string& xid);
const txn_data_list get_remove_tdata_list(const std::string& xid);
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp Mon Oct 7 18:39:24 2013
@@ -94,8 +94,8 @@ txn_rec::encode(void* wptr, uint32_t rec
assert(max_size_dblks > 0);
assert(_xidp != 0 && _txn_hdr._xidsize > 0);
- std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE;
- std::size_t rem = max_size_dblks * JRNL_DBLK_SIZE;
+ std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE_BYTES;
+ std::size_t rem = max_size_dblks * JRNL_DBLK_SIZE_BYTES;
std::size_t wr_cnt = 0;
if (rec_offs_dblks) // Continuation of split dequeue record (over 2 or more pages)
{
@@ -146,8 +146,8 @@ txn_rec::encode(void* wptr, uint32_t rec
std::memcpy((char*)wptr + wr_cnt, (char*)&_txn_tail + rec_offs, wsize);
wr_cnt += wsize;
#ifdef RHM_CLEAN
- std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE;
- std::size_t dblk_rec_size = size_dblks(rec_size() - rec_offs) * JRNL_DBLK_SIZE;
+ std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE_BYTES;
+ std::size_t dblk_rec_size = size_dblks(rec_size() - rec_offs) * JRNL_DBLK_SIZE_BYTES;
std::memset((char*)wptr + wr_cnt, RHM_CLEAN_CHAR, dblk_rec_size - wr_cnt);
#endif
}
@@ -187,7 +187,7 @@ txn_rec::encode(void* wptr, uint32_t rec
std::memcpy((char*)wptr + wr_cnt, (void*)&_txn_tail, sizeof(_txn_tail));
wr_cnt += sizeof(_txn_tail);
#ifdef RHM_CLEAN
- std::size_t dblk_rec_size = size_dblks(rec_size()) * JRNL_DBLK_SIZE;
+ std::size_t dblk_rec_size = size_dblks(rec_size()) * JRNL_DBLK_SIZE_BYTES;
std::memset((char*)wptr + wr_cnt, RHM_CLEAN_CHAR, dblk_rec_size - wr_cnt);
#endif
}
@@ -206,7 +206,7 @@ txn_rec::decode(rec_hdr_t& h, void* rptr
{
const uint32_t hdr_xid_dblks = size_dblks(sizeof(txn_hdr_t) + _txn_hdr._xidsize);
const uint32_t hdr_xid_tail_dblks = size_dblks(sizeof(txn_hdr_t) + _txn_hdr._xidsize + sizeof(rec_tail_t));
- const std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE;
+ const std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE_BYTES;
if (hdr_xid_tail_dblks - rec_offs_dblks <= max_size_dblks)
{
@@ -239,7 +239,7 @@ txn_rec::decode(rec_hdr_t& h, void* rptr
const std::size_t xid_rem = _txn_hdr._xidsize - xid_offs;
std::memcpy((char*)_buff + xid_offs, rptr, xid_rem);
rd_cnt += xid_rem;
- const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE) - rd_cnt;
+ const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE_BYTES) - rd_cnt;
if (tail_rem)
{
std::memcpy((void*)&_txn_tail, ((char*)rptr + xid_rem), tail_rem);
@@ -249,7 +249,7 @@ txn_rec::decode(rec_hdr_t& h, void* rptr
else
{
// Remainder of xid split
- const std::size_t xid_cp_size = (max_size_dblks * JRNL_DBLK_SIZE);
+ const std::size_t xid_cp_size = (max_size_dblks * JRNL_DBLK_SIZE_BYTES);
std::memcpy((char*)_buff + rec_offs - sizeof(txn_hdr_t), rptr, xid_cp_size);
rd_cnt += xid_cp_size;
}
@@ -288,7 +288,7 @@ txn_rec::decode(rec_hdr_t& h, void* rptr
// Entire header and xid fit within this page, tail split
std::memcpy(_buff, (char*)rptr + rd_cnt, _txn_hdr._xidsize);
rd_cnt += _txn_hdr._xidsize;
- const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE) - rd_cnt;
+ const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE_BYTES) - rd_cnt;
if (tail_rem)
{
std::memcpy((void*)&_txn_tail, (char*)rptr + rd_cnt, tail_rem);
@@ -298,7 +298,7 @@ txn_rec::decode(rec_hdr_t& h, void* rptr
else
{
// Header fits within this page, xid split
- const std::size_t xid_cp_size = (max_size_dblks * JRNL_DBLK_SIZE) - rd_cnt;
+ const std::size_t xid_cp_size = (max_size_dblks * JRNL_DBLK_SIZE_BYTES) - rd_cnt;
std::memcpy(_buff, (char*)rptr + rd_cnt, xid_cp_size);
rd_cnt += xid_cp_size;
}
@@ -357,7 +357,7 @@ txn_rec::rcv_decode(rec_hdr_t h, std::if
return false;
}
}
- ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - rec_size());
+ ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE_BYTES - rec_size());
chk_tail(); // Throws if tail invalid or record incomplete
assert(!ifsp->fail() && !ifsp->bad());
return true;
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.c?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.c (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.c Mon Oct 7 18:39:24 2013
@@ -54,3 +54,10 @@ void set_enq_external(enq_hdr_t *eh, con
eh->_rhdr._uflag = external ? eh->_rhdr._uflag | ENQ_HDR_EXTERNAL_MASK :
eh->_rhdr._uflag & (~ENQ_HDR_EXTERNAL_MASK);
}
+
+bool validate_enq_hdr(enq_hdr_t *eh, const uint32_t magic, const uint16_t version, const uint64_t rid) {
+ /* True iff the header's magic and version match and, unless rid == 0, its rid matches too. */
+ /* The rid test is its own parenthesized term: ?: binds looser than &&, so unparenthesized it absorbed the magic/version checks. */
+ return eh->_rhdr._magic == magic && eh->_rhdr._version == version &&
+ (rid == 0 || eh->_rhdr._rid == rid); /* If rid == 0, don't compare rids */
+}
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.h?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.h Mon Oct 7 18:39:24 2013
@@ -70,6 +70,7 @@ bool is_enq_transient(const enq_hdr_t *e
void set_enq_transient(enq_hdr_t *eh, const bool transient);
bool is_enq_external(const enq_hdr_t *eh);
void set_enq_external(enq_hdr_t *eh, const bool external);
+bool validate_enq_hdr(enq_hdr_t *eh, const uint32_t magic, const uint16_t version, const uint64_t rid);
#pragma pack()
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c Mon Oct 7 18:39:24 2013
@@ -36,19 +36,21 @@ void file_hdr_create(file_hdr_t* dest, c
dest->_queue_name_len = 0;
}
-int file_hdr_init(file_hdr_t* dest, const uint16_t uflag, const uint64_t rid, const uint64_t fro,
+int file_hdr_init(void* dest, const uint64_t dest_len, const uint16_t uflag, const uint64_t rid, const uint64_t fro,
const uint64_t file_number, const uint16_t queue_name_len, const char* queue_name) {
- dest->_rhdr._uflag = uflag;
- dest->_rhdr._rid = rid;
- dest->_fro = fro;
- dest->_file_number = file_number;
+ file_hdr_t* fhp = (file_hdr_t*)dest; /* dest is a raw buffer of dest_len bytes: header fields, then queue name, then zero padding */
+ fhp->_rhdr._uflag = uflag;
+ fhp->_rhdr._rid = rid;
+ fhp->_fro = fro;
+ fhp->_file_number = file_number;
if (sizeof(file_hdr_t) + queue_name_len < MAX_FILE_HDR_LEN) {
- dest->_queue_name_len = queue_name_len;
+ fhp->_queue_name_len = queue_name_len;
} else {
- dest->_queue_name_len = MAX_FILE_HDR_LEN - sizeof(file_hdr_t);
+ fhp->_queue_name_len = MAX_FILE_HDR_LEN - sizeof(file_hdr_t);
}
- dest->_queue_name_len = queue_name_len;
- memcpy(dest + sizeof(file_hdr_t), queue_name, queue_name_len);
+ const uint64_t used = sizeof(file_hdr_t) + fhp->_queue_name_len; /* header + clamped name; re-assigning queue_name_len here would undo the clamp */
+ memcpy((char*)dest + sizeof(file_hdr_t), queue_name, fhp->_queue_name_len);
+ if (dest_len > used) memset((char*)dest + used, 0, dest_len - used); /* zero the padding; guarded so the unsigned size cannot wrap */
return set_time_now(dest);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org