You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/10/21 23:26:11 UTC
svn commit: r1534383 [3/4] - in /qpid/branches/linearstore/qpid/cpp/src: ./
qpid/broker/ qpid/legacystore/jrnl/ qpid/linearstore/
qpid/linearstore/jrnl/ qpid/linearstore/jrnl/utils/
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp Mon Oct 21 21:26:10 2013
@@ -110,8 +110,8 @@ deq_rec::encode(void* wptr, uint32_t rec
if (_xidp == 0)
assert(_deq_hdr._xidsize == 0);
- std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE_BYTES;
- std::size_t rem = max_size_dblks * JRNL_DBLK_SIZE_BYTES;
+ std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES;
+ std::size_t rem = max_size_dblks * QLS_DBLK_SIZE_BYTES;
std::size_t wr_cnt = 0;
if (rec_offs_dblks) // Continuation of split dequeue record (over 2 or more pages)
{
@@ -161,10 +161,10 @@ deq_rec::encode(void* wptr, uint32_t rec
{
std::memcpy((char*)wptr + wr_cnt, (char*)&_deq_tail + rec_offs, wsize);
wr_cnt += wsize;
-#ifdef RHM_CLEAN
- std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE_BYTES;
- std::size_t dblk_rec_size = size_dblks(rec_size() - rec_offs) * JRNL_DBLK_SIZE_BYTES;
- std::memset((char*)wptr + wr_cnt, RHM_CLEAN_CHAR, dblk_rec_size - wr_cnt);
+#ifdef QLS_CLEAN
+ std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES;
+ std::size_t dblk_rec_size = size_dblks(rec_size() - rec_offs) * QLS_DBLK_SIZE_BYTES;
+ std::memset((char*)wptr + wr_cnt, QLS_CLEAN_CHAR, dblk_rec_size - wr_cnt);
#endif
}
rec_offs -= sizeof(_deq_tail) - wsize;
@@ -205,9 +205,9 @@ deq_rec::encode(void* wptr, uint32_t rec
std::memcpy((char*)wptr + wr_cnt, (void*)&_deq_tail, sizeof(_deq_tail));
wr_cnt += sizeof(_deq_tail);
}
-#ifdef RHM_CLEAN
- std::size_t dblk_rec_size = size_dblks(rec_size()) * JRNL_DBLK_SIZE_BYTES;
- std::memset((char*)wptr + wr_cnt, RHM_CLEAN_CHAR, dblk_rec_size - wr_cnt);
+#ifdef QLS_CLEAN
+ std::size_t dblk_rec_size = size_dblks(rec_size()) * QLS_DBLK_SIZE_BYTES;
+ std::memset((char*)wptr + wr_cnt, QLS_CLEAN_CHAR, dblk_rec_size - wr_cnt);
#endif
}
}
@@ -226,7 +226,7 @@ deq_rec::decode(rec_hdr_t& h, void* rptr
const uint32_t hdr_xid_dblks = size_dblks(sizeof(deq_hdr_t) + _deq_hdr._xidsize);
const uint32_t hdr_xid_tail_dblks = size_dblks(sizeof(deq_hdr_t) + _deq_hdr._xidsize +
sizeof(rec_tail_t));
- const std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE_BYTES;
+ const std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES;
if (hdr_xid_tail_dblks - rec_offs_dblks <= max_size_dblks)
{
@@ -259,7 +259,7 @@ deq_rec::decode(rec_hdr_t& h, void* rptr
const std::size_t xid_rem = _deq_hdr._xidsize - xid_offs;
std::memcpy((char*)_buff + xid_offs, rptr, xid_rem);
rd_cnt += xid_rem;
- const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE_BYTES) - rd_cnt;
+ const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
if (tail_rem)
{
std::memcpy((void*)&_deq_tail, ((char*)rptr + xid_rem), tail_rem);
@@ -269,7 +269,7 @@ deq_rec::decode(rec_hdr_t& h, void* rptr
else
{
// Remainder of xid split
- const std::size_t xid_cp_size = (max_size_dblks * JRNL_DBLK_SIZE_BYTES);
+ const std::size_t xid_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES);
std::memcpy((char*)_buff + rec_offs - sizeof(deq_hdr_t), rptr, xid_cp_size);
rd_cnt += xid_cp_size;
}
@@ -309,7 +309,7 @@ deq_rec::decode(rec_hdr_t& h, void* rptr
// Entire header and xid fit within this page, tail split
std::memcpy(_buff, (char*)rptr + rd_cnt, _deq_hdr._xidsize);
rd_cnt += _deq_hdr._xidsize;
- const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE_BYTES) - rd_cnt;
+ const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
if (tail_rem)
{
std::memcpy((void*)&_deq_tail, (char*)rptr + rd_cnt, tail_rem);
@@ -319,7 +319,7 @@ deq_rec::decode(rec_hdr_t& h, void* rptr
else
{
// Header fits within this page, xid split
- const std::size_t xid_cp_size = (max_size_dblks * JRNL_DBLK_SIZE_BYTES) - rd_cnt;
+ const std::size_t xid_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
std::memcpy(_buff, (char*)rptr + rd_cnt, xid_cp_size);
rd_cnt += xid_cp_size;
}
@@ -381,7 +381,7 @@ deq_rec::rcv_decode(rec_hdr_t h, std::if
return false;
}
}
- ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE_BYTES - rec_size());
+ ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size());
if (_deq_hdr._xidsize)
chk_tail(); // Throws if tail invalid or record incomplete
assert(!ifsp->fail() && !ifsp->bad());
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp Mon Oct 21 21:26:10 2013
@@ -47,13 +47,13 @@ enq_map::~enq_map() {}
short
-enq_map::insert_pfid(const uint64_t rid, const uint16_t pfid, const std::streampos file_posn)
+enq_map::insert_pfid(const uint64_t rid, const uint64_t pfid, const std::streampos file_posn)
{
return insert_pfid(rid, pfid, file_posn, false);
}
short
-enq_map::insert_pfid(const uint64_t rid, const uint16_t pfid, const std::streampos file_posn, const bool locked)
+enq_map::insert_pfid(const uint64_t rid, const uint64_t pfid, const std::streampos file_posn, const bool locked)
{
std::pair<emap_itr, bool> ret;
emap_data_struct_t rec(pfid, file_posn, locked);
@@ -67,7 +67,7 @@ enq_map::insert_pfid(const uint64_t rid,
}
short
-enq_map::get_pfid(const uint64_t rid, int16_t& pfid)
+enq_map::get_pfid(const uint64_t rid, uint64_t& pfid)
{
slock s(_mutex);
emap_itr itr = _map.find(rid);
@@ -80,7 +80,7 @@ enq_map::get_pfid(const uint64_t rid, in
}
short
-enq_map::get_remove_pfid(const uint64_t rid, int16_t& pfid, const bool txn_flag)
+enq_map::get_remove_pfid(const uint64_t rid, uint64_t& pfid, const bool txn_flag)
{
slock s(_mutex);
emap_itr itr = _map.find(rid);
@@ -173,7 +173,7 @@ enq_map::rid_list(std::vector<uint64_t>&
}
void
-enq_map::pfid_list(std::vector<uint16_t>& fv)
+enq_map::pfid_list(std::vector<uint64_t>& fv)
{
fv.clear();
{
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h Mon Oct 21 21:26:10 2013
@@ -72,11 +72,11 @@ namespace qls_jrnl
static short EMAP_TRUE;
typedef struct emap_data_struct_t {
- uint16_t _pfid;
+ uint64_t _pfid;
std::streampos _file_posn;
bool _lock;
emap_data_struct_t() : _pfid(0), _file_posn(0), _lock(false) {}
- emap_data_struct_t(const uint16_t pfid, const std::streampos file_posn, const bool lock) : _pfid(pfid), _file_posn(file_posn), _lock(lock) {}
+ emap_data_struct_t(const uint64_t pfid, const std::streampos file_posn, const bool lock) : _pfid(pfid), _file_posn(file_posn), _lock(lock) {}
} emqp_data_struct_t;
typedef std::pair<uint64_t, emap_data_struct_t> emap_param;
typedef std::map<uint64_t, emap_data_struct_t> emap;
@@ -85,19 +85,15 @@ namespace qls_jrnl
private:
emap _map;
smutex _mutex;
-// std::vector<uint32_t> _pfid_enq_cnt;
public:
enq_map();
virtual ~enq_map();
-// void set_num_jfiles(const uint16_t num_jfiles);
-// inline uint32_t get_enq_cnt(const uint16_t pfid) const { return _pfid_enq_cnt.at(pfid); };
-
- short insert_pfid(const uint64_t rid, const uint16_t pfid, const std::streampos file_posn); // 0=ok; -3=duplicate rid;
- short insert_pfid(const uint64_t rid, const uint16_t pfid, const std::streampos file_posn, const bool locked); // 0=ok; -3=duplicate rid;
- short get_pfid(const uint64_t rid, int16_t& pfid); // >=0=pfid; -1=rid not found; -2=locked
- short get_remove_pfid(const uint64_t rid, int16_t& pfid, const bool txn_flag = false); // >=0=pfid; -1=rid not found; -2=locked
+ short insert_pfid(const uint64_t rid, const uint64_t pfid, const std::streampos file_posn); // 0=ok; -3=duplicate rid;
+ short insert_pfid(const uint64_t rid, const uint64_t pfid, const std::streampos file_posn, const bool locked); // 0=ok; -3=duplicate rid;
+ short get_pfid(const uint64_t rid, uint64_t& pfid); // >=0=pfid; -1=rid not found; -2=locked
+ short get_remove_pfid(const uint64_t rid, uint64_t& pfid, const bool txn_flag = false); // >=0=pfid; -1=rid not found; -2=locked
short get_file_posn(const uint64_t rid, std::streampos& file_posn); // -1=rid not found; -2=locked
short get_data(const uint64_t rid, emap_data_struct_t& eds);
bool is_enqueued(const uint64_t rid, bool ignore_lock = false);
@@ -108,7 +104,7 @@ namespace qls_jrnl
inline bool empty() const { return _map.empty(); }
inline uint32_t size() const { return uint32_t(_map.size()); }
void rid_list(std::vector<uint64_t>& rv);
- void pfid_list(std::vector<uint16_t>& fv);
+ void pfid_list(std::vector<uint64_t>& fv);
};
}}
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp Mon Oct 21 21:26:10 2013
@@ -109,8 +109,8 @@ enq_rec::encode(void* wptr, uint32_t rec
if (_xidp == 0)
assert(_enq_hdr._xidsize == 0);
- std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE_BYTES;
- std::size_t rem = max_size_dblks * JRNL_DBLK_SIZE_BYTES;
+ std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES;
+ std::size_t rem = max_size_dblks * QLS_DBLK_SIZE_BYTES;
std::size_t wr_cnt = 0;
if (rec_offs_dblks) // Continuation of split data record (over 2 or more pages)
{
@@ -181,10 +181,10 @@ enq_rec::encode(void* wptr, uint32_t rec
{
std::memcpy((char*)wptr + wr_cnt, (char*)&_enq_tail + rec_offs, wsize);
wr_cnt += wsize;
-#ifdef RHM_CLEAN
- std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE_BYTES;
- std::size_t dblk_rec_size = size_dblks(rec_size() - rec_offs) * JRNL_DBLK_SIZE_BYTES;
- std::memset((char*)wptr + wr_cnt, RHM_CLEAN_CHAR, dblk_rec_size - wr_cnt);
+#ifdef QLS_CLEAN
+ std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES;
+ std::size_t dblk_rec_size = size_dblks(rec_size() - rec_offs) * QLS_DBLK_SIZE_BYTES;
+ std::memset((char*)wptr + wr_cnt, QLS_CLEAN_CHAR, dblk_rec_size - wr_cnt);
#endif
}
rec_offs -= sizeof(_enq_tail) - wsize;
@@ -237,9 +237,9 @@ enq_rec::encode(void* wptr, uint32_t rec
}
std::memcpy((char*)wptr + wr_cnt, (void*)&_enq_tail, sizeof(_enq_tail));
wr_cnt += sizeof(_enq_tail);
-#ifdef RHM_CLEAN
- std::size_t dblk_rec_size = size_dblks(rec_size()) * JRNL_DBLK_SIZE_BYTES;
- std::memset((char*)wptr + wr_cnt, RHM_CLEAN_CHAR, dblk_rec_size - wr_cnt);
+#ifdef QLS_CLEAN
+ std::size_t dblk_rec_size = size_dblks(rec_size()) * QLS_DBLK_SIZE_BYTES;
+ std::memset((char*)wptr + wr_cnt, QLS_CLEAN_CHAR, dblk_rec_size - wr_cnt);
#endif
}
}
@@ -260,7 +260,7 @@ enq_rec::decode(rec_hdr_t& h, void* rptr
const uint32_t hdr_xid_data_tail_size = hdr_xid_data_size + sizeof(rec_tail_t);
const uint32_t hdr_data_dblks = size_dblks(hdr_xid_data_size);
const uint32_t hdr_tail_dblks = size_dblks(hdr_xid_data_tail_size);
- const std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE_BYTES;
+ const std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES;
const std::size_t offs = rec_offs - sizeof(enq_hdr_t);
if (hdr_tail_dblks - rec_offs_dblks <= max_size_dblks)
@@ -331,7 +331,7 @@ enq_rec::decode(rec_hdr_t& h, void* rptr
std::memcpy((char*)_buff + offs, rptr, data_rem);
rd_cnt += data_rem;
}
- const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE_BYTES) - rd_cnt;
+ const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
if (tail_rem)
{
std::memcpy((void*)&_enq_tail, ((char*)rptr + rd_cnt), tail_rem);
@@ -341,7 +341,7 @@ enq_rec::decode(rec_hdr_t& h, void* rptr
else
{
// Since xid and data are contiguous, both fit within current page - copy whole page
- const std::size_t data_cp_size = (max_size_dblks * JRNL_DBLK_SIZE_BYTES);
+ const std::size_t data_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES);
std::memcpy((char*)_buff + offs, rptr, data_cp_size);
rd_cnt += data_cp_size;
}
@@ -405,7 +405,7 @@ enq_rec::decode(rec_hdr_t& h, void* rptr
_enq_hdr._dsize);
rd_cnt += _enq_hdr._dsize;
}
- const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE_BYTES) - rd_cnt;
+ const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
if (tail_rem)
{
std::memcpy((void*)&_enq_tail, (char*)rptr + rd_cnt, tail_rem);
@@ -422,7 +422,7 @@ enq_rec::decode(rec_hdr_t& h, void* rptr
}
if (_enq_hdr._dsize && !::is_enq_external(&_enq_hdr))
{
- const std::size_t data_cp_size = (max_size_dblks * JRNL_DBLK_SIZE_BYTES) - rd_cnt;
+ const std::size_t data_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
std::memcpy((char*)_buff + _enq_hdr._xidsize, (char*)rptr + rd_cnt, data_cp_size);
rd_cnt += data_cp_size;
}
@@ -430,7 +430,7 @@ enq_rec::decode(rec_hdr_t& h, void* rptr
else
{
// Header fits within this page, xid split or separated
- const std::size_t data_cp_size = (max_size_dblks * JRNL_DBLK_SIZE_BYTES) - rd_cnt;
+ const std::size_t data_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
std::memcpy(_buff, (char*)rptr + rd_cnt, data_cp_size);
rd_cnt += data_cp_size;
}
@@ -516,7 +516,7 @@ enq_rec::rcv_decode(rec_hdr_t h, std::if
return false;
}
}
- ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE_BYTES - rec_size());
+ ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size());
chk_tail(); // Throws if tail invalid or record incomplete
assert(!ifsp->fail() && !ifsp->bad());
return true;
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enums.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enums.h?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enums.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enums.h Mon Oct 21 21:26:10 2013
@@ -40,9 +40,9 @@ namespace qls_jrnl
// RHM_IORES_RCINVALID, ///< Read page cache is invalid (ie obsolete or uninitialized)
// RHM_IORES_ENQCAPTHRESH, ///< Enqueue capacity threshold (limit) reached.
// RHM_IORES_FULL, ///< During write operations, the journal files are full.
- RHM_IORES_BUSY, ///< Another blocking operation is in progress.
- RHM_IORES_TXPENDING, ///< Operation blocked by pending transaction.
- RHM_IORES_NOTIMPL ///< Function is not implemented.
+// RHM_IORES_BUSY, ///< Another blocking operation is in progress.
+ RHM_IORES_TXPENDING ///< Operation blocked by pending transaction.
+// RHM_IORES_NOTIMPL ///< Function is not implemented.
};
typedef _iores iores;
@@ -57,9 +57,9 @@ namespace qls_jrnl
// case RHM_IORES_RCINVALID: return "RHM_IORES_RCINVALID";
// case RHM_IORES_ENQCAPTHRESH: return "RHM_IORES_ENQCAPTHRESH";
// case RHM_IORES_FULL: return "RHM_IORES_FULL";
- case RHM_IORES_BUSY: return "RHM_IORES_BUSY";
+// case RHM_IORES_BUSY: return "RHM_IORES_BUSY";
case RHM_IORES_TXPENDING: return "RHM_IORES_TXPENDING";
- case RHM_IORES_NOTIMPL: return "RHM_IORES_NOTIMPL";
+// case RHM_IORES_NOTIMPL: return "RHM_IORES_NOTIMPL";
}
return "<iores unknown>";
}
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h Mon Oct 21 21:26:10 2013
@@ -22,48 +22,37 @@
#ifndef QPID_LEGACYSTORE_JRNL_JCFG_H
#define QPID_LEGACYSTORE_JRNL_JCFG_H
+#define QLS_SBLK_SIZE_BYTES 4096 /**< Disk softblock size in bytes, should match size used on disk media */
+#define QLS_AIO_ALIGN_BOUNDARY_BYTES QLS_SBLK_SIZE_BYTES /**< Memory alignment boundary used for DMA */
/**
-* <b>Rule:</b> Data block size (JRNL_DBLK_SIZE_BYTES) MUST be a power of 2 AND
-* a power of 2 factor of the disk softblock size (JRNL_SBLK_SIZE_BYTES):
+* <b>Rule:</b> Data block size (QLS_DBLK_SIZE_BYTES) MUST be a power of 2 AND
+* a power of 2 factor of the disk softblock size (QLS_SBLK_SIZE_BYTES):
* <pre>
-* n * JRNL_DBLK_SIZE_BYTES == JRNL_SBLK_SIZE_BYTES (n = 1,2,4,8...)
+* n * QLS_DBLK_SIZE_BYTES == QLS_SBLK_SIZE_BYTES (n = 1,2,4,8...)
* </pre>
*/
-#define JRNL_SBLK_SIZE_BYTES 4096 /**< Disk softblock size in bytes */
-#define QLS_AIO_ALIGN_BOUNDARY JRNL_SBLK_SIZE_BYTES
-#define JRNL_DBLK_SIZE_BYTES 128 /**< Data block size in bytes (CANNOT BE LESS THAN 32!) */
-#define JRNL_SBLK_SIZE_DBLKS (JRNL_SBLK_SIZE_BYTES / JRNL_DBLK_SIZE_BYTES) /**< Disk softblock size in multiples of JRNL_DBLK_SIZE */
-#define JRNL_SBLK_SIZE_KIB (JRNL_SBLK_SIZE_BYTES / 1024) /**< Disk softblock size in KiB */
-//#define JRNL_MIN_FILE_SIZE 128 ///< Min. jrnl file size in sblks (excl. file_hdr)
-//#define JRNL_MAX_FILE_SIZE 4194176 ///< Max. jrnl file size in sblks (excl. file_hdr)
-//#define JRNL_MIN_NUM_FILES 4 ///< Min. number of journal files
-//#define JRNL_MAX_NUM_FILES 64 ///< Max. number of journal files
-//#define JRNL_ENQ_THRESHOLD 80 ///< Percent full when enqueue connection will be closed
-//
-//#define JRNL_RMGR_PAGE_SIZE 128 ///< Journal page size in softblocks
-//#define JRNL_RMGR_PAGES 16 ///< Number of pages to use in wmgr
-//
-#define JRNL_WMGR_DEF_PAGE_SIZE_KIB 32
-#define JRNL_WMGR_DEF_PAGE_SIZE_SBLKS (JRNL_WMGR_DEF_PAGE_SIZE_KIB / JRNL_SBLK_SIZE_KIB) ///< Journal write page size in softblocks (default)
-#define JRNL_WMGR_DEF_PAGES 32 ///< Number of pages to use in wmgr (default)
-//
-#define JRNL_WMGR_MAXDTOKPP 1024 ///< Max. dtoks (data blocks) per page in wmgr
-#define JRNL_WMGR_MAXWAITUS 100 ///< Max. wait time (us) before submitting AIO
-//
-//#define JRNL_INFO_EXTENSION "jinf" ///< Extension for journal info files
-//#define JRNL_DATA_EXTENSION "jdat" ///< Extension for journal data files
-#define QLS_JRNL_FILE_EXTENSION ".jrnl" /**< Extension for journal data files */
-#define QLS_TXA_MAGIC 0x61534c51 /**< ("RHMa" in little endian) Magic for dtx abort hdrs */
-#define QLS_TXC_MAGIC 0x63534c51 /**< ("RHMc" in little endian) Magic for dtx commit hdrs */
-#define QLS_DEQ_MAGIC 0x64534c51 /**< ("QLSd" in little endian) Magic for deq rec hdrs */
-#define QLS_ENQ_MAGIC 0x65534c51 /**< ("QLSe" in little endian) Magic for enq rec hdrs */
-#define QLS_FILE_MAGIC 0x66534c51 /**< ("QLSf" in little endian) Magic for file hdrs */
-#define QLS_EMPTY_MAGIC 0x78534c51 /**< ("QLSx" in little endian) Magic for empty dblk */
-#define QLS_JRNL_VERSION 2 /**< Version (of file layout) */
-#define QLS_JRNL_FHDR_RES_SIZE_SBLKS 1 /**< Journal file header reserved size in sblks (as defined by JRNL_SBLK_SIZE_BYTES) */
-#define QLS_CLEAN_CHAR 0xff /**< Char used to clear empty space on disk */
-//
-//#define RHM_LENDIAN_FLAG 0 ///< Value of little endian flag on disk
-//#define RHM_BENDIAN_FLAG 1 ///< Value of big endian flag on disk
+#define QLS_DBLK_SIZE_BYTES 128 /**< Data block size in bytes (CANNOT BE LESS THAN 32!) */
+#define QLS_SBLK_SIZE_DBLKS (QLS_SBLK_SIZE_BYTES / QLS_DBLK_SIZE_BYTES) /**< Disk softblock size in multiples of QLS_DBLK_SIZE_BYTES */
+#define QLS_SBLK_SIZE_KIB (QLS_SBLK_SIZE_BYTES / 1024) /**< Disk softblock size in KiB */
-#endif // ifndef QPID_LEGACYSTORE_JRNL_JCFG_H
+#define QLS_WMGR_DEF_PAGE_SIZE_KIB 32 /**< Journal write page size in KiB (default) */
+#define QLS_WMGR_DEF_PAGE_SIZE_SBLKS (QLS_WMGR_DEF_PAGE_SIZE_KIB / QLS_SBLK_SIZE_KIB) /**< Journal write page size in softblocks (default) */
+#define QLS_WMGR_DEF_PAGES 32 /**< Number of pages to use in wmgr (default) */
+
+#define QLS_WMGR_MAXDTOKPP 1024 /**< Max. dtoks (data blocks) per page in wmgr */
+#define QLS_WMGR_MAXWAITUS 100 /**< Max. wait time (us) before submitting AIO */
+
+#define QLS_JRNL_FILE_EXTENSION ".jrnl" /**< Extension for journal data files */
+#define QLS_TXA_MAGIC 0x61534c51 /**< ("QLSa" in little endian) Magic for dtx abort hdrs */
+#define QLS_TXC_MAGIC 0x63534c51 /**< ("QLSc" in little endian) Magic for dtx commit hdrs */
+#define QLS_DEQ_MAGIC 0x64534c51 /**< ("QLSd" in little endian) Magic for deq rec hdrs */
+#define QLS_ENQ_MAGIC 0x65534c51 /**< ("QLSe" in little endian) Magic for enq rec hdrs */
+#define QLS_FILE_MAGIC 0x66534c51 /**< ("QLSf" in little endian) Magic for file hdrs */
+#define QLS_EMPTY_MAGIC 0x78534c51 /**< ("QLSx" in little endian) Magic for empty dblk */
+#define QLS_JRNL_VERSION 2 /**< Version (of file layout) */
+#define QLS_JRNL_FHDR_RES_SIZE_SBLKS 1 /**< Journal file header reserved size in sblks (as defined by QLS_SBLK_SIZE_BYTES) */
+
+#define QLS_CLEAN /**< If defined, writes QLS_CLEAN_CHAR to all unfilled areas on disk */
+#define QLS_CLEAN_CHAR 0xff /**< Char used to clear empty space on disk */
+
+#endif /* ifndef QPID_LEGACYSTORE_JRNL_JCFG_H */
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp Mon Oct 21 21:26:10 2013
@@ -31,10 +31,8 @@
#include <iostream>
#include <qpid/linearstore/jrnl/EmptyFilePool.h>
#include <qpid/linearstore/jrnl/EmptyFilePoolManager.h>
-//#include "qpid/linearstore/jrnl/file_hdr.h"
#include "qpid/linearstore/jrnl/jerrno.h"
-//#include "qpid/linearstore/jrnl/jinf.h"
-//#include "qpid/linearstore/jrnl/JournalFileController.h"
+#include "qpid/linearstore/jrnl/JournalLog.h"
#include "qpid/linearstore/jrnl/utils/enq_hdr.h"
#include <limits>
#include <sstream>
@@ -66,25 +64,21 @@ bool jcntl::init_statics()
// Functions
-jcntl::jcntl(const std::string& jid, const std::string& jdir/*, const std::string& base_filename*/):
+jcntl::jcntl(const std::string& jid,
+ const std::string& jdir,
+ JournalLog& jrnl_log):
_jid(jid),
- _jdir(jdir/*, base_filename*/),
-// _base_filename(base_filename),
+ _jdir(jdir),
_init_flag(false),
_stop_flag(false),
_readonly_flag(false),
-// _autostop(true),
+ _jrnl_log(jrnl_log),
_linearFileController(*this),
_emptyFilePoolPtr(0),
-// _jfsize_sblks(0),
-// _lpmgr(),
_emap(),
_tmap(),
-// _rrfc(&_lpmgr),
-// _wrfc(&_lpmgr),
-// _rmgr(this, _emap, _tmap/*, _rrfc*/),
- _wmgr(this, _emap, _tmap, _linearFileController/*, _wrfc*/),
- _rcvdat()
+ _wmgr(this, _emap, _tmap, _linearFileController),
+ _recoveryManager(_jdir.dirname(), _jid, _emap, _tmap, jrnl_log)
{}
jcntl::~jcntl()
@@ -92,14 +86,14 @@ jcntl::~jcntl()
if (_init_flag && !_stop_flag)
try { stop(true); }
catch (const jexception& e) { std::cerr << e << std::endl; }
-// _lpmgr.finalize();
_linearFileController.finalize();
}
void
-jcntl::initialize(/*const uint16_t num_jfiles, const bool ae, const uint16_t ae_max_jfiles,
- const uint32_t jfsize_sblks,*/ EmptyFilePool* efpp, const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks,
- aio_callback* const cbp)
+jcntl::initialize(EmptyFilePool* efpp,
+ const uint16_t wcache_num_pages,
+ const uint32_t wcache_pgsize_sblks,
+ aio_callback* const cbp)
{
_init_flag = false;
_stop_flag = false;
@@ -126,14 +120,14 @@ jcntl::initialize(/*const uint16_t num_j
_jdir.clear_dir();
// _lpmgr.initialize(num_jfiles, ae, ae_max_jfiles, this, &new_fcntl); // Creates new journal files
- _linearFileController.initialize(_jdir.dirname(), efpp);
+ _linearFileController.initialize(_jdir.dirname(), efpp, 0ULL);
_linearFileController.pullEmptyFileFromEfp();
- std::cout << _linearFileController.status(2);
+// std::cout << _linearFileController.status(2);
// _wrfc.initialize(_jfsize_sblks);
// _rrfc.initialize();
// _rrfc.set_findex(0);
// _rmgr.initialize(cbp);
- _wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, JRNL_WMGR_MAXDTOKPP, JRNL_WMGR_MAXWAITUS);
+ _wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, QLS_WMGR_MAXDTOKPP, QLS_WMGR_MAXWAITUS);
// Write info file (<basename>.jinf) to disk
// write_infofile();
@@ -142,7 +136,7 @@ jcntl::initialize(/*const uint16_t num_j
}
void
-jcntl::recover(EmptyFilePoolManager* efpm,
+jcntl::recover(EmptyFilePoolManager* efpmp,
const uint16_t wcache_num_pages,
const uint32_t wcache_pgsize_sblks,
aio_callback* const cbp,
@@ -170,21 +164,26 @@ jcntl::recover(EmptyFilePoolManager* efp
_jdir.verify_dir();
// _rcvdat.reset(num_jfiles/*, ae, ae_max_jfiles*/);
- rcvr_janalyze(prep_txn_list_ptr, efpm);
- highest_rid = _rcvdat._h_rid;
+// rcvr_janalyze(prep_txn_list_ptr, efpm);
+ efpIdentity_t efpIdentity;
+ _recoveryManager.analyzeJournals(prep_txn_list_ptr, efpmp, &_emptyFilePoolPtr);
+
+ highest_rid = _recoveryManager.getHighestRecordId();
// if (_rcvdat._jfull)
// throw jexception(jerrno::JERR_JCNTL_RECOVERJFULL, "jcntl", "recover");
- this->log(/*LOG_DEBUG*/LOG_INFO, _jid, _rcvdat.to_log(_jid));
+ _jrnl_log.log(/*LOG_DEBUG*/JournalLog::LOG_INFO, _jid, _recoveryManager.toString(_jid, true));
// _lpmgr.recover(_rcvdat, this, &new_fcntl);
- _linearFileController.initialize(_jdir.dirname(), _emptyFilePoolPtr);
+ _linearFileController.initialize(_jdir.dirname(), _emptyFilePoolPtr, _recoveryManager.getHighestFileNumber());
+// _linearFileController.setFileNumberCounter(_recoveryManager.getHighestFileNumber());
+ _recoveryManager.setLinearFileControllerJournals(&qpid::qls_jrnl::LinearFileController::addJournalFile, &_linearFileController);
// _wrfc.initialize(_jfsize_sblks, &_rcvdat);
// _rrfc.initialize();
// _rrfc.set_findex(_rcvdat.ffid());
// _rmgr.initialize(cbp);
- _wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, JRNL_WMGR_MAXDTOKPP, JRNL_WMGR_MAXWAITUS,
- (_rcvdat._lffull ? 0 : _rcvdat._eo));
+ _wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, QLS_WMGR_MAXDTOKPP, QLS_WMGR_MAXWAITUS,
+ (_recoveryManager.isLastFileFull() ? 0 : _recoveryManager.getEndOffset()));
_readonly_flag = true;
_init_flag = true;
@@ -214,8 +213,11 @@ jcntl::delete_jrnl_files()
iores
-jcntl::enqueue_data_record(const void* const data_buff, const std::size_t tot_data_len,
- const std::size_t this_data_len, data_tok* dtokp, const bool transient)
+jcntl::enqueue_data_record(const void* const data_buff,
+ const std::size_t tot_data_len,
+ const std::size_t this_data_len,
+ data_tok* dtokp,
+ const bool transient)
{
iores r;
check_wstatus("enqueue_data_record");
@@ -228,7 +230,9 @@ jcntl::enqueue_data_record(const void* c
}
iores
-jcntl::enqueue_extern_data_record(const std::size_t tot_data_len, data_tok* dtokp, const bool transient)
+jcntl::enqueue_extern_data_record(const std::size_t tot_data_len,
+ data_tok* dtokp,
+ const bool transient)
{
iores r;
check_wstatus("enqueue_extern_data_record");
@@ -240,9 +244,12 @@ jcntl::enqueue_extern_data_record(const
}
iores
-jcntl::enqueue_txn_data_record(const void* const data_buff, const std::size_t tot_data_len,
- const std::size_t this_data_len, data_tok* dtokp, const std::string& xid,
- const bool transient)
+jcntl::enqueue_txn_data_record(const void* const data_buff,
+ const std::size_t tot_data_len,
+ const std::size_t this_data_len,
+ data_tok* dtokp,
+ const std::string& xid,
+ const bool transient)
{
iores r;
check_wstatus("enqueue_tx_data_record");
@@ -255,8 +262,10 @@ jcntl::enqueue_txn_data_record(const voi
}
iores
-jcntl::enqueue_extern_txn_data_record(const std::size_t tot_data_len, data_tok* dtokp,
- const std::string& xid, const bool transient)
+jcntl::enqueue_extern_txn_data_record(const std::size_t tot_data_len,
+ data_tok* dtokp,
+ const std::string& xid,
+ const bool transient)
{
iores r;
check_wstatus("enqueue_extern_txn_data_record");
@@ -268,27 +277,21 @@ jcntl::enqueue_extern_txn_data_record(co
return r;
}
-/* TODO
-iores
-jcntl::get_data_record(const uint64_t& rid, const std::size_t& dsize, const std::size_t& dsize_avail,
- const void** const data, bool auto_discard)
-{
- check_rstatus("get_data_record");
- return _rmgr.get(rid, dsize, dsize_avail, data, auto_discard);
-} */
-
-/* TODO
iores
-jcntl::discard_data_record(data_tok* const dtokp)
-{
- check_rstatus("discard_data_record");
- return _rmgr.discard(dtokp);
-} */
-
-iores
-jcntl::read_data_record(void** const datapp, std::size_t& dsize, void** const xidpp, std::size_t& xidsize,
- bool& transient, bool& external, data_tok* const dtokp, bool /*ignore_pending_txns*/)
+jcntl::read_data_record(void** const datapp,
+ std::size_t& dsize,
+ void** const xidpp,
+ std::size_t& xidsize,
+ bool& transient,
+ bool& external,
+ data_tok* const dtokp,
+ bool ignore_pending_txns)
{
+ check_rstatus("read_data");
+ if (_recoveryManager.readNextRemainingRecord(datapp, dsize, xidpp, xidsize, transient, external, dtokp, ignore_pending_txns))
+ return RHM_IORES_SUCCESS;
+ return RHM_IORES_EMPTY;
+/*
if (!dtokp->is_readable()) {
std::ostringstream oss;
oss << std::hex << std::setfill('0');
@@ -303,18 +306,18 @@ jcntl::read_data_record(void** const dat
for (std::vector<uint64_t>::const_iterator i=ridl.begin(); i!=ridl.end(); ++i) {
short res = _emap.get_data(*i, eds);
if (res == enq_map::EMAP_OK) {
- std::ifstream ifs(_rcvdat._fm[eds._pfid].c_str(), std::ifstream::in | std::ifstream::binary);
+ std::ifstream ifs(_recoveryManager._fm[eds._pfid].c_str(), std::ifstream::in | std::ifstream::binary);
if (!ifs.good()) {
std::ostringstream oss;
- oss << "rid=" << (*i) << " pfid=" << eds._pfid << " file=" << _rcvdat._fm[eds._pfid] << " file_posn=" << eds._file_posn;
- throw jexception(jerrno::JERR_JCNTL_OPENRD, oss.str(), "jcntl", "read_data_record");
+ oss << "rid=" << (*i) << " pfid=" << eds._pfid << " file=" << _recoveryManager._fm[eds._pfid] << " file_posn=" << eds._file_posn;
+ throw jexception(jerrno::JERR_RCVM_OPENRD, oss.str(), "jcntl", "read_data_record");
}
ifs.seekg(eds._file_posn, std::ifstream::beg);
::enq_hdr_t eh;
ifs.read((char*)&eh, sizeof(::enq_hdr_t));
if (!::validate_enq_hdr(&eh, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, *i)) {
std::ostringstream oss;
- oss << "rid=" << (*i) << " pfid=" << eds._pfid << " file=" << _rcvdat._fm[eds._pfid] << " file_posn=" << eds._file_posn;
+ oss << "rid=" << (*i) << " pfid=" << eds._pfid << " file=" << _recoveryManager._fm[eds._pfid] << " file_posn=" << eds._file_posn;
throw jexception(jerrno::JERR_JCNTL_INVALIDENQHDR, oss.str(), "jcntl", "read_data_record");
}
dsize = eh._dsize;
@@ -335,6 +338,7 @@ jcntl::read_data_record(void** const dat
}
}
}
+*/
/*
check_rstatus("read_data");
iores res = _rmgr.read(datapp, dsize, xidpp, xidsize, transient, external, dtokp, ignore_pending_txns);
@@ -354,7 +358,8 @@ jcntl::read_data_record(void** const dat
}
iores
-jcntl::dequeue_data_record(data_tok* const dtokp, const bool txn_coml_commit)
+jcntl::dequeue_data_record(data_tok* const dtokp,
+ const bool txn_coml_commit)
{
iores r;
check_wstatus("dequeue_data");
@@ -366,7 +371,9 @@ jcntl::dequeue_data_record(data_tok* con
}
iores
-jcntl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit)
+jcntl::dequeue_txn_data_record(data_tok* const dtokp,
+ const std::string& xid,
+ const bool txn_coml_commit)
{
iores r;
check_wstatus("dequeue_data");
@@ -378,7 +385,8 @@ jcntl::dequeue_txn_data_record(data_tok*
}
iores
-jcntl::txn_abort(data_tok* const dtokp, const std::string& xid)
+jcntl::txn_abort(data_tok* const dtokp,
+ const std::string& xid)
{
iores r;
check_wstatus("txn_abort");
@@ -390,7 +398,8 @@ jcntl::txn_abort(data_tok* const dtokp,
}
iores
-jcntl::txn_commit(data_tok* const dtokp, const std::string& xid)
+jcntl::txn_commit(data_tok* const dtokp,
+ const std::string& xid)
{
iores r;
check_wstatus("txn_commit");
@@ -415,18 +424,9 @@ jcntl::get_wr_events(timespec* const tim
stlock t(_wr_mutex);
if (!t.locked())
return jerrno::LOCK_TAKEN;
- int32_t res = _wmgr.get_events(pmgr::UNUSED, timeout);
- return res;
+ return _wmgr.get_events(timeout, false);
}
-/*
-int32_t
-jcntl::get_rd_events(timespec* const timeout)
-{
- return _rmgr.get_events(pmgr::AIO_COMPLETE, timeout);
-}
-*/
-
void
jcntl::stop(const bool block_till_aio_cmpl)
{
@@ -462,56 +462,6 @@ jcntl::flush(const bool block_till_aio_c
return res;
}
-/*
-void
-jcntl::log(log_level_t ll, const std::string& log_stmt) const
-{
- log(ll, log_stmt.c_str());
-}
-
-void
-jcntl::log(log_level_t ll, const char* const log_stmt) const
-{
- if (ll > LOG_INFO)
- {
- std::cout << log_level_str(ll) << ": Journal \"" << _jid << "\": " << log_stmt << std::endl;
- }
-}
-*/
-
-/*
-void
-jcntl::chk_wr_frot()
-{
- if (_wrfc.index() == _rrfc.index())
- _rmgr.invalidate();
-}
-*/
-
-void
-jcntl::fhdr_wr_sync(const uint16_t /*lid*/)
-{
-/*
- fcntl* fcntlp = _lpmgr.get_fcntlp(lid);
- while (fcntlp->wr_fhdr_aio_outstanding())
- {
- if (get_wr_events(&_aio_cmpl_timeout) == jerrno::AIO_TIMEOUT)
- throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl", "fhdr_wr_sync");
- }
-*/
-}
-
-/*
-fcntl*
-jcntl::new_fcntl(jcntl* const jcp, const uint16_t lid, const uint16_t fid, const rcvdat* const rdp)
-{
- if (!jcp) return 0;
- std::ostringstream oss;
- oss << jcp->jrnl_dir() << "/" << jcp->base_filename();
- return new fcntl(oss.str(), fid, lid, jcp->jfsize_sblks(), rdp);
-}
-*/
-
// Protected/Private functions
void
@@ -561,11 +511,15 @@ jcntl::handle_aio_wait(const iores res,
{
while (_wmgr.curr_pg_blocked())
{
- if (_wmgr.get_events(pmgr::UNUSED, &_aio_cmpl_timeout) == jerrno::AIO_TIMEOUT)
+ if (_wmgr.get_aio_evt_rem() == 0) {
+std::cout << "&&&&&& jcntl::handle_aio_wait() " << _wmgr.status_str() << std::endl; // DEBUG
+ throw jexception("_wmgr.curr_pg_blocked() with no events remaining"); // TODO - complete exception
+ }
+ if (_wmgr.get_events(&_aio_cmpl_timeout, false) == jerrno::AIO_TIMEOUT)
{
std::ostringstream oss;
oss << "get_events() returned JERR_JCNTL_AIOCMPLWAIT; wmgr_status: " << _wmgr.status_str();
- this->log(LOG_CRITICAL, _jid, oss.str());
+ _jrnl_log.log(JournalLog::LOG_CRITICAL, _jid, oss.str());
throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl", "handle_aio_wait");
}
}
@@ -592,417 +546,4 @@ jcntl::handle_aio_wait(const iores res,
return false;
}
-
-// static
-void
-jcntl::rcvr_read_jfile(const std::string& jfn, ::file_hdr_t* fh, std::string& queueName) {
- const std::size_t headerBlockSize = QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_KIB * 1024;
- char buffer[headerBlockSize];
- std::ifstream ifs(jfn.c_str(), std::ifstream::in | std::ifstream::binary);
- if (!ifs.good()) {
- std::ostringstream oss;
- oss << "File=" << jfn;
- throw jexception(jerrno::JERR_JCNTL_OPENRD, oss.str(), "jcntl", "rcvr_read_jfile");
- }
- ifs.read(buffer, headerBlockSize);
- if (!ifs) {
- std::streamsize s = ifs.gcount();
- ifs.close();
- std::ostringstream oss;
- oss << "File=" << jfn << "; attempted_read_size=" << headerBlockSize << "; actual_read_size=" << s;
- throw jexception(jerrno::JERR_JCNTL_READ, oss.str(), "jcntl", "rcvr_read_jfile");
- }
- ifs.close();
- ::memcpy(fh, buffer, sizeof(::file_hdr_t));
- queueName.assign(buffer + sizeof(::file_hdr_t), fh->_queue_name_len);
-}
-
-
-void jcntl::rcvr_analyze_fhdrs(EmptyFilePoolManager* efpmp) {
- std::string headerQueueName;
- ::file_hdr_t fh;
- efpIdentity_t efpid;
-// std::map<uint64_t, std::string> fileMap;
- std::vector<std::string> dirList;
- jdir::read_dir(_jdir.dirname(), dirList, false, true, false, true);
- for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
- rcvr_read_jfile(*i, &fh, headerQueueName);
- if (headerQueueName.compare(_jid) != 0) {
- std::ostringstream oss;
- oss << "Journal file " << (*i) << " belongs to queue \"" << headerQueueName << "\": ignoring";
- log(LOG_WARN, _jid, oss.str());
- } else {
- _rcvdat._fm[fh._file_number] = *i;
- efpid.first = fh._efp_partition;
- efpid.second = fh._file_size_kib;
- }
- }
- _rcvdat._jfl.clear();
- for (std::map<uint64_t, std::string>::iterator i=_rcvdat._fm.begin(); i!=_rcvdat._fm.end(); ++i) {
- _rcvdat._jfl.push_back(i->second);
- }
- _rcvdat._enq_cnt_list.resize(_rcvdat._jfl.size(), 0);
- _emptyFilePoolPtr = efpmp->getEmptyFilePool(efpid);
-}
-
-
-void jcntl::rcvr_janalyze(const std::vector<std::string>* prep_txn_list_ptr, EmptyFilePoolManager* efpmp) {
- // Analyze file headers of existing journal files
- rcvr_analyze_fhdrs(efpmp);
-
- // Restore all read and write pointers and transactions
- if (!_rcvdat._jempty)
- {
- uint16_t fid = 0;
- std::ifstream ifs;
- //bool lowi = rd._owi; // local copy of owi to be used during analysis
- while (rcvr_get_next_record(fid, &ifs)) ;
- if (ifs.is_open()) ifs.close();
-
- // Remove all txns from tmap that are not in the prepared list
- if (prep_txn_list_ptr)
- {
- std::vector<std::string> xid_list;
- _tmap.xid_list(xid_list);
- for (std::vector<std::string>::iterator itr = xid_list.begin(); itr != xid_list.end(); itr++)
- {
- std::vector<std::string>::const_iterator pitr =
- std::find(prep_txn_list_ptr->begin(), prep_txn_list_ptr->end(), *itr);
- if (pitr == prep_txn_list_ptr->end()) // not found in prepared list
- {
- txn_data_list tdl = _tmap.get_remove_tdata_list(*itr); // tdl will be empty if xid not found
- // Unlock any affected enqueues in emap
- for (tdl_itr i=tdl.begin(); i<tdl.end(); i++)
- {
- if (i->_enq_flag) // enq op - decrement enqueue count
- _rcvdat._enq_cnt_list[i->_pfid]--;
- else if (_emap.is_enqueued(i->_drid, true)) // deq op - unlock enq record
- {
- int16_t ret = _emap.unlock(i->_drid);
- if (ret < enq_map::EMAP_OK) // fail
- {
- // enq_map::unlock()'s only error is enq_map::EMAP_RID_NOT_FOUND
- std::ostringstream oss;
- oss << std::hex << "_emap.unlock(): drid=0x\"" << i->_drid;
- throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "jcntl", "rcvr_janalyze");
- }
- }
- }
- }
- }
- }
-
- // Check for file full condition
- _rcvdat._lffull = _rcvdat._eo == _emptyFilePoolPtr->fileSize_kib() * 1024;
- }
-}
-
-
-bool
-jcntl::rcvr_get_next_record(uint16_t& fid, std::ifstream* ifsp)
-{
- std::size_t cum_size_read = 0;
- void* xidp = 0;
- rec_hdr_t h;
-
- bool hdr_ok = false;
- std::streampos file_pos;
- while (!hdr_ok)
- {
- if (!ifsp->is_open())
- {
- if (!jfile_cycle(fid, ifsp, true))
- return false;
- }
- file_pos = ifsp->tellg();
- ifsp->read((char*)&h, sizeof(rec_hdr_t));
- if (ifsp->gcount() == sizeof(rec_hdr_t))
- hdr_ok = true;
- else
- {
- if (!jfile_cycle(fid, ifsp, true))
- return false;
- }
- }
-
- switch(h._magic)
- {
- case QLS_ENQ_MAGIC:
- {
- std::cout << " e" << std::flush;
- enq_rec er;
- uint16_t start_fid = fid; // fid may increment in decode() if record folds over file boundary
- if (!decode(er, fid, ifsp, cum_size_read, h, file_pos))
- return false;
- if (!er.is_transient()) // Ignore transient msgs
- {
- _rcvdat._enq_cnt_list[start_fid]++;
- if (er.xid_size())
- {
- er.get_xid(&xidp);
- assert(xidp != 0);
- std::string xid((char*)xidp, er.xid_size());
- _tmap.insert_txn_data(xid, txn_data(h._rid, 0, start_fid, true));
- if (_tmap.set_aio_compl(xid, h._rid) < txn_map::TMAP_OK) // fail - xid or rid not found
- {
- std::ostringstream oss;
- oss << std::hex << "_tmap.set_aio_compl: txn_enq xid=\"" << xid << "\" rid=0x" << h._rid;
- throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "jcntl", "rcvr_get_next_record");
- }
- std::free(xidp);
- }
- else
- {
- if (_emap.insert_pfid(h._rid, start_fid, file_pos) < enq_map::EMAP_OK) // fail
- {
- // The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID.
- std::ostringstream oss;
- oss << std::hex << "rid=0x" << h._rid << " _pfid=0x" << start_fid;
- throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "jcntl", "rcvr_get_next_record");
- }
- }
- }
- }
- break;
- case QLS_DEQ_MAGIC:
- {
- std::cout << " d" << std::flush;
- deq_rec dr;
- uint16_t start_fid = fid; // fid may increment in decode() if record folds over file boundary
- if (!decode(dr, fid, ifsp, cum_size_read, h, file_pos))
- return false;
- if (dr.xid_size())
- {
- // If the enqueue is part of a pending txn, it will not yet be in emap
- _emap.lock(dr.deq_rid()); // ignore not found error
- dr.get_xid(&xidp);
- assert(xidp != 0);
- std::string xid((char*)xidp, dr.xid_size());
- _tmap.insert_txn_data(xid, txn_data(dr.rid(), dr.deq_rid(), start_fid, false,
- dr.is_txn_coml_commit()));
- if (_tmap.set_aio_compl(xid, dr.rid()) < txn_map::TMAP_OK) // fail - xid or rid not found
- {
- std::ostringstream oss;
- oss << std::hex << "_tmap.set_aio_compl: txn_deq xid=\"" << xid << "\" rid=0x" << dr.rid();
- throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "jcntl", "rcvr_get_next_record");
- }
- std::free(xidp);
- }
- else
- {
- int16_t enq_fid;
- if (_emap.get_remove_pfid(dr.deq_rid(), enq_fid, true) == enq_map::EMAP_OK) // ignore not found error
- _rcvdat._enq_cnt_list[enq_fid]--;
- }
- }
- break;
- case QLS_TXA_MAGIC:
- {
- std::cout << " a" << std::flush;
- txn_rec ar;
- if (!decode(ar, fid, ifsp, cum_size_read, h, file_pos))
- return false;
- // Delete this txn from tmap, unlock any locked records in emap
- ar.get_xid(&xidp);
- assert(xidp != 0);
- std::string xid((char*)xidp, ar.xid_size());
- txn_data_list tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
- for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
- {
- if (itr->_enq_flag)
- _rcvdat._enq_cnt_list[itr->_pfid]--;
- else
- _emap.unlock(itr->_drid); // ignore not found error
- }
- std::free(xidp);
- }
- break;
- case QLS_TXC_MAGIC:
- {
- std::cout << " t" << std::flush;
- txn_rec cr;
- if (!decode(cr, fid, ifsp, cum_size_read, h, file_pos))
- return false;
- // Delete this txn from tmap, process records into emap
- cr.get_xid(&xidp);
- assert(xidp != 0);
- std::string xid((char*)xidp, cr.xid_size());
- txn_data_list tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
- for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
- {
- if (itr->_enq_flag) // txn enqueue
- {
- if (_emap.insert_pfid(itr->_rid, itr->_pfid, file_pos) < enq_map::EMAP_OK) // fail
- {
- // The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID.
- std::ostringstream oss;
- oss << std::hex << "rid=0x" << itr->_rid << " _pfid=0x" << itr->_pfid;
- throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "jcntl", "rcvr_get_next_record");
- }
- }
- else // txn dequeue
- {
- int16_t enq_fid;
- if (_emap.get_remove_pfid(itr->_drid, enq_fid, true) == enq_map::EMAP_OK) // ignore not found error
- _rcvdat._enq_cnt_list[enq_fid]--;
- }
- }
- std::free(xidp);
- }
- break;
- case QLS_EMPTY_MAGIC:
- {
- std::cout << " x" << std::flush;
- uint32_t rec_dblks = jrec::size_dblks(sizeof(rec_hdr_t));
- ifsp->ignore(rec_dblks * JRNL_DBLK_SIZE_BYTES - sizeof(rec_hdr_t));
- assert(!ifsp->fail() && !ifsp->bad());
- if (!jfile_cycle(fid, ifsp, false))
- return false;
- }
- break;
- case 0:
- std::cout << " 0" << std::endl << std::flush;
- check_journal_alignment(fid, file_pos);
- return false;
- default:
- std::cout << " ?" << std::endl << std::flush;
- // Stop as this is the overwrite boundary.
- check_journal_alignment(fid, file_pos);
- return false;
- }
- return true;
-}
-
-
-bool
-jcntl::decode(jrec& rec, uint16_t& fid, std::ifstream* ifsp, std::size_t& cum_size_read,
- rec_hdr_t& h, std::streampos& file_offs)
-{
- uint16_t start_fid = fid;
- std::streampos start_file_offs = file_offs;
-
- if (_rcvdat._h_rid == 0)
- _rcvdat._h_rid = h._rid;
- else if (h._rid - _rcvdat._h_rid < 0x8000000000000000ULL) // RFC 1982 comparison for unsigned 64-bit
- _rcvdat._h_rid = h._rid;
-
- bool done = false;
- while (!done)
- {
- try { done = rec.rcv_decode(h, ifsp, cum_size_read); }
- catch (const jexception& e)
- {
-// TODO - review this logic and tidy up how rd._lfid is assigned. See new jinf.get_end_file() fn.
-// Original
-// if (e.err_code() != jerrno::JERR_JREC_BADRECTAIL ||
-// fid != (rd._ffid ? rd._ffid - 1 : _num_jfiles - 1)) throw;
-// Tried this, but did not work
-// if (e.err_code() != jerrno::JERR_JREC_BADRECTAIL || h._magic != 0) throw;
- check_journal_alignment(start_fid, start_file_offs);
-// rd._lfid = start_fid;
- return false;
- }
- if (!done && !jfile_cycle(fid, ifsp, /*lowi, rd,*/ false))
- {
- check_journal_alignment(start_fid, start_file_offs);
- return false;
- }
- }
- return true;
-}
-
-
-bool
-jcntl::jfile_cycle(uint16_t& fid, std::ifstream* ifsp, const bool jump_fro)
-{
- if (ifsp->is_open())
- {
- if (ifsp->eof() || !ifsp->good())
- {
- ifsp->clear();
- _rcvdat._eo = ifsp->tellg(); // remember file offset before closing
- assert(_rcvdat._eo != std::numeric_limits<std::size_t>::max()); // Check for error code -1
- ifsp->close();
- if (++fid == _rcvdat._jfl.size()) // used up all known journal files
- return false;
- }
- }
- if (!ifsp->is_open())
- {
- ifsp->clear(); // clear eof flag, req'd for older versions of c++
- ifsp->open(_rcvdat._jfl[fid].c_str(), std::ios_base::in | std::ios_base::binary);
- if (!ifsp->good())
- throw jexception(jerrno::JERR__FILEIO, _rcvdat._jfl[fid], "jcntl", "jfile_cycle");
-
- // Read file header
- std::cout << " F" << fid << std::flush;
- file_hdr_t fhdr;
- ifsp->read((char*)&fhdr, sizeof(fhdr));
- assert(ifsp->good());
- if (fhdr._rhdr._magic == QLS_FILE_MAGIC)
- {
- if (!_rcvdat._fro)
- _rcvdat._fro = fhdr._fro;
- std::streamoff foffs = jump_fro ? fhdr._fro : JRNL_SBLK_SIZE_BYTES;
- ifsp->seekg(foffs);
- }
- else
- {
- ifsp->close();
- if (fid == 0) {
- _rcvdat._jempty = true;
- }
- return false;
- }
- }
- return true;
-}
-
-
-void
-jcntl::check_journal_alignment(const uint16_t fid, std::streampos& file_pos/*, rcvdat& rd*/)
-{
- unsigned sblk_offs = file_pos % JRNL_SBLK_SIZE_BYTES;
- if (sblk_offs)
- {
- {
- std::ostringstream oss;
- oss << std::hex << "Bad record alignment found at fid=0x" << fid;
- oss << " offs=0x" << file_pos << " (likely journal overwrite boundary); " << std::dec;
- oss << (JRNL_SBLK_SIZE_DBLKS - (sblk_offs/JRNL_DBLK_SIZE_BYTES)) << " filler record(s) required.";
- this->log(LOG_WARN, _jid, oss.str());
- }
- const uint32_t xmagic = QLS_EMPTY_MAGIC;
- std::ostringstream oss;
- oss << _jdir.dirname() << "/" /*<< _base_filename*/ << "."; // TODO linear journal name
- oss << std::hex << std::setfill('0') << std::setw(4) << fid << QLS_JRNL_FILE_EXTENSION;
- std::ofstream ofsp(oss.str().c_str(),
- std::ios_base::in | std::ios_base::out | std::ios_base::binary);
- if (!ofsp.good())
- throw jexception(jerrno::JERR__FILEIO, oss.str(), "jcntl", "check_journal_alignment");
- ofsp.seekp(file_pos);
- void* buff = std::malloc(JRNL_DBLK_SIZE_BYTES);
- assert(buff != 0);
- std::memcpy(buff, (const void*)&xmagic, sizeof(xmagic));
- // Normally, RHM_CLEAN must be set before these fills are done, but this is a recover
- // situation (i.e. performance is not an issue), and it makes the location of the write
- // clear should inspection of the file be required.
- std::memset((char*)buff + sizeof(xmagic), QLS_CLEAN_CHAR, JRNL_DBLK_SIZE_BYTES - sizeof(xmagic));
-
- while (file_pos % JRNL_SBLK_SIZE_BYTES)
- {
- ofsp.write((const char*)buff, JRNL_DBLK_SIZE_BYTES);
- assert(!ofsp.fail());
- std::ostringstream oss;
- oss << std::hex << "Recover phase write: Wrote filler record: fid=0x" << fid << " offs=0x" << file_pos;
- this->log(LOG_NOTICE, _jid, oss.str());
- file_pos = ofsp.tellp();
- }
- ofsp.close();
- std::free(buff);
- this->log(LOG_INFO, _jid, "Bad record alignment fixed.");
- }
- _rcvdat._eo = file_pos;
-}
-
}}
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h Mon Oct 21 21:26:10 2013
@@ -32,16 +32,11 @@ namespace qls_jrnl
#include <cstddef>
#include <deque>
#include <qpid/linearstore/jrnl/LinearFileController.h>
-#include <qpid/linearstore/jrnl/JournalLog.h>
#include "qpid/linearstore/jrnl/jdir.h"
-//#include "qpid/linearstore/jrnl/fcntl.h"
-//#include "qpid/linearstore/jrnl/lpmgr.h"
-#include "qpid/linearstore/jrnl/rcvdat.h"
+#include "qpid/linearstore/jrnl/RecoveryManager.h"
#include "qpid/linearstore/jrnl/slock.h"
#include "qpid/linearstore/jrnl/smutex.h"
-#include "qpid/linearstore/jrnl/rmgr.h"
#include "qpid/linearstore/jrnl/wmgr.h"
-//#include "qpid/linearstore/jrnl/wrfc.h"
namespace qpid
{
@@ -60,7 +55,7 @@ namespace qls_jrnl
* which is used per data block written to the journal, and is used to track its status through
* the AIO enqueue, read and dequeue process.
*/
- class jcntl : public JournalLog
+ class jcntl
{
protected:
/**
@@ -82,16 +77,6 @@ namespace qls_jrnl
jdir _jdir;
/**
- * \brief Base filename
- *
- * This string contains the base filename used for the journal files. The filenames will
- * start with this base, and have various sections added to it to derive the final file names
- * that will be written to disk. No file separator characters should be included here, but
- * all other legal filename characters are valid.
- */
-// std::string _base_filename;
-
- /**
* \brief Initialized flag
*
* This flag starts out set to false, is set to true once this object has been initialized,
@@ -120,21 +105,14 @@ namespace qls_jrnl
*/
bool _readonly_flag;
- /**
- * \brief If set, calls stop() if the journal write pointer overruns dequeue low water
- * marker. If not set, then attempts to write will throw exceptions until the journal
- * file low water marker moves to the next journal file.
- */
- //bool _autostop; ///< Autostop flag - stops journal when overrun occurs
-
// Journal control structures
+ JournalLog& _jrnl_log; ///< Ref to Journal Log instance
LinearFileController _linearFileController; ///< Linear File Controller
EmptyFilePool* _emptyFilePoolPtr; ///< Pointer to Empty File Pool for this queue
enq_map _emap; ///< Enqueue map for low water mark management
txn_map _tmap; ///< Transaction map open transactions
- //rmgr _rmgr; ///< Read page manager which manages AIO
wmgr _wmgr; ///< Write page manager which manages AIO
- rcvdat _rcvdat; ///< Recovery data used for recovery
+ RecoveryManager _recoveryManager; ///< Recovery data used for recovery
smutex _wr_mutex; ///< Mutex for journal writes
public:
@@ -150,7 +128,9 @@ namespace qls_jrnl
* \param jdir The directory which will contain the journal files.
* \param base_filename The string which will be used to start all journal filenames.
*/
- jcntl(const std::string& jid, const std::string& jdir/*, const std::string& base_filename*/);
+ jcntl(const std::string& jid,
+ const std::string& jdir,
+ JournalLog& jrnl_log);
/**
* \brief Destructor.
@@ -158,6 +138,7 @@ namespace qls_jrnl
virtual ~jcntl();
inline const std::string& id() const { return _jid; }
+
inline const std::string& jrnl_dir() const { return _jdir.dirname(); }
/**
@@ -191,9 +172,10 @@ namespace qls_jrnl
*
* \exception TODO
*/
- void initialize(/*const uint16_t num_jfiles, const bool auto_expand, const uint16_t ae_max_jfiles,
- const uint32_t jfsize_sblks,*/EmptyFilePool* efpp, const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks,
- aio_callback* const cbp);
+ void initialize(EmptyFilePool* efpp,
+ const uint16_t wcache_num_pages,
+ const uint32_t wcache_pgsize_sblks,
+ aio_callback* const cbp);
/**
* \brief Initialize journal by recovering state from previously written journal.
@@ -294,11 +276,15 @@ namespace qls_jrnl
*
* \exception TODO
*/
- iores enqueue_data_record(const void* const data_buff, const std::size_t tot_data_len,
- const std::size_t this_data_len, data_tok* dtokp, const bool transient = false);
-
- iores enqueue_extern_data_record(const std::size_t tot_data_len, data_tok* dtokp,
- const bool transient = false);
+ iores enqueue_data_record(const void* const data_buff,
+ const std::size_t tot_data_len,
+ const std::size_t this_data_len,
+ data_tok* dtokp,
+ const bool transient = false);
+
+ iores enqueue_extern_data_record(const std::size_t tot_data_len,
+ data_tok* dtokp,
+ const bool transient = false);
/**
* \brief Enqueue data.
@@ -313,84 +299,17 @@ namespace qls_jrnl
*
* \exception TODO
*/
- iores enqueue_txn_data_record(const void* const data_buff, const std::size_t tot_data_len,
- const std::size_t this_data_len, data_tok* dtokp, const std::string& xid,
- const bool transient = false);
- iores enqueue_extern_txn_data_record(const std::size_t tot_data_len, data_tok* dtokp,
- const std::string& xid, const bool transient = false);
-
- /* TODO
- **
- * \brief Retrieve details of next record to be read without consuming the record.
- *
- * Retrieve information about current read record. A pointer to the data is returned, along
- * with the data size and available data size. Data is considered "available" when the AIO
- * operations to fill page-cache pages from disk have returned, and is ready for consumption.
- *
- * If <i>dsize_avail</i> < <i>dsize</i>, then not all of the data is available or part of
- * the data is in non-contiguous memory, and a subsequent call will update both the pointer
- * and <i>dsize_avail</i> if more pages have returned from AIO.
- *
- * The <i>dsize_avail</i> parameter will return the amount of data from this record that is
- * available in the page cache as contiguous memory, even if it spans page cache boundaries.
- * However, if a record spans the end of the page cache and continues at the beginning, even
- * if both parts are ready for consumption, then this must be divided into at least two
- * get_data_record() operations, as the data is contained in at least two non-contiguous
- * segments of the page cache.
- *
- * Once all the available data for a record is exposed, it can not be read again using
- * this function. It must be consumed prior to getting the next record. This can be done by
- * calling discard_data_record() or read_data_record(). However, if parameter
- * <i>auto_discard</i> is set to <b><i>true</i></b>, then this record will be automatically
- * consumed when the entire record has become available without having to explicitly call
- * discard_next_data_record() or read_data_record().
- *
- * If the current record is an open transactional record, then it cannot be read until it is
- * committed. If it is aborted, it can never be read. Under this condition, get_data_record()
- * will return RHM_IORES_TXPENDING, the data pointer will be set to NULL and all data
- * lengths will be set to 0.
- *
- * Example: Read a record of 30k. Assume a read page cache of 10 pages of size 10k starting
- * at address base_ptr (page0 = base_ptr, page1 = page_ptr+10k, etc.). The first 15k of
- * the record falls at the end of the page cache, the remaining 15k folded to the beginning.
- * The current page (page 8) containing 5k is available, the remaining pages which contain
- * this record are pending AIO return:
- * <pre>
- * call dsize
- * no. dsize avail data ptr Return Comment
- * ----+-----+-----+------------+--------+--------------------------------------------------
- * 1 30k 5k base_ptr+85k SUCCESS Initial call, read first 5k
- * 2 30k 0k base_ptr+90k AIO_WAIT AIO still pending; no further pages avail
- * 3 30k 10k base_ptr+90k SUCCESS AIO now returned; now read till end of page cache
- * 4 30k 15k base_ptr SUCCESS data_ptr now pointing to start of page cache
- * </pre>
- *
- * \param rid Reference that returns the record ID (rid)
- * \param dsize Reference that returns the total data size of the record data .
- * \param dsize_avail Reference that returns the amount of the data that is available for
- * consumption.
- * \param data Pointer to data pointer which will point to the first byte of the next record
- * data.
- * \param auto_discard If <b><i>true</i></b>, automatically discard the record being read if
- * the entire record is available (i.e. dsize == dsize_avail). Otherwise
- * discard_next_data_record() must be explicitly called.
- *
- * \exception TODO
- *
- // *** NOT YET IMPLEMENTED ***
- iores get_data_record(const uint64_t& rid, const std::size_t& dsize,
- const std::size_t& dsize_avail, const void** const data, bool auto_discard = false);
- */
-
- /* TODO
- **
- * \brief Discard (skip) next record to be read without reading or retrieving it.
- *
- * \exception TODO
- *
- // *** NOT YET IMPLEMENTED ***
- iores discard_data_record(data_tok* const dtokp);
- */
+ iores enqueue_txn_data_record(const void* const data_buff,
+ const std::size_t tot_data_len,
+ const std::size_t this_data_len,
+ data_tok* dtokp,
+ const std::string& xid,
+ const bool transient = false);
+
+ iores enqueue_extern_txn_data_record(const std::size_t tot_data_len,
+ data_tok* dtokp,
+ const std::string& xid,
+ const bool transient = false);
/**
* \brief Reads data from the journal. It is the responsibility of the reader to free
@@ -434,9 +353,14 @@ namespace qls_jrnl
*
* \exception TODO
*/
- iores read_data_record(void** const datapp, std::size_t& dsize, void** const xidpp,
- std::size_t& xidsize, bool& transient, bool& external, data_tok* const dtokp,
- bool ignore_pending_txns = false);
+ iores read_data_record(void** const datapp,
+ std::size_t& dsize,
+ void** const xidpp,
+ std::size_t& xidsize,
+ bool& transient,
+ bool& external,
+ data_tok* const dtokp,
+ bool ignore_pending_txns = false);
/**
* \brief Dequeues (marks as no longer needed) data record in journal.
@@ -455,7 +379,8 @@ namespace qls_jrnl
*
* \exception TODO
*/
- iores dequeue_data_record(data_tok* const dtokp, const bool txn_coml_commit = false);
+ iores dequeue_data_record(data_tok* const dtokp,
+ const bool txn_coml_commit = false);
/**
* \brief Dequeues (marks as no longer needed) data record in journal.
@@ -476,7 +401,9 @@ namespace qls_jrnl
*
* \exception TODO
*/
- iores dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit = false);
+ iores dequeue_txn_data_record(data_tok* const dtokp,
+ const std::string& xid,
+ const bool txn_coml_commit = false);
/**
* \brief Abort the transaction for all records enqueued or dequeued with the matching xid.
@@ -491,7 +418,8 @@ namespace qls_jrnl
*
* \exception TODO
*/
- iores txn_abort(data_tok* const dtokp, const std::string& xid);
+ iores txn_abort(data_tok* const dtokp,
+ const std::string& xid);
/**
* \brief Commit the transaction for all records enqueued or dequeued with the matching xid.
@@ -506,7 +434,8 @@ namespace qls_jrnl
*
* \exception TODO
*/
- iores txn_commit(data_tok* const dtokp, const std::string& xid);
+ iores txn_commit(data_tok* const dtokp,
+ const std::string& xid);
/**
* \brief Check whether all the enqueue records for the given xid have reached disk.
@@ -527,15 +456,6 @@ namespace qls_jrnl
int32_t get_wr_events(timespec* const timeout);
/**
- * \brief Forces a check for returned AIO read events.
- *
- * Forces a check for returned AIO read events. This is normally performed by read_data()
- * operations, but if these operations cease, then this call needs to be made to force the
- * processing of any outstanding AIO operations.
- */
-// int32_t get_rd_events(timespec* const timeout);
-
- /**
* \brief Stop the journal from accepting any further requests to read or write data.
*
* This operation is used to stop the journal. This is the normal mechanism for bringing the
@@ -555,27 +475,14 @@ namespace qls_jrnl
*/
iores flush(const bool block_till_aio_cmpl = false);
- inline uint32_t get_enq_cnt() const { return _emap.size(); } // TODO: Thread safe?
+ inline uint32_t get_enq_cnt() const { return _emap.size(); } // TODO: _emap: Thread safe?
inline uint32_t get_wr_aio_evt_rem() const { slock l(_wr_mutex); return _wmgr.get_aio_evt_rem(); }
-// inline uint32_t get_rd_aio_evt_rem() const { return _rmgr.get_aio_evt_rem(); }
-
- inline uint32_t get_wr_outstanding_aio_dblks() const;
- /*{ return _wrfc.aio_outstanding_dblks(); }*/
+ uint32_t get_wr_outstanding_aio_dblks() const;
-// inline uint32_t get_wr_outstanding_aio_dblks(uint16_t lfid) const;
-// { return _lpmgr.get_fcntlp(lfid)->wr_aio_outstanding_dblks(); }
+ uint32_t get_rd_outstanding_aio_dblks() const;
- inline uint32_t get_rd_outstanding_aio_dblks() const;
-// { return _rrfc.aio_outstanding_dblks(); }
-
-// inline uint32_t get_rd_outstanding_aio_dblks(uint16_t lfid) const;
-// { return _lpmgr.get_fcntlp(lfid)->rd_aio_outstanding_dblks(); }
-
-// inline uint16_t get_rd_fid() const { return _rrfc.index(); }
-// inline uint16_t get_wr_fid() const { return _wrfc.index(); }
-// uint16_t get_earliest_fid();
LinearFileController& getLinearFileControllerRef();
/**
@@ -583,13 +490,20 @@ namespace qls_jrnl
* false if the rid is transactionally enqueued and is not committed, or if it is
* locked (i.e. transactionally dequeued, but the dequeue has not been committed).
*/
- inline bool is_enqueued(const uint64_t rid, bool ignore_lock = false)
- { return _emap.is_enqueued(rid, ignore_lock); }
- inline bool is_locked(const uint64_t rid)
- { if (_emap.is_enqueued(rid, true) < enq_map::EMAP_OK) return false; return _emap.is_locked(rid) == enq_map::EMAP_TRUE; }
+ inline bool is_enqueued(const uint64_t rid, bool ignore_lock = false) { return _emap.is_enqueued(rid, ignore_lock); }
+
+ inline bool is_locked(const uint64_t rid) {
+ if (_emap.is_enqueued(rid, true) < enq_map::EMAP_OK)
+ return false;
+ return _emap.is_locked(rid) == enq_map::EMAP_TRUE;
+ }
+
inline void enq_rid_list(std::vector<uint64_t>& rids) { _emap.rid_list(rids); }
+
inline void enq_xid_list(std::vector<std::string>& xids) { _tmap.xid_list(xids); }
+
inline uint32_t get_open_txn_cnt() const { return _tmap.size(); }
+
// TODO Make this a const, but txn_map must support const first.
inline txn_map& get_txn_map() { return _tmap; }
@@ -626,41 +540,10 @@ namespace qls_jrnl
*/
inline const std::string& dirname() const { return _jdir.dirname(); }
- /**
- * \brief Get the journal base filename.
- *
- * Get the journal base filename as set during initialization. This is the prefix used in all
- * journal files of this instance. Note that if more than one instance of the journal shares
- * the same directory, their base filenames <b>MUST</b> be different or else the instances
- * will overwrite one another.
- */
-// inline const std::string& base_filename() const { return _base_filename; }
-
-// inline uint16_t num_jfiles() const; { return _lpmgr.num_jfiles(); }
-
-// inline fcntl* get_fcntlp(const uint16_t lfid) const { return _lpmgr.get_fcntlp(lfid); }
-
-// inline uint32_t jfsize_sblks() const { return _jfsize_sblks; }
-
- // Logging
-// virtual void log(log_level_t level, const std::string& log_stmt) const;
-// virtual void log(log_level_t level, const char* const log_stmt) const;
-
- // FIXME these are _rmgr to _wmgr interactions, remove when _rmgr contains ref to _wmgr:
- //void chk_wr_frot();
- inline uint32_t unflushed_dblks() { return _wmgr.unflushed_dblks(); }
- void fhdr_wr_sync(const uint16_t lid);
- inline uint32_t wr_subm_cnt_dblks(const uint16_t lfid) const; /*{ return _lpmgr.get_fcntlp(lfid)->wr_subm_cnt_dblks(); }*/
-
// Management instrumentation callbacks
inline virtual void instr_incr_outstanding_aio_cnt() {}
inline virtual void instr_decr_outstanding_aio_cnt() {}
- /**
- * /brief Static function for creating new fcntl objects for use with obj_arr.
- */
-// static fcntl* new_fcntl(jcntl* const jcp, const uint16_t lid, const uint16_t fid, const rcvdat* const rdp);
-
protected:
static bool _init;
static bool init_statics();
@@ -676,11 +559,6 @@ namespace qls_jrnl
void check_rstatus(const char* fn_name) const;
/**
- * \brief Write info file <basefilename>.jinf to disk
- */
-// void write_infofile() const;
-
- /**
* \brief Call that blocks while waiting for all outstanding AIOs to complete
*/
void aio_cmpl_wait();
@@ -690,27 +568,6 @@ namespace qls_jrnl
* AIO wait conditions to clear.
*/
bool handle_aio_wait(const iores res, iores& resout, const data_tok* dtp);
-
- /**
- * \brief Analyze journal for recovery.
- */
- static void rcvr_read_jfile(const std::string& jfn, ::file_hdr_t* fh, std::string& queueName);
-
- void rcvr_analyze_fhdrs(EmptyFilePoolManager* efpmp);
-
- void rcvr_janalyze(const std::vector<std::string>* prep_txn_list_ptr, EmptyFilePoolManager* efpmp);
-
- bool rcvr_get_next_record(uint16_t& fid, std::ifstream* ifsp/*, bool& lowi, rcvdat& rd*/);
-
- bool decode(jrec& rec, uint16_t& fid, std::ifstream* ifsp, std::size_t& cum_size_read,
- rec_hdr_t& h, /*bool& lowi, rcvdat& rd,*/ std::streampos& rec_offset);
-
- bool jfile_cycle(uint16_t& fid, std::ifstream* ifsp, /*bool& lowi, rcvdat& rd,*/ const bool jump_fro);
-
- //bool check_owi(const uint16_t fid, rec_hdr_t& h, bool& lowi, rcvdat& rd,
- // std::streampos& read_pos);
-
- void check_journal_alignment(const uint16_t fid, std::streampos& rec_offset/*, rcvdat& rd*/);
};
}}
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp Mon Oct 21 21:26:10 2013
@@ -50,8 +50,6 @@ const uint32_t jerrno::JERR_JCNTL_READON
const uint32_t jerrno::JERR_JCNTL_AIOCMPLWAIT = 0x0202;
const uint32_t jerrno::JERR_JCNTL_UNKNOWNMAGIC = 0x0203;
const uint32_t jerrno::JERR_JCNTL_NOTRECOVERED = 0x0204;
-const uint32_t jerrno::JERR_JCNTL_OPENRD = 0x0205;
-const uint32_t jerrno::JERR_JCNTL_READ = 0x0206;
const uint32_t jerrno::JERR_JCNTL_ENQSTATE = 0x0207;
const uint32_t jerrno::JERR_JCNTL_INVALIDENQHDR = 0x0208;
@@ -90,6 +88,11 @@ const uint32_t jerrno::JERR_WMGR_DEQDISC
const uint32_t jerrno::JERR_WMGR_DEQRIDNOTENQ = 0x0805;
const uint32_t jerrno::JERR_WMGR_BADFH = 0x0806;
+// class RecoveryManager
+const uint32_t jerrno::JERR_RCVM_OPENRD = 0x0900;
+const uint32_t jerrno::JERR_RCVM_READ = 0x0901;
+const uint32_t jerrno::JERR_RCVM_WRITE = 0x0902;
+
//// class rmgr
//const uint32_t jerrno::JERR_RMGR_UNKNOWNMAGIC = 0x0900;
//const uint32_t jerrno::JERR_RMGR_RIDMISMATCH = 0x0901;
@@ -143,8 +146,6 @@ jerrno::__init()
_err_map[JERR_JCNTL_AIOCMPLWAIT] = "JERR_JCNTL_AIOCMPLWAIT: Timeout waiting for AIOs to complete.";
_err_map[JERR_JCNTL_UNKNOWNMAGIC] = "JERR_JCNTL_UNKNOWNMAGIC: Found record with unknown magic.";
_err_map[JERR_JCNTL_NOTRECOVERED] = "JERR_JCNTL_NOTRECOVERED: Operation requires recover() to be run first.";
- _err_map[JERR_JCNTL_OPENRD] = "JERR_JCNTL_OPENRD: Unable to open file for write";
- _err_map[JERR_JCNTL_READ] = "JERR_JCNTL_READ: Read error: no or insufficient data to read";
_err_map[JERR_JCNTL_ENQSTATE] = "JERR_JCNTL_ENQSTATE: Read error: Record not in ENQ state";
_err_map[JERR_JCNTL_INVALIDENQHDR] = "JERR_JCNTL_INVALIDENQHDR: Invalid ENQ header";
@@ -182,6 +183,10 @@ jerrno::__init()
_err_map[JERR_WMGR_DEQRIDNOTENQ] = "JERR_WMGR_DEQRIDNOTENQ: Dequeue rid is not enqueued.";
_err_map[JERR_WMGR_BADFH] = "JERR_WMGR_BADFH: Bad file handle.";
+ // class RecoveryManager (each message is prefixed with the name of its own error constant)
+ _err_map[JERR_RCVM_OPENRD] = "JERR_RCVM_OPENRD: Unable to open file for read";
+ _err_map[JERR_RCVM_READ] = "JERR_RCVM_READ: Read error: no or insufficient data to read";
+ _err_map[JERR_RCVM_WRITE] = "JERR_RCVM_WRITE: Write error";
// // class rmgr
// _err_map[JERR_RMGR_UNKNOWNMAGIC] = "JERR_RMGR_UNKNOWNMAGIC: Found record with unknown magic.";
// _err_map[JERR_RMGR_RIDMISMATCH] = "JERR_RMGR_RIDMISMATCH: RID mismatch between current record and dtok RID";
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h Mon Oct 21 21:26:10 2013
@@ -69,8 +69,6 @@ namespace qls_jrnl
static const uint32_t JERR_JCNTL_AIOCMPLWAIT; ///< Timeout waiting for AIOs to complete
static const uint32_t JERR_JCNTL_UNKNOWNMAGIC; ///< Found record with unknown magic
static const uint32_t JERR_JCNTL_NOTRECOVERED; ///< Req' recover() to be called first
- static const uint32_t JERR_JCNTL_OPENRD; ///< Unable to open file for read
- static const uint32_t JERR_JCNTL_READ; ///< Read error: no or insufficient data to read
static const uint32_t JERR_JCNTL_ENQSTATE; ///< Read error: Record not in ENQ state
static const uint32_t JERR_JCNTL_INVALIDENQHDR;///< Invalid ENQ header
@@ -108,6 +106,11 @@ namespace qls_jrnl
static const uint32_t JERR_WMGR_DEQRIDNOTENQ; ///< Deq. rid not enqueued
static const uint32_t JERR_WMGR_BADFH; ///< Bad file handle
+ // class RecoveryManager
+ static const uint32_t JERR_RCVM_OPENRD; ///< Unable to open file for read
+ static const uint32_t JERR_RCVM_READ; ///< Read error: no or insufficient data to read
+ static const uint32_t JERR_RCVM_WRITE; ///< Write error
+
// // class rmgr
// static const uint32_t JERR_RMGR_UNKNOWNMAGIC; ///< Found record with unknown magic
// static const uint32_t JERR_RMGR_RIDMISMATCH; ///< RID mismatch between rec and dtok
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h Mon Oct 21 21:26:10 2013
@@ -148,9 +148,9 @@ namespace qls_jrnl
virtual std::size_t rec_size() const = 0;
inline virtual uint32_t rec_size_dblks() const { return size_dblks(rec_size()); }
static inline uint32_t size_dblks(const std::size_t size)
- { return size_blks(size, JRNL_DBLK_SIZE_BYTES); }
+ { return size_blks(size, QLS_DBLK_SIZE_BYTES); }
static inline uint32_t size_sblks(const std::size_t size)
- { return size_blks(size, JRNL_SBLK_SIZE_BYTES); }
+ { return size_blks(size, QLS_SBLK_SIZE_BYTES); }
static inline uint32_t size_blks(const std::size_t size, const std::size_t blksize)
{ return (size + blksize - 1)/blksize; }
virtual uint64_t rid() const = 0;
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp Mon Oct 21 21:26:10 2013
@@ -41,12 +41,11 @@ pmgr::page_cb::page_cb(uint16_t index):
_wdblks(0),
_rdblks(0),
_pdtokl(0),
-// _wfh(0),
-// _rfh(0),
_jfp(0),
_pbuff(0)
{}
+// TODO: almost identical to pmgr::page_state_str() below - resolve
const char*
pmgr::page_cb::state_str() const
{
@@ -58,14 +57,12 @@ pmgr::page_cb::state_str() const
return "IN_USE";
case AIO_PENDING:
return "AIO_PENDING";
- case AIO_COMPLETE:
- return "AIO_COMPLETE";
}
return "<unknown>";
}
// static
-const uint32_t pmgr::_sblkSizeBytes = JRNL_SBLK_SIZE_BYTES;
+const uint32_t pmgr::_sblkSizeBytes = QLS_SBLK_SIZE_BYTES;
pmgr::pmgr(jcntl* jc, enq_map& emap, txn_map& tmap):
_cache_pgsize_sblks(0),
@@ -109,11 +106,11 @@ pmgr::initialize(aio_callback* const cbp
// 1. Allocate page memory (as a single block)
std::size_t cache_pgsize = _cache_num_pages * _cache_pgsize_sblks * _sblkSizeBytes;
- if (::posix_memalign(&_page_base_ptr, QLS_AIO_ALIGN_BOUNDARY, cache_pgsize))
+ if (::posix_memalign(&_page_base_ptr, QLS_AIO_ALIGN_BOUNDARY_BYTES, cache_pgsize))
{
clean();
std::ostringstream oss;
- oss << "posix_memalign(): alignment=" << QLS_AIO_ALIGN_BOUNDARY << " size=" << cache_pgsize;
+ oss << "posix_memalign(): alignment=" << QLS_AIO_ALIGN_BOUNDARY_BYTES << " size=" << cache_pgsize;
oss << FORMAT_SYSERR(errno);
throw jexception(jerrno::JERR__MALLOC, oss.str(), "pmgr", "initialize");
}
@@ -186,6 +183,7 @@ pmgr::clean()
_aio_event_arr = 0;
}
+// TODO: almost identical to pmgr::page_cb::state_str() above - resolve
const char*
pmgr::page_state_str(page_state ps)
{
@@ -197,8 +195,6 @@ pmgr::page_state_str(page_state ps)
return "IN_USE";
case AIO_PENDING:
return "AIO_PENDING";
- case AIO_COMPLETE:
- return "AIO_COMPLETE";
}
return "<page_state unknown>";
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org