You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/10/07 20:39:25 UTC
svn commit: r1530024 [4/4] - in /qpid/branches/linearstore/qpid/cpp/src: ./
qpid/linearstore/ qpid/linearstore/jrnl/ qpid/linearstore/jrnl/utils/
tests/linearstore/
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h Mon Oct 7 18:39:24 2013
@@ -89,7 +89,7 @@ typedef struct file_hdr_t {
void file_hdr_create(file_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t fhdr_size_sblks,
const uint16_t efp_partition, const uint64_t file_size);
-int file_hdr_init(file_hdr_t* dest, const uint16_t uflag, const uint64_t rid, const uint64_t fro,
+int file_hdr_init(void* dest, const uint64_t dest_len, const uint16_t uflag, const uint64_t rid, const uint64_t fro,
const uint64_t file_number, const uint16_t queue_name_len, const char* queue_name);
void file_hdr_reset(file_hdr_t* target);
int is_file_hdr_reset(file_hdr_t* target);
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp Mon Oct 7 18:39:24 2013
@@ -28,26 +28,23 @@
#include "qpid/linearstore/jrnl/utils/file_hdr.h"
#include "qpid/linearstore/jrnl/jcntl.h"
#include "qpid/linearstore/jrnl/jerrno.h"
+#include "qpid/linearstore/jrnl/JournalFile.h"
#include <sstream>
#include <stdint.h>
+//#include <iostream> // DEBUG
+
namespace qpid
{
namespace qls_jrnl
{
-wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap/*, wrfc& wrfc*/):
+wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, LinearFileController& lfc):
pmgr(jc, emap, tmap),
-// _wrfc(wrfc),
+ _lfc(lfc),
_max_dtokpp(0),
_max_io_wait_us(0),
- _fhdr_base_ptr(0),
- _fhdr_ptr_arr(0),
- _fhdr_aio_cb_arr(0),
_cached_offset_dblks(0),
-// _jfsize_dblks(0),
-// _jfsize_pgs(0),
-// _num_jfiles(0),
_enq_busy(false),
_deq_busy(false),
_abort_busy(false),
@@ -55,19 +52,12 @@ wmgr::wmgr(jcntl* jc, enq_map& emap, txn
_txn_pending_set()
{}
-wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap/*, wrfc& wrfc*/,
- const uint32_t max_dtokpp, const uint32_t max_iowait_us):
+wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, LinearFileController& lfc, const uint32_t max_dtokpp, const uint32_t max_iowait_us):
pmgr(jc, emap, tmap /* , dtoklp */),
-// _wrfc(wrfc),
+ _lfc(lfc),
_max_dtokpp(max_dtokpp),
_max_io_wait_us(max_iowait_us),
- _fhdr_base_ptr(0),
- _fhdr_ptr_arr(0),
- _fhdr_aio_cb_arr(0),
_cached_offset_dblks(0),
-// _jfsize_dblks(0),
-// _jfsize_pgs(0),
-// _num_jfiles(0),
_enq_busy(false),
_deq_busy(false),
_abort_busy(false),
@@ -94,14 +84,10 @@ wmgr::initialize(aio_callback* const cbp
initialize(cbp, wcache_pgsize_sblks, wcache_num_pages);
-// _jfsize_dblks = _jc->jfsize_sblks() * JRNL_SBLK_SIZE_DBLKS;
-// _jfsize_pgs = _jc->jfsize_sblks() / _cache_pgsize_sblks;
-// assert(_jc->jfsize_sblks() % JRNL_RMGR_PAGE_SIZE == 0);
-
if (eo)
{
const uint32_t wr_pg_size_dblks = _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS;
- uint32_t data_dblks = (eo / JRNL_DBLK_SIZE) - 4; // 4 dblks for file hdr
+ uint32_t data_dblks = (eo / JRNL_DBLK_SIZE_BYTES) - 4; // 4 dblks for file hdr
_pg_cntr = data_dblks / wr_pg_size_dblks;
_pg_offset_dblks = data_dblks - (_pg_cntr * wr_pg_size_dblks);
}
@@ -138,7 +124,7 @@ wmgr::enqueue(const void* const data_buf
}
}
- uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : /*_wrfc.get_incr_rid()*/0; // TODO: replace for linearstore: _wrfc
+ uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : _lfc.getNextRecordId();/*_wrfc.get_incr_rid()*/
_enq_rec.reset(rid, data_buff, tot_data_len, xid_ptr, xid_len/*, _wrfc.owi()*/, transient,
external);
if (!cont)
@@ -155,15 +141,15 @@ wmgr::enqueue(const void* const data_buf
while (!done)
{
assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS);
- void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE);
+ void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE_BYTES);
uint32_t data_offs_dblks = dtokp->dblocks_written();
uint32_t ret = _enq_rec.encode(wptr, data_offs_dblks,
(_cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) - _pg_offset_dblks);
// Remember fid which contains the record header in case record is split over several files
- // TODO: replace for linearstore: _wrfc
-// if (data_offs_dblks == 0)
-// dtokp->set_fid(_wrfc.index());
+ if (data_offs_dblks == 0) {
+ dtokp->set_fid(_lfc.getCurrentFileSeqNum());
+ }
_pg_offset_dblks += ret;
_cached_offset_dblks += ret;
dtokp->incr_dblocks_written(ret);
@@ -180,7 +166,7 @@ wmgr::enqueue(const void* const data_buf
// long multi-page messages have their token on the page containing the END of the
// message. AIO callbacks will then only process this token when entire message is
// enqueued.
- //_wrfc.incr_enqcnt(dtokp->fid()); // TODO: replace for linearstore: _wrfc
+ _lfc.incrEnqueuedRecordCount();
if (xid_len) // If part of transaction, add to transaction map
{
@@ -189,7 +175,7 @@ wmgr::enqueue(const void* const data_buf
}
else
{
- if (_emap.insert_pfid(rid, dtokp->fid()) < enq_map::EMAP_OK) // fail
+ if (_emap.insert_pfid(rid, dtokp->fid(), 0) < enq_map::EMAP_OK) // fail
{
// The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID.
std::ostringstream oss;
@@ -260,15 +246,15 @@ wmgr::dequeue(data_tok* dtokp, const voi
while (!done)
{
assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS);
- void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE);
+ void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE_BYTES);
uint32_t data_offs_dblks = dtokp->dblocks_written();
uint32_t ret = _deq_rec.encode(wptr, data_offs_dblks,
(_cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) - _pg_offset_dblks);
// Remember fid which contains the record header in case record is split over several files
- // TODO: replace for linearstore: _wrfc
-// if (data_offs_dblks == 0)
-// dtokp->set_fid(_wrfc.index());
+ if (data_offs_dblks == 0) {
+ dtokp->set_fid(_lfc.getCurrentFileSeqNum());
+ }
_pg_offset_dblks += ret;
_cached_offset_dblks += ret;
dtokp->incr_dblocks_written(ret);
@@ -290,23 +276,24 @@ wmgr::dequeue(data_tok* dtokp, const voi
}
else
{
- int16_t fid = _emap.get_remove_pfid(dtokp->dequeue_rid());
- if (fid < enq_map::EMAP_OK) // fail
+ int16_t fid;
+ short eres = _emap.get_remove_pfid(dtokp->dequeue_rid(), fid);
+ if (eres < enq_map::EMAP_OK) // fail
{
- if (fid == enq_map::EMAP_RID_NOT_FOUND)
+ if (eres == enq_map::EMAP_RID_NOT_FOUND)
{
std::ostringstream oss;
oss << std::hex << "rid=0x" << rid;
throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "dequeue");
}
- if (fid == enq_map::EMAP_LOCKED)
+ if (eres == enq_map::EMAP_LOCKED)
{
std::ostringstream oss;
oss << std::hex << "rid=0x" << rid;
throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "dequeue");
}
}
-// _wrfc.decr_enqcnt(fid); // TODO: replace for linearstore: _wrfc
+ _lfc.decrEnqueuedRecordCount();
}
done = true;
@@ -348,7 +335,7 @@ wmgr::abort(data_tok* dtokp, const void*
}
}
- uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() :/* _wrfc.get_incr_rid()*/0; // TODO: replace for linearstore: _wrfc
+ uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : _lfc.getNextRecordId();
_txn_rec.reset(QLS_TXA_MAGIC, rid, xid_ptr, xid_len/*, _wrfc.owi()*/);
if (!cont)
{
@@ -362,15 +349,14 @@ wmgr::abort(data_tok* dtokp, const void*
while (!done)
{
assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS);
- void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE);
+ void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE_BYTES);
uint32_t data_offs_dblks = dtokp->dblocks_written();
uint32_t ret = _txn_rec.encode(wptr, data_offs_dblks,
(_cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) - _pg_offset_dblks);
// Remember fid which contains the record header in case record is split over several files
- // TODO: replace for linearstore: _wrfc
-// if (data_offs_dblks == 0)
-// dtokp->set_fid(_wrfc.index());
+ if (data_offs_dblks == 0)
+ dtokp->set_fid(_lfc.getCurrentFileSeqNum());
_pg_offset_dblks += ret;
_cached_offset_dblks += ret;
dtokp->incr_dblocks_written(ret);
@@ -389,9 +375,8 @@ wmgr::abort(data_tok* dtokp, const void*
{
if (!itr->_enq_flag)
_emap.unlock(itr->_drid); // ignore rid not found error
- // TODO: replace for linearstore: _wrfc
-// if (itr->_enq_flag)
-// _wrfc.decr_enqcnt(itr->_pfid);
+ if (itr->_enq_flag)
+ _lfc.decrEnqueuedRecordCount(itr->_pfid);
}
std::pair<std::set<std::string>::iterator, bool> res = _txn_pending_set.insert(xid);
if (!res.second)
@@ -440,7 +425,7 @@ wmgr::commit(data_tok* dtokp, const void
}
}
- uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : /*_wrfc.get_incr_rid()*/0; // TODO: replace for linearstore: _wrfc
+ uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : _lfc.getNextRecordId();
_txn_rec.reset(QLS_TXC_MAGIC, rid, xid_ptr, xid_len/*, _wrfc.owi()*/);
if (!cont)
{
@@ -454,15 +439,14 @@ wmgr::commit(data_tok* dtokp, const void
while (!done)
{
assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS);
- void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE);
+ void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE_BYTES);
uint32_t data_offs_dblks = dtokp->dblocks_written();
uint32_t ret = _txn_rec.encode(wptr, data_offs_dblks,
(_cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) - _pg_offset_dblks);
// Remember fid which contains the record header in case record is split over several files
- // TODO: replace for linearstore: _wrfc
-// if (data_offs_dblks == 0)
-// dtokp->set_fid(_wrfc.index());
+ if (data_offs_dblks == 0)
+ dtokp->set_fid(_lfc.getCurrentFileSeqNum());
_pg_offset_dblks += ret;
_cached_offset_dblks += ret;
dtokp->incr_dblocks_written(ret);
@@ -481,7 +465,7 @@ wmgr::commit(data_tok* dtokp, const void
{
if (itr->_enq_flag) // txn enqueue
{
- if (_emap.insert_pfid(itr->_rid, itr->_pfid) < enq_map::EMAP_OK) // fail
+ if (_emap.insert_pfid(itr->_rid, itr->_pfid, 0) < enq_map::EMAP_OK) // fail
{
// The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID.
std::ostringstream oss;
@@ -491,23 +475,24 @@ wmgr::commit(data_tok* dtokp, const void
}
else // txn dequeue
{
- int16_t fid = _emap.get_remove_pfid(itr->_drid, true);
- if (fid < enq_map::EMAP_OK) // fail
+ int16_t fid;
+ short eres = _emap.get_remove_pfid(itr->_drid, fid, true);
+ if (eres < enq_map::EMAP_OK) // fail
{
- if (fid == enq_map::EMAP_RID_NOT_FOUND)
+ if (eres == enq_map::EMAP_RID_NOT_FOUND)
{
std::ostringstream oss;
oss << std::hex << "rid=0x" << rid;
throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "dequeue");
}
- if (fid == enq_map::EMAP_LOCKED)
+ if (eres == enq_map::EMAP_LOCKED)
{
std::ostringstream oss;
oss << std::hex << "rid=0x" << rid;
throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "dequeue");
}
}
-// _wrfc.decr_enqcnt(fid); // TODO: replace for linearstore: _wrfc
+ _lfc.decrEnqueuedRecordCount(fid);
}
}
std::pair<std::set<std::string>::iterator, bool> res = _txn_pending_set.insert(xid);
@@ -532,30 +517,26 @@ wmgr::commit(data_tok* dtokp, const void
}
void
-wmgr::file_header_check(const uint64_t /*rid*/, const bool /*cont*/, const uint32_t /*rec_dblks_rem*/)
+wmgr::file_header_check(const uint64_t rid, const bool cont, const uint32_t rec_dblks_rem)
{
- // Has the file header been written (i.e. write pointers still at 0)?
- // TODO: replace for linearstore: _wrfc
-/*
- if (_wrfc.is_void()) // TODO: replace for linearstore: _wrfc
+ if (_lfc.isEmpty()) // File never written (i.e. no header or data)
{
- bool file_fit = rec_dblks_rem <= _jfsize_dblks;
- bool file_full = rec_dblks_rem == _jfsize_dblks;
std::size_t fro = 0;
- if (cont)
- {
- if (file_fit && !file_full)
- fro = (rec_dblks_rem + JRNL_SBLK_SIZE_DBLKS) * JRNL_DBLK_SIZE;
+ if (cont) {
+ bool file_fit = rec_dblks_rem <= _lfc.dataSize_sblks() * JRNL_SBLK_SIZE_DBLKS; // Will fit within this journal file
+ bool file_full = rec_dblks_rem == _lfc.dataSize_sblks() * JRNL_SBLK_SIZE_DBLKS; // Will exactly fill this journal file
+ if (file_fit && !file_full) {
+ fro = (rec_dblks_rem + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_DBLKS)) * JRNL_DBLK_SIZE_BYTES;
+ }
+ } else {
+ fro = QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_BYTES;
}
- else
- fro = JRNL_SBLK_SIZE;
- write_fhdr(rid, _wrfc.index(), _wrfc.index(), fro); // TODO: replace for linearstore: _wrfc
+ _lfc.asyncFileHeaderWrite(_ioctx, 0, rid, fro);
}
-*/
}
void
-wmgr::flush_check(iores& res, bool& /*cont*/, bool& done)
+wmgr::flush_check(iores& res, bool& cont, bool& done)
{
// Is page is full, flush
if (_pg_offset_dblks >= _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS)
@@ -569,22 +550,15 @@ wmgr::flush_check(iores& res, bool& /*co
done = true;
}
-/*
// If file is full, rotate to next file
- if (_pg_cntr >= _jfsize_pgs)
+ uint32_t fileSize_pgs = _lfc.fileSize_sblks() / _cache_pgsize_sblks;
+ if (_pg_cntr >= fileSize_pgs)
{
- iores rfres = rotate_file();
- if (rfres != RHM_IORES_SUCCESS)
- res = rfres;
- if (!done)
- {
- if (rfres == RHM_IORES_SUCCESS)
- cont = true;
- else
- done = true;
+ get_next_file();
+ if (!done) {
+ cont = true;
}
}
-*/
}
}
@@ -592,14 +566,10 @@ iores
wmgr::flush()
{
iores res = write_flush();
-/*
- if (_pg_cntr >= _jfsize_pgs)
- {
- iores rfres = rotate_file();
- if (rfres != RHM_IORES_SUCCESS)
- res = rfres;
+ uint32_t fileSize_pgs = _lfc.fileSize_sblks() / _cache_pgsize_sblks;
+ if (res == RHM_IORES_SUCCESS && _pg_cntr >= fileSize_pgs) {
+ get_next_file();
}
-*/
return res;
}
@@ -607,7 +577,6 @@ iores
wmgr::write_flush()
{
iores res = RHM_IORES_SUCCESS;
-/*
// Don't bother flushing an empty page or one that is still in state AIO_PENDING
if (_cached_offset_dblks)
{
@@ -629,18 +598,9 @@ wmgr::write_flush()
// if necessary.
dblk_roundup();
- std::size_t pg_offs = (_pg_offset_dblks - _cached_offset_dblks) * JRNL_DBLK_SIZE;
+ std::size_t pg_offs = (_pg_offset_dblks - _cached_offset_dblks) * JRNL_DBLK_SIZE_BYTES;
aio_cb* aiocbp = &_aio_cb_arr[_pg_index];
- aio::prep_pwrite_2(aiocbp, _wrfc.fh(),
- (char*)_page_ptr_arr[_pg_index] + pg_offs, _cached_offset_dblks * JRNL_DBLK_SIZE,
- _wrfc.subm_offs());
- page_cb* pcbp = (page_cb*)(aiocbp->data); // This page control block (pcb)
- pcbp->_wdblks = _cached_offset_dblks;
- pcbp->_wfh = _wrfc.file_controller();
- if (aio::submit(_ioctx, 1, &aiocbp) < 0)
- throw jexception(jerrno::JERR__AIO, "wmgr", "write_flush");
- _wrfc.add_subm_cnt_dblks(_cached_offset_dblks);
- _wrfc.incr_aio_cnt();
+ _lfc.asyncPageWrite(_ioctx, aiocbp, (char*)_page_ptr_arr[_pg_index] + pg_offs, _cached_offset_dblks);
_aio_evt_rem++;
_cached_offset_dblks = 0;
_jc->instr_incr_outstanding_aio_cnt();
@@ -653,21 +613,14 @@ wmgr::write_flush()
get_events(UNUSED, 0);
if (_page_cb_arr[_pg_index]._state == UNUSED)
_page_cb_arr[_pg_index]._state = IN_USE;
-*/
return res;
}
-iores
-wmgr::rotate_file()
+void
+wmgr::get_next_file()
{
- // TODO: replace for linearstore: _wrfc
-/*
_pg_cntr = 0;
- iores res = _wrfc.rotate();
- _jc->chk_wr_frot();
- return res;
-*/
- return RHM_IORES_SUCCESS;
+ _lfc.pullEmptyFileFromEfp();
}
int32_t
@@ -702,17 +655,15 @@ wmgr::get_events(page_state state, times
aio_cb* aiocbp = _aio_event_arr[i].obj; // This I/O control block (iocb)
page_cb* pcbp = (page_cb*)(aiocbp->data); // This page control block (pcb)
long aioret = (long)_aio_event_arr[i].res;
- if (aioret < 0)
- {
+ if (aioret < 0) {
std::ostringstream oss;
oss << "AIO write operation failed: " << std::strerror(-aioret) << " (" << aioret << ") [";
- if (pcbp)
+ if (pcbp) {
oss << "pg=" << pcbp->_index;
- else
- {
- // TODO: replace for linearstore: fhp->_pfid
-// file_hdr_t* fhp = (file_hdr_t*)aiocbp->u.c.buf;
-// oss << "fid=" << fhp->_pfid;
+ } else {
+ file_hdr_t* fhp = (file_hdr_t*)aiocbp->u.c.buf;
+ oss << "fnum=" << fhp->_file_number;
+ oss << " qname=" << std::string((char*)fhp + sizeof(file_hdr_t), fhp->_queue_name_len);
}
oss << " size=" << aiocbp->u.c.nbytes;
oss << " offset=" << aiocbp->u.c.offset << " fh=" << aiocbp->aio_fildes << "]";
@@ -790,16 +741,15 @@ wmgr::get_events(page_state state, times
oss << "dtok_id=" << dtokp->id() << " dtok_state=" << dtokp->wstate_str();
throw jexception(jerrno::JERR_WMGR_BADDTOKSTATE, oss.str(), "wmgr",
"get_events");
- } // switch
- } // if
- } // for
+ }
+ }
+ }
// Increment the completed write offset
// NOTE: We cannot use _wrfc here, as it may have rotated since submitting count.
// Use stored pointer to fcntl in the pcb instead.
- // TODO: replace for linearstore: pcbp->_wfh
-// pcbp->_wfh->add_wr_cmpl_cnt_dblks(pcbp->_wdblks);
-// pcbp->_wfh->decr_aio_cnt();
+ pcbp->_jfp->addCompletedDblkCount(pcbp->_wdblks);
+ pcbp->_jfp->decrOutstandingAioOperationCount();
_jc->instr_decr_outstanding_aio_cnt();
// Clean up this pcb's data_tok list
@@ -812,16 +762,10 @@ wmgr::get_events(page_state state, times
}
else // File header writes have no pcb
{
- // get lfid from original file header record, update info for that lfid
- // TODO: replace for linearstore: lfid
-/*
- file_hdr_t* fhp = (file_hdr*)aiocbp->u.c.buf;
- uint32_t lfid = fhp->_lfid;
- fcntl* fcntlp = _jc->get_fcntlp(lfid);
- fcntlp->add_wr_cmpl_cnt_dblks(JRNL_SBLK_SIZE_DBLKS);
- fcntlp->decr_aio_cnt();
- fcntlp->set_wr_fhdr_aio_outstanding(false);
-*/
+ file_hdr_t* fhp = (file_hdr_t*)aiocbp->u.c.buf;
+ _lfc.addWriteCompletedDblkCount(fhp->_file_number, QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_DBLKS);
+ _lfc.decrOutstandingAioOperationCount(fhp->_file_number);
+ //fcntlp->set_wr_fhdr_aio_outstanding(false); // TODO: Do we need this?
}
}
@@ -840,35 +784,15 @@ wmgr::is_txn_synced(const std::string& x
}
void
-wmgr::initialize(aio_callback* const /*cbp*/, const uint32_t /*wcache_pgsize_sblks*/, const uint16_t /*wcache_num_pages*/)
+wmgr::initialize(aio_callback* const cbp, const uint32_t wcache_pgsize_sblks, const uint16_t wcache_num_pages)
{
-/*
+
pmgr::initialize(cbp, wcache_pgsize_sblks, wcache_num_pages);
wmgr::clean();
- _num_jfiles = _jc->num_jfiles(); // TODO: replace for linearstore: _jc->num_jfiles()
- if (::posix_memalign(&_fhdr_base_ptr, _sblksize, _sblksize * _num_jfiles))
- {
- wmgr::clean();
- std::ostringstream oss;
- oss << "posix_memalign(): blksize=" << _sblksize << " size=" << _sblksize;
- oss << FORMAT_SYSERR(errno);
- throw jexception(jerrno::JERR__MALLOC, oss.str(), "wmgr", "initialize");
- }
- _fhdr_ptr_arr = (void**)std::malloc(_num_jfiles * sizeof(void*));
- MALLOC_CHK(_fhdr_ptr_arr, "_fhdr_ptr_arr", "wmgr", "initialize");
- _fhdr_aio_cb_arr = (aio_cb**)std::malloc(sizeof(aio_cb*) * _num_jfiles);
- MALLOC_CHK(_fhdr_aio_cb_arr, "_fhdr_aio_cb_arr", "wmgr", "initialize");
- std::memset(_fhdr_aio_cb_arr, 0, sizeof(aio_cb*) * _num_jfiles);
- for (uint16_t i=0; i<_num_jfiles; i++)
- {
- _fhdr_ptr_arr[i] = (void*)((char*)_fhdr_base_ptr + _sblksize * i);
- _fhdr_aio_cb_arr[i] = new aio_cb;
- }
_page_cb_arr[0]._state = IN_USE;
_ddtokl.clear();
_cached_offset_dblks = 0;
_enq_busy = false;
-*/
}
iores
@@ -877,7 +801,7 @@ wmgr::pre_write_check(const _op_type op,
) const
{
// Check status of current file
- // TODO: replace for linearstore: _wrfc
+ // TODO: Replace for LFC
/*
if (!_wrfc.is_wr_reset())
{
@@ -907,13 +831,6 @@ wmgr::pre_write_check(const _op_type op,
{
case WMGR_ENQUEUE:
{
- // Check for enqueue reaching cutoff threshold
- // TODO: replace for linearstore: _wrfc
-/*
- uint32_t size_dblks = jrec::size_dblks(enq_rec::rec_size(xidsize, dsize, external));
- if (!_enq_busy && _wrfc.enq_threshold(_cached_offset_dblks + size_dblks))
- return RHM_IORES_ENQCAPTHRESH;
-*/
if (!dtokp->is_writable())
{
std::ostringstream oss;
@@ -948,25 +865,22 @@ wmgr::dequeue_check(const std::string& x
{
// First check emap
bool found = false;
- int16_t fid = _emap.get_pfid(drid);
- if (fid < enq_map::EMAP_OK) // fail
- {
- if (fid == enq_map::EMAP_RID_NOT_FOUND)
- {
- if (xid.size())
+ int16_t fid;
+ short eres = _emap.get_pfid(drid, fid);
+ if (eres < enq_map::EMAP_OK) { // fail
+ if (eres == enq_map::EMAP_RID_NOT_FOUND) {
+ if (xid.size()) {
found = _tmap.data_exists(xid, drid);
- }
- else if (fid == enq_map::EMAP_LOCKED)
- {
+ }
+ } else if (eres == enq_map::EMAP_LOCKED) {
std::ostringstream oss;
oss << std::hex << "drid=0x" << drid;
throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "dequeue_check");
}
- }
- else
+ } else {
found = true;
- if (!found)
- {
+ }
+ if (!found) {
std::ostringstream oss;
oss << "jrnl=" << _jc->id() << " drid=0x" << std::hex << drid;
throw jexception(jerrno::JERR_WMGR_DEQRIDNOTENQ, oss.str(), "wmgr", "dequeue_check");
@@ -980,10 +894,10 @@ wmgr::dblk_roundup()
uint32_t wdblks = jrec::size_blks(_cached_offset_dblks, JRNL_SBLK_SIZE_DBLKS) * JRNL_SBLK_SIZE_DBLKS;
while (_cached_offset_dblks < wdblks)
{
- void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE);
+ void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE_BYTES);
std::memcpy(wptr, (const void*)&xmagic, sizeof(xmagic));
#ifdef RHM_CLEAN
- std::memset((char*)wptr + sizeof(xmagic), RHM_CLEAN_CHAR, JRNL_DBLK_SIZE - sizeof(xmagic));
+ std::memset((char*)wptr + sizeof(xmagic), RHM_CLEAN_CHAR, JRNL_DBLK_SIZE_BYTES - sizeof(xmagic));
#endif
_pg_offset_dblks++;
_cached_offset_dblks++;
@@ -991,28 +905,6 @@ wmgr::dblk_roundup()
}
void
-wmgr::write_fhdr(uint64_t rid, uint16_t fid, uint16_t /*lid*/, std::size_t fro)
-{
- file_hdr_t fhdr/*(QLS_FILE_MAGIC, QLS_JRNL_VERSION, rid, fid, lid, fro, _wrfc.owi(), true)*/;
- /*int err =*/ ::file_hdr_init(&fhdr, 0, rid, fro, 0, _jc->id().length(), _jc->id().c_str());
- std::memcpy(_fhdr_ptr_arr[fid], &fhdr, sizeof(fhdr));
-#ifdef RHM_CLEAN
- std::memset((char*)_fhdr_ptr_arr[fid] + sizeof(fhdr), RHM_CLEAN_CHAR, _sblksize - sizeof(fhdr));
-#endif
- aio_cb* aiocbp = _fhdr_aio_cb_arr[fid];
-// aio::prep_pwrite(aiocbp, _wrfc.fh(), _fhdr_ptr_arr[fid], _sblksize, 0); // TODO: replace for linearstore: _wrfc
- if (aio::submit(_ioctx, 1, &aiocbp) < 0)
- throw jexception(jerrno::JERR__AIO, "wmgr", "write_fhdr");
- _aio_evt_rem++;
- // TODO: replace for linearstore: _wrfc
-/*
- _wrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE_DBLKS);
- _wrfc.incr_aio_cnt();
- _wrfc.file_controller()->set_wr_fhdr_aio_outstanding(true);
-*/
-}
-
-void
wmgr::rotate_page()
{
_page_cb_arr[_pg_index]._state = AIO_PENDING;
@@ -1026,21 +918,8 @@ wmgr::rotate_page()
}
void
-wmgr::clean()
-{
- std::free(_fhdr_base_ptr);
- _fhdr_base_ptr = 0;
-
- std::free(_fhdr_ptr_arr);
- _fhdr_ptr_arr = 0;
-
- if (_fhdr_aio_cb_arr)
- {
-// for (uint32_t i=0; i<_num_jfiles; i++)
-// delete _fhdr_aio_cb_arr[i];
- std::free(_fhdr_aio_cb_arr);
- _fhdr_aio_cb_arr = 0;
- }
+wmgr::clean() {
+ // Clean up allocated memory here
}
const std::string
@@ -1063,7 +942,7 @@ wmgr::status_str() const
default: oss << _page_cb_arr[i]._state;
}
}
- oss << "] " /*<< _wrfc.status_str()*/; // TODO: replace for linearstore: _wrfc
+ oss << "] " << _lfc.status(0);
return oss.str();
}
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h Mon Oct 7 18:39:24 2013
@@ -30,15 +30,18 @@ class wmgr;
}}
#include <cstring>
+#include "qpid/linearstore/jrnl/EmptyFilePoolTypes.h"
#include "qpid/linearstore/jrnl/enums.h"
#include "qpid/linearstore/jrnl/pmgr.h"
-//#include "qpid/linearstore/jrnl/wrfc.h"
#include <set>
+class file_hdr_t;
+
namespace qpid
{
namespace qls_jrnl
{
+ class LinearFileController;
/**
* \brief Class for managing a write page cache of arbitrary size and number of pages.
@@ -59,17 +62,11 @@ namespace qls_jrnl
class wmgr : public pmgr
{
private:
-// wrfc& _wrfc; ///< Ref to write rotating file controller
+ LinearFileController& _lfc; ///< Linear File Controller ref
uint32_t _max_dtokpp; ///< Max data writes per page
uint32_t _max_io_wait_us; ///< Max wait in microseconds till submit
- void* _fhdr_base_ptr; ///< Base pointer to file header memory
- void** _fhdr_ptr_arr; ///< Array of pointers to file headers memory
- aio_cb** _fhdr_aio_cb_arr; ///< Array of iocb pointers for file header writes
uint32_t _cached_offset_dblks; ///< Amount of unwritten data in page (dblocks)
std::deque<data_tok*> _ddtokl; ///< Deferred dequeue data_tok list
-// uint32_t _jfsize_dblks; ///< Journal file size in dblks (NOT sblks!)
-// uint32_t _jfsize_pgs; ///< Journal file size in cache pages
-// uint16_t _num_jfiles; ///< Number of files used in iocb mallocs
// TODO: Convert _enq_busy etc into a proper threadsafe lock
// TODO: Convert to enum? Are these encodes mutually exclusive?
@@ -87,9 +84,8 @@ namespace qls_jrnl
std::set<std::string> _txn_pending_set; ///< Set containing xids of pending commits/aborts
public:
- wmgr(jcntl* jc, enq_map& emap, txn_map& tmap/*, wrfc& wrfc*/);
- wmgr(jcntl* jc, enq_map& emap, txn_map& tmap/*, wrfc& wrfc*/, const uint32_t max_dtokpp,
- const uint32_t max_iowait_us);
+ wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, LinearFileController& lfc);
+ wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, LinearFileController& lfc, const uint32_t max_dtokpp, const uint32_t max_iowait_us);
virtual ~wmgr();
void initialize(aio_callback* const cbp, const uint32_t wcache_pgsize_sblks,
@@ -106,7 +102,6 @@ namespace qls_jrnl
int32_t get_events(page_state state, timespec* const timeout, bool flush = false);
bool is_txn_synced(const std::string& xid);
inline bool curr_pg_blocked() const { return _page_cb_arr[_pg_index]._state != UNUSED; }
-// inline bool curr_file_blocked() const; { return _wrfc.aio_cnt() > 0; }
inline uint32_t unflushed_dblks() { return _cached_offset_dblks; }
// Debug aid
@@ -122,9 +117,8 @@ namespace qls_jrnl
void file_header_check(const uint64_t rid, const bool cont, const uint32_t rec_dblks_rem);
void flush_check(iores& res, bool& cont, bool& done);
iores write_flush();
- iores rotate_file();
+ void get_next_file();
void dblk_roundup();
- void write_fhdr(uint64_t rid, uint16_t fid, uint16_t lid, std::size_t fro);
void rotate_page();
void clean();
};
Added: qpid/branches/linearstore/qpid/cpp/src/tests/linearstore/linearstoredirsetup.sh
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/linearstore/linearstoredirsetup.sh?rev=1530024&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/linearstore/linearstoredirsetup.sh (added)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/linearstore/linearstoredirsetup.sh Mon Oct 7 18:39:24 2013
@@ -0,0 +1,24 @@
+#!/bin/bash
+
+STORE_DIR=/tmp
+LINEARSTOREDIR=~/RedHat/linearstore
+
+rm -rf $STORE_DIR/qls
+rm -rf $STORE_DIR/p002
+rm $STORE_DIR/p004
+
+mkdir $STORE_DIR/qls
+mkdir $STORE_DIR/p002
+touch $STORE_DIR/p004
+mkdir $STORE_DIR/qls/p001
+touch $STORE_DIR/qls/p003
+ln -s $STORE_DIR/p002 $STORE_DIR/qls/p002
+ln -s $STORE_DIR/p004 $STORE_DIR/qls/p004
+
+${LINEARSTOREDIR}/tools/src/py/linearstore/efptool.py $STORE_DIR/qls/ -a -p 1 -s 2048 -n 25
+${LINEARSTOREDIR}/tools/src/py/linearstore/efptool.py $STORE_DIR/qls/ -a -p 1 -s 512 -n 25
+${LINEARSTOREDIR}/tools/src/py/linearstore/efptool.py $STORE_DIR/qls/ -a -p 2 -s 2048 -n 25
+
+${LINEARSTOREDIR}/tools/src/py/linearstore/efptool.py $STORE_DIR/qls/ -l
+tree -la $STORE_DIR/qls
+
Propchange: qpid/branches/linearstore/qpid/cpp/src/tests/linearstore/linearstoredirsetup.sh
------------------------------------------------------------------------------
svn:executable = *
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org