You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/11/14 21:39:34 UTC
svn commit: r1542066 [3/3] - in /qpid/trunk/qpid/cpp/src/qpid: legacystore/
linearstore/ linearstore/jrnl/ linearstore/jrnl/utils/
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h Thu Nov 14 20:39:32 2013
@@ -19,152 +19,105 @@
*
*/
-#ifndef QPID_LEGACYSTORE_JRNL_JREC_H
-#define QPID_LEGACYSTORE_JRNL_JREC_H
-
-namespace qpid
-{
-namespace qls_jrnl
-{
-class jrec;
-}}
+#ifndef QPID_LINEARSTORE_JOURNAL_JREC_H
+#define QPID_LINEARSTORE_JOURNAL_JREC_H
#include <cstddef>
#include <fstream>
#include "qpid/linearstore/jrnl/jcfg.h"
-#include "qpid/linearstore/jrnl/utils/rec_hdr.h"
-#include "qpid/linearstore/jrnl/utils/rec_tail.h"
+#include <stdint.h>
#include <string>
-namespace qpid
-{
-namespace qls_jrnl
+struct rec_hdr_t;
+struct rec_tail_t;
+
+namespace qpid {
+namespace linearstore {
+namespace journal {
+
+/**
+* \class jrec
+* \brief Abstract class for all file jrecords, both data and log. This class establishes
+* the common data format and structure for these jrecords.
+*/
+class jrec
{
+public:
+ jrec();
+ virtual ~jrec();
/**
- * \class jrec
- * \brief Abstract class for all file jrecords, both data and log. This class establishes
- * the common data format and structure for these jrecords.
+ * \brief Encode this instance of jrec into the write buffer at the disk-block-aligned
+ * pointer wptr starting at position rec_offs_dblks in the encoded record to a
+ * maximum size of max_size_dblks.
+ *
+ * This call encodes the content of the data contained in this instance of jrec into a
+ * disk-softblock-aligned (defined by JRNL_SBLK_SIZE) buffer pointed to by parameter
+ * wptr. No more than parameter max_size_dblks data-blocks may be written to the buffer.
+ * The parameter rec_offs_dblks is the offset in data-blocks within the fully encoded
+ * data block this instance represents at which to start encoding.
+ *
+ * Encoding entails writing the record header (struct enq_hdr), the data and the record tail
+ * (struct enq_tail). The record must be data-block-aligned (defined by JRNL_DBLK_SIZE),
+ * thus any remaining space in the final data-block is ignored; the returned value is the
+ * number of data-blocks consumed from the page by the encode action. Provided the initial
+ * alignment requirements are met, records may be of arbitrary size and may span multiple
+ * data-blocks, disk-blocks and/or pages.
+ *
+ * Since the record size in data-blocks is known, the general usage pattern is to call
+ * encode() as many times as is needed to fully encode the data. Each call to encode()
+ * will encode as much of the record as it can to what remains of the current page cache,
+ * and will return the number of data-blocks actually encoded.
+ *
+ * <b>Example:</b> Assume that record r1 was previously written to page 0, and that this
+ * is an instance representing record r2. Being larger than the page size ps, r2 would span
+ * multiple pages as follows:
+ * <pre>
+ * |<---ps--->|
+ * +----------+----------+----------+----...
+ * | |r2a| r2b | r2c | |
+ * |<-r1-><----------r2----------> |
+ * +----------+----------+----------+----...
+ * page: p0 p1 p2
+ * </pre>
+ * Encoding record r2 will require multiple calls to encode; one for each page which
+ * is involved. Record r2 is divided logically into sections r2a, r2b and r2c at the
+ * points where the page boundaries intersect with the record. Assuming a page size
+ * of ps, the page boundary pointers are represented by their names p0, p1... and the
+ * sizes of the record segments are represented by their names r1, r2a, r2b..., the calls
+ * should be as follows:
+ * <pre>
+ * encode(p0+r1, 0, ps-r1); (returns r2a data-blocks)
+ * encode(p1, r2a, ps); (returns r2b data-blocks which equals ps)
+ * encode(p2, r2a+r2b, ps); (returns r2c data-blocks)
+ * </pre>
+ *
+ * \param wptr Data-block-aligned pointer to position in page buffer where encoding is to
+ * take place.
+ * \param rec_offs_dblks Offset in data-blocks within record from which to start encoding.
+ * \param max_size_dblks Maximum number of data-blocks to write to pointer wptr.
+ * \returns Number of data-blocks encoded.
*/
- class jrec
- {
- public:
- jrec();
- virtual ~jrec();
-
- /**
- * \brief Encode this instance of jrec into the write buffer at the disk-block-aligned
- * pointer wptr starting at position rec_offs_dblks in the encoded record to a
- * maximum size of max_size_dblks.
- *
- * This call encodes the content of the data contianed in this instance of jrec into a
- * disk-softblock-aligned (defined by JRNL_SBLK_SIZE) buffer pointed to by parameter
- * wptr. No more than paramter max_size_dblks data-blocks may be written to the buffer.
- * The parameter rec_offs_dblks is the offset in data-blocks within the fully encoded
- * data block this instance represents at which to start encoding.
- *
- * Encoding entails writing the record header (struct enq_hdr), the data and the record tail
- * (struct enq_tail). The record must be data-block-aligned (defined by JRNL_DBLK_SIZE),
- * thus any remaining space in the final data-block is ignored; the returned value is the
- * number of data-blocks consumed from the page by the encode action. Provided the initial
- * alignment requirements are met, records may be of arbitrary size and may span multiple
- * data-blocks, disk-blocks and/or pages.
- *
- * Since the record size in data-blocks is known, the general usage pattern is to call
- * encode() as many times as is needed to fully encode the data. Each call to encode()
- * will encode as much of the record as it can to what remains of the current page cache,
- * and will return the number of data-blocks actually encoded.
- *
- * <b>Example:</b> Assume that record r1 was previously written to page 0, and that this
- * is an instance representing record r2. Being larger than the page size ps, r2 would span
- * multiple pages as follows:
- * <pre>
- * |<---ps--->|
- * +----------+----------+----------+----...
- * | |r2a| r2b | r2c | |
- * |<-r1-><----------r2----------> |
- * +----------+----------+----------+----...
- * page: p0 p1 p2
- * </pre>
- * Encoding record r2 will require multiple calls to encode; one for each page which
- * is involved. Record r2 is divided logically into sections r2a, r2b and r2c at the
- * points where the page boundaries intersect with the record. Assuming a page size
- * of ps, the page boundary pointers are represented by their names p0, p1... and the
- * sizes of the record segments are represented by their names r1, r2a, r2b..., the calls
- * should be as follows:
- * <pre>
- * encode(p0+r1, 0, ps-r1); (returns r2a data-blocks)
- * encode(p1, r2a, ps); (returns r2b data-blocks which equals ps)
- * encode(p2, r2a+r2b, ps); (returns r2c data-blocks)
- * </pre>
- *
- * \param wptr Data-block-aligned pointer to position in page buffer where encoding is to
- * take place.
- * \param rec_offs_dblks Offset in data-blocks within record from which to start encoding.
- * \param max_size_dblks Maximum number of data-blocks to write to pointer wptr.
- * \returns Number of data-blocks encoded.
- */
- virtual uint32_t encode(void* wptr, uint32_t rec_offs_dblks,
- uint32_t max_size_dblks) = 0;
-
- /**
- * \brief Decode into this instance of jrec from the read buffer at the disk-block-aligned
- * pointer rptr starting at position jrec_offs_dblks in the encoded record to a
- * maximum size of max_size_blks.
- *
- * This call decodes a record in the page buffer pointed to by the data-block-aligned
- * (defined by JRNL_DBLK_SIZE) parameter rptr into this instance of jrec. No more than
- * paramter max_size_dblks data-blocks may be read from the buffer. The parameter
- * jrec_offs_dblks is the offset in data-blocks within the encoded record at which to start
- * decoding.
- *
- * Decoding entails reading the record header, the data and the tail. The record is
- * data-block-aligned (defined by JRNL_DBLK_SIZE); the returned value is the number of
- * data-blocks read from the buffer by the decode action. As the record data size is only
- * known once the header is read, the number of calls required to complete reading the
- * record will depend on the vlaues within this instance which are set when the
- * header is decoded.
- *
- * A non-zero value for jrec_offs_dblks implies that this is not the first call to
- * decode and the record data will be appended at this offset.
- *
- * \param h Reference to instance of struct hdr, already read from page buffer and used
- * to determine record type
- * \param rptr Data-block-aligned pointer to position in page buffer where decoding is to
- * begin.
- * \param rec_offs_dblks Offset within record from which to start appending the decoded
- * record.
- * \param max_size_dblks Maximum number of data-blocks to read from pointer rptr.
- * \returns Number of data-blocks read (consumed).
- */
- virtual uint32_t decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks,
- uint32_t max_size_dblks) = 0;
-
- virtual bool rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs) = 0;
-
- virtual std::string& str(std::string& str) const = 0;
- virtual std::size_t data_size() const = 0;
- virtual std::size_t xid_size() const = 0;
- virtual std::size_t rec_size() const = 0;
- inline virtual uint32_t rec_size_dblks() const { return size_dblks(rec_size()); }
- static inline uint32_t size_dblks(const std::size_t size)
- { return size_blks(size, QLS_DBLK_SIZE_BYTES); }
- static inline uint32_t size_sblks(const std::size_t size)
- { return size_blks(size, QLS_SBLK_SIZE_BYTES); }
- static inline uint32_t size_blks(const std::size_t size, const std::size_t blksize)
- { return (size + blksize - 1)/blksize; }
- virtual uint64_t rid() const = 0;
-
- protected:
- virtual void chk_hdr() const = 0;
- virtual void chk_hdr(uint64_t rid) const = 0;
- virtual void chk_tail() const = 0;
- static void chk_hdr(const rec_hdr_t& hdr);
- static void chk_rid(const rec_hdr_t& hdr, uint64_t rid);
- static void chk_tail(const rec_tail_t& tail, const rec_hdr_t& hdr);
- virtual void clean() = 0;
- }; // class jrec
+ virtual uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) = 0;
+ virtual bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) = 0;
+
+ virtual std::string& str(std::string& str) const = 0;
+ virtual std::size_t data_size() const = 0;
+ virtual std::size_t xid_size() const = 0;
+ virtual std::size_t rec_size() const = 0;
+ inline virtual uint32_t rec_size_dblks() const { return size_dblks(rec_size()); }
+ static inline uint32_t size_dblks(const std::size_t size)
+ { return size_blks(size, QLS_DBLK_SIZE_BYTES); }
+ static inline uint32_t size_sblks(const std::size_t size)
+ { return size_blks(size, QLS_SBLK_SIZE_BYTES); }
+ static inline uint32_t size_blks(const std::size_t size, const std::size_t blksize)
+ { return (size + blksize - 1)/blksize; }
+ virtual uint64_t rid() const = 0;
+
+protected:
+ virtual void clean() = 0;
+};
-}}
+}}}
-#endif // ifndef QPID_LEGACYSTORE_JRNL_JREC_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_JREC_H
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp Thu Nov 14 20:39:32 2013
@@ -29,10 +29,9 @@
#include "qpid/linearstore/jrnl/jerrno.h"
#include <sstream>
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
pmgr::page_cb::page_cb(uint16_t index):
_index(index),
@@ -199,4 +198,4 @@ pmgr::page_state_str(page_state ps)
return "<page_state unknown>";
}
-}}
+}}}
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h Thu Nov 14 20:39:32 2013
@@ -19,16 +19,15 @@
*
*/
-#ifndef QPID_LEGACYSTORE_JRNL_PMGR_H
-#define QPID_LEGACYSTORE_JRNL_PMGR_H
+#ifndef QPID_LINEARSTORE_JOURNAL_PMGR_H
+#define QPID_LINEARSTORE_JOURNAL_PMGR_H
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
class pmgr;
class jcntl;
-}}
+}}}
#include <deque>
#include "qpid/linearstore/jrnl/aio.h"
@@ -40,11 +39,11 @@ namespace qls_jrnl
#include "qpid/linearstore/jrnl/txn_map.h"
#include "qpid/linearstore/jrnl/txn_rec.h"
-namespace qpid
-{
-namespace qls_jrnl
-{
-class JournalFile;
+namespace qpid {
+namespace linearstore {
+namespace journal {
+
+ class JournalFile;
/**
* \brief Abstract class for managing either read or write page cache of arbitrary size and
@@ -122,6 +121,6 @@ class JournalFile;
virtual void clean();
};
-}}
+}}}
-#endif // ifndef QPID_LEGACYSTORE_JRNL_PMGR_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_PMGR_H
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/slock.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/slock.h?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/slock.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/slock.h Thu Nov 14 20:39:32 2013
@@ -19,17 +19,16 @@
*
*/
-#ifndef QPID_LEGACYSTORE_JRNL_SLOCK_H
-#define QPID_LEGACYSTORE_JRNL_SLOCK_H
+#ifndef QPID_LINEARSTORE_JOURNAL_SLOCK_H
+#define QPID_LINEARSTORE_JOURNAL_SLOCK_H
#include "qpid/linearstore/jrnl/jexception.h"
#include "qpid/linearstore/jrnl/smutex.h"
#include <pthread.h>
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
// Ultra-simple scoped lock class, auto-releases mutex when it goes out-of-scope
class slock
@@ -68,6 +67,6 @@ namespace qls_jrnl
inline bool locked() const { return _locked; }
};
-}}
+}}}
-#endif // ifndef QPID_LEGACYSTORE_JRNL_SLOCK_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_SLOCK_H
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/smutex.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/smutex.h?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/smutex.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/smutex.h Thu Nov 14 20:39:32 2013
@@ -19,16 +19,15 @@
*
*/
-#ifndef QPID_LEGACYSTORE_JRNL_SMUTEX_H
-#define QPID_LEGACYSTORE_JRNL_SMUTEX_H
+#ifndef QPID_LINEARSTORE_JOURNAL_SMUTEX_H
+#define QPID_LINEARSTORE_JOURNAL_SMUTEX_H
#include "qpid/linearstore/jrnl/jexception.h"
#include <pthread.h>
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
// Ultra-simple scoped mutex class that allows a posix mutex to be initialized and destroyed with error checks
class smutex
@@ -47,6 +46,6 @@ namespace qls_jrnl
inline pthread_mutex_t* get() const { return &_m; }
};
-}}
+}}}
-#endif // ifndef QPID_LEGACYSTORE_JRNL_SMUTEX_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_SMUTEX_H
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/time_ns.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/time_ns.cpp?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/time_ns.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/time_ns.cpp Thu Nov 14 20:39:32 2013
@@ -23,10 +23,9 @@
#include <sstream>
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
const std::string
time_ns::str(int precision) const
@@ -39,5 +38,4 @@ time_ns::str(int precision) const
return oss.str();
}
-
-}}
+}}}
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/time_ns.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/time_ns.h?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/time_ns.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/time_ns.h Thu Nov 14 20:39:32 2013
@@ -19,17 +19,16 @@
*
*/
-#ifndef QPID_LEGACYSTORE_JRNL_TIME_NS_H
-#define QPID_LEGACYSTORE_JRNL_TIME_NS_H
+#ifndef QPID_LINEARSTORE_JOURNAL_TIME_NS_H
+#define QPID_LINEARSTORE_JOURNAL_TIME_NS_H
#include <cerrno>
#include <ctime>
#include <string>
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
struct time_ns : public timespec
{
@@ -88,6 +87,6 @@ struct time_ns : public timespec
{ if(tv_sec == rhs.tv_sec) return tv_nsec <= rhs.tv_nsec; return tv_sec <= rhs.tv_sec; }
};
-}}
+}}}
-#endif // ifndef QPID_LEGACYSTORE_JRNL_TIME_NS_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_TIME_NS_H
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp Thu Nov 14 20:39:32 2013
@@ -27,10 +27,9 @@
#include "qpid/linearstore/jrnl/slock.h"
#include <sstream>
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
// return/error codes
int16_t txn_map::TMAP_RID_NOT_FOUND = -2;
@@ -231,4 +230,4 @@ txn_map::xid_list(std::vector<std::strin
}
}
-}}
+}}}
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h Thu Nov 14 20:39:32 2013
@@ -19,15 +19,14 @@
*
*/
-#ifndef QPID_LEGACYSTORE_JRNL_TXN_MAP_H
-#define QPID_LEGACYSTORE_JRNL_TXN_MAP_H
+#ifndef QPID_LINEARSTORE_JOURNAL_TXN_MAP_H
+#define QPID_LINEARSTORE_JOURNAL_TXN_MAP_H
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
class txn_map;
-}}
+}}}
#include "qpid/linearstore/jrnl/smutex.h"
#include <map>
@@ -35,10 +34,9 @@ namespace qls_jrnl
#include <string>
#include <vector>
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
/**
* \struct txn_data_struct
@@ -141,6 +139,6 @@ namespace qls_jrnl
const txn_data_list get_tdata_list_nolock(const std::string& xid);
};
-}}
+}}}
-#endif // ifndef QPID_LEGACYSTORE_JRNL_TXN_MAP_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_TXN_MAP_H
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp Thu Nov 14 20:39:32 2013
@@ -22,38 +22,20 @@
#include "qpid/linearstore/jrnl/txn_rec.h"
#include <cassert>
-#include <cerrno>
-#include <cstdlib>
#include <cstring>
#include <iomanip>
-#include "qpid/linearstore/jrnl/jerrno.h"
#include "qpid/linearstore/jrnl/jexception.h"
-#include <sstream>
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
txn_rec::txn_rec():
-// _txn_hdr(),
_xidp(0),
_buff(0)
-// _txn_tail()
{
- ::txn_hdr_init(&_txn_hdr, 0, QLS_JRNL_VERSION, 0, 0, 0);
- ::rec_tail_init(&_txn_tail, 0, 0, 0);
-}
-
-txn_rec::txn_rec(const uint32_t magic, const uint64_t rid, const void* const xidp,
- const std::size_t xidlen/*, const bool owi*/):
-// _txn_hdr(magic, RHM_JDAT_VERSION, rid, xidlen, owi),
- _xidp(xidp),
- _buff(0)
-// _txn_tail(_txn_hdr)
-{
- ::txn_hdr_init(&_txn_hdr, magic, QLS_JRNL_VERSION, 0, rid, xidlen);
- ::rec_tail_copy(&_txn_tail, &_txn_hdr._rhdr, 0);
+ ::txn_hdr_init(&_txn_hdr, 0, QLS_JRNL_VERSION, 0, 0, 0, 0);
+ ::rec_tail_init(&_txn_tail, 0, 0, 0, 0);
}
txn_rec::~txn_rec()
@@ -62,28 +44,17 @@ txn_rec::~txn_rec()
}
void
-txn_rec::reset(const uint32_t magic)
+txn_rec::reset(const bool commitFlag, const uint64_t serial, const uint64_t rid, const void* const xidp,
+ const std::size_t xidlen)
{
- _txn_hdr._rhdr._magic = magic;
- _txn_hdr._rhdr._rid = 0;
- _txn_hdr._xidsize = 0;
- _xidp = 0;
- _buff = 0;
- _txn_tail._xmagic = ~magic;
- _txn_tail._rid = 0;
-}
-
-void
-txn_rec::reset(const uint32_t magic, const uint64_t rid, const void* const xidp,
- const std::size_t xidlen/*, const bool owi*/)
-{
- _txn_hdr._rhdr._magic = magic;
+ _txn_hdr._rhdr._magic = commitFlag ? QLS_TXC_MAGIC : QLS_TXA_MAGIC;
+ _txn_hdr._rhdr._serial = serial;
_txn_hdr._rhdr._rid = rid;
-// _txn_hdr.set_owi(owi);
_txn_hdr._xidsize = xidlen;
_xidp = xidp;
_buff = 0;
- _txn_tail._xmagic = ~magic;
+ _txn_tail._xmagic = ~_txn_hdr._rhdr._magic;
+ _txn_tail._serial = serial;
_txn_tail._rid = rid;
}
@@ -195,132 +166,16 @@ txn_rec::encode(void* wptr, uint32_t rec
return size_dblks(wr_cnt);
}
-uint32_t
-txn_rec::decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks)
-{
- assert(rptr != 0);
- assert(max_size_dblks > 0);
-
- std::size_t rd_cnt = 0;
- if (rec_offs_dblks) // Continuation of record on new page
- {
- const uint32_t hdr_xid_dblks = size_dblks(sizeof(txn_hdr_t) + _txn_hdr._xidsize);
- const uint32_t hdr_xid_tail_dblks = size_dblks(sizeof(txn_hdr_t) + _txn_hdr._xidsize + sizeof(rec_tail_t));
- const std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES;
-
- if (hdr_xid_tail_dblks - rec_offs_dblks <= max_size_dblks)
- {
- // Remainder of xid fits within this page
- if (rec_offs - sizeof(txn_hdr_t) < _txn_hdr._xidsize)
- {
- // Part of xid still outstanding, copy remainder of xid and tail
- const std::size_t xid_offs = rec_offs - sizeof(txn_hdr_t);
- const std::size_t xid_rem = _txn_hdr._xidsize - xid_offs;
- std::memcpy((char*)_buff + xid_offs, rptr, xid_rem);
- rd_cnt = xid_rem;
- std::memcpy((void*)&_txn_tail, ((char*)rptr + rd_cnt), sizeof(_txn_tail));
- chk_tail();
- rd_cnt += sizeof(_txn_tail);
- }
- else
- {
- // Tail or part of tail only outstanding, complete tail
- const std::size_t tail_offs = rec_offs - sizeof(txn_hdr_t) - _txn_hdr._xidsize;
- const std::size_t tail_rem = sizeof(rec_tail_t) - tail_offs;
- std::memcpy((char*)&_txn_tail + tail_offs, rptr, tail_rem);
- chk_tail();
- rd_cnt = tail_rem;
- }
- }
- else if (hdr_xid_dblks - rec_offs_dblks <= max_size_dblks)
- {
- // Remainder of xid fits within this page, tail split
- const std::size_t xid_offs = rec_offs - sizeof(txn_hdr_t);
- const std::size_t xid_rem = _txn_hdr._xidsize - xid_offs;
- std::memcpy((char*)_buff + xid_offs, rptr, xid_rem);
- rd_cnt += xid_rem;
- const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
- if (tail_rem)
- {
- std::memcpy((void*)&_txn_tail, ((char*)rptr + xid_rem), tail_rem);
- rd_cnt += tail_rem;
- }
- }
- else
- {
- // Remainder of xid split
- const std::size_t xid_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES);
- std::memcpy((char*)_buff + rec_offs - sizeof(txn_hdr_t), rptr, xid_cp_size);
- rd_cnt += xid_cp_size;
- }
- }
- else // Start of record
- {
- // Get and check header
- //_txn_hdr.hdr_copy(h);
- ::rec_hdr_copy(&_txn_hdr._rhdr, &h);
- rd_cnt = sizeof(rec_hdr_t);
-#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
- rd_cnt += sizeof(uint32_t); // Filler 0
-#endif
- _txn_hdr._xidsize = *(std::size_t*)((char*)rptr + rd_cnt);
- rd_cnt = sizeof(txn_hdr_t);
- chk_hdr();
- _buff = std::malloc(_txn_hdr._xidsize);
- MALLOC_CHK(_buff, "_buff", "txn_rec", "decode");
- const uint32_t hdr_xid_dblks = size_dblks(sizeof(txn_hdr_t) + _txn_hdr._xidsize);
- const uint32_t hdr_xid_tail_dblks = size_dblks(sizeof(txn_hdr_t) + _txn_hdr._xidsize +
- sizeof(rec_tail_t));
-
- // Check if record (header + xid + tail) fits within this page, we can check the
- // tail before the expense of copying data to memory
- if (hdr_xid_tail_dblks <= max_size_dblks)
- {
- // Entire header, xid and tail fits within this page
- std::memcpy(_buff, (char*)rptr + rd_cnt, _txn_hdr._xidsize);
- rd_cnt += _txn_hdr._xidsize;
- std::memcpy((void*)&_txn_tail, (char*)rptr + rd_cnt, sizeof(_txn_tail));
- rd_cnt += sizeof(_txn_tail);
- chk_tail();
- }
- else if (hdr_xid_dblks <= max_size_dblks)
- {
- // Entire header and xid fit within this page, tail split
- std::memcpy(_buff, (char*)rptr + rd_cnt, _txn_hdr._xidsize);
- rd_cnt += _txn_hdr._xidsize;
- const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
- if (tail_rem)
- {
- std::memcpy((void*)&_txn_tail, (char*)rptr + rd_cnt, tail_rem);
- rd_cnt += tail_rem;
- }
- }
- else
- {
- // Header fits within this page, xid split
- const std::size_t xid_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
- std::memcpy(_buff, (char*)rptr + rd_cnt, xid_cp_size);
- rd_cnt += xid_cp_size;
- }
- }
- return size_dblks(rd_cnt);
-}
-
bool
-txn_rec::rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs)
+txn_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs)
{
+ uint32_t checksum = 0UL; // TODO: Add checksum math
if (rec_offs == 0)
{
// Read header, allocate for xid
//_txn_hdr.hdr_copy(h);
::rec_hdr_copy(&_txn_hdr._rhdr, &h);
-#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
- ifsp->ignore(sizeof(uint32_t)); // _filler0
-#endif
- ifsp->read((char*)&_txn_hdr._xidsize, sizeof(std::size_t));
-#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
- ifsp->ignore(sizeof(uint32_t)); // _filler0
-#endif
+ ifsp->read((char*)&_txn_hdr._xidsize, sizeof(_txn_hdr._xidsize));
rec_offs = sizeof(txn_hdr_t);
_buff = std::malloc(_txn_hdr._xidsize);
MALLOC_CHK(_buff, "_buff", "txn_rec", "rcv_decode");
@@ -358,8 +213,22 @@ txn_rec::rcv_decode(rec_hdr_t h, std::if
}
}
ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size());
- chk_tail(); // Throws if tail invalid or record incomplete
+ if (::rec_tail_check(&_txn_tail, &_txn_hdr._rhdr, 0)) { // TODO: add checksum
+ throw jexception(jerrno::JERR_JREC_BADRECTAIL); // TODO: complete exception detail
+ }
assert(!ifsp->fail() && !ifsp->bad());
+ int res = ::rec_tail_check(&_txn_tail, &_txn_hdr._rhdr, checksum);
+ if (res != 0) {
+ std::stringstream oss;
+ switch (res) {
+ case 1: oss << std::hex << "Magic: expected 0x" << ~_txn_hdr._rhdr._magic << "; found 0x" << _txn_tail._xmagic; break;
+ case 2: oss << std::hex << "Serial: expected 0x" << _txn_hdr._rhdr._serial << "; found 0x" << _txn_tail._serial; break;
+ case 3: oss << std::hex << "Record Id: expected 0x" << _txn_hdr._rhdr._rid << "; found 0x" << _txn_tail._rid; break;
+ case 4: oss << std::hex << "Checksum: expected 0x" << checksum << "; found 0x" << _txn_tail._checksum; break;
+ default: oss << "Unknown error " << res;
+ }
+ throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "txn_rec", "decode"); // TODO: Don't throw exception, log info
+ }
return true;
}
@@ -403,38 +272,9 @@ txn_rec::rec_size() const
}
void
-txn_rec::chk_hdr() const
-{
- jrec::chk_hdr(_txn_hdr._rhdr);
- if (_txn_hdr._rhdr._magic != QLS_TXA_MAGIC && _txn_hdr._rhdr._magic != QLS_TXC_MAGIC)
- {
- std::ostringstream oss;
- oss << std::hex << std::setfill('0');
- oss << "dtx magic: rid=0x" << std::setw(16) << _txn_hdr._rhdr._rid;
- oss << ": expected=(0x" << std::setw(8) << QLS_TXA_MAGIC;
- oss << " or 0x" << QLS_TXC_MAGIC;
- oss << ") read=0x" << std::setw(2) << (int)_txn_hdr._rhdr._magic;
- throw jexception(jerrno::JERR_JREC_BADRECHDR, oss.str(), "txn_rec", "chk_hdr");
- }
-}
-
-void
-txn_rec::chk_hdr(uint64_t rid) const
-{
- chk_hdr();
- jrec::chk_rid(_txn_hdr._rhdr, rid);
-}
-
-void
-txn_rec::chk_tail() const
-{
- jrec::chk_tail(_txn_tail, _txn_hdr._rhdr);
-}
-
-void
txn_rec::clean()
{
// clean up allocated memory here
}
-}}
+}}}
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.h?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.h Thu Nov 14 20:39:32 2013
@@ -19,70 +19,49 @@
*
*/
-#ifndef QPID_LEGACYSTORE_JRNL_TXN_REC_H
-#define QPID_LEGACYSTORE_JRNL_TXN_REC_H
+#ifndef QPID_LINEARSTORE_JOURNAL_TXN_REC_H
+#define QPID_LINEARSTORE_JOURNAL_TXN_REC_H
-namespace qpid
-{
-namespace qls_jrnl
-{
-class txn_rec;
-}}
-
-#include <cstddef>
#include "qpid/linearstore/jrnl/jrec.h"
#include "qpid/linearstore/jrnl/utils/txn_hdr.h"
+#include "qpid/linearstore/jrnl/utils/rec_tail.h"
-namespace qpid
+namespace qpid {
+namespace linearstore {
+namespace journal {
+
+/**
+* \class txn_rec
+* \brief Class to handle a single journal commit or abort record.
+*/
+class txn_rec : public jrec
{
-namespace qls_jrnl
-{
-
- /**
- * \class txn_rec
- * \brief Class to handle a single journal DTX commit or abort record.
- */
- class txn_rec : public jrec
- {
- private:
- txn_hdr_t _txn_hdr; ///< transaction header
- const void* _xidp; ///< xid pointer for encoding (writing to disk)
- void* _buff; ///< Pointer to buffer to receive data read from disk
- rec_tail_t _txn_tail; ///< Record tail
-
- public:
- // constructor used for read operations and xid must have memory allocated
- txn_rec();
- // constructor used for write operations, where xid already exists
- txn_rec(const uint32_t magic, const uint64_t rid, const void* const xidp,
- const std::size_t xidlen/*, const bool owi*/);
- virtual ~txn_rec();
-
- // Prepare instance for use in reading data from journal
- void reset(const uint32_t magic);
- // Prepare instance for use in writing data to journal
- void reset(const uint32_t magic, const uint64_t rid, const void* const xidp,
- const std::size_t xidlen/*, const bool owi*/);
- uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks);
- uint32_t decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks,
- uint32_t max_size_dblks);
- // Decode used for recover
- bool rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs);
-
- std::size_t get_xid(void** const xidpp);
- std::string& str(std::string& str) const;
- inline std::size_t data_size() const { return 0; } // This record never carries data
- std::size_t xid_size() const;
- std::size_t rec_size() const;
- inline uint64_t rid() const { return _txn_hdr._rhdr._rid; }
-
- private:
- void chk_hdr() const;
- void chk_hdr(uint64_t rid) const;
- void chk_tail() const;
- virtual void clean();
- }; // class txn_rec
+private:
+ ::txn_hdr_t _txn_hdr; ///< Local instance of transaction header struct
+ const void* _xidp; ///< xid pointer for encoding (writing to disk)
+ void* _buff; ///< Pointer to buffer to receive data read from disk
+ ::rec_tail_t _txn_tail; ///< Local instance of record tail struct
+
+public:
+ txn_rec();
+ virtual ~txn_rec();
+
+ void reset(const bool commitFlag, const uint64_t serial, const uint64_t rid, const void* const xidp,
+ const std::size_t xidlen);
+ uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks);
+ bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs);
+
+ std::size_t get_xid(void** const xidpp);
+ std::string& str(std::string& str) const;
+ inline std::size_t data_size() const { return 0; } // This record never carries data
+ std::size_t xid_size() const;
+ std::size_t rec_size() const;
+ inline uint64_t rid() const { return _txn_hdr._rhdr._rid; }
+
+private:
+ virtual void clean();
+};
-}}
+}}}
-#endif // ifndef QPID_LEGACYSTORE_JRNL_TXN_REC_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_TXN_REC_H
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/deq_hdr.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/deq_hdr.c?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/deq_hdr.c (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/deq_hdr.c Thu Nov 14 20:39:32 2013
@@ -23,9 +23,9 @@
/*static const uint16_t DEQ_HDR_TXNCMPLCOMMIT_MASK = 0x10;*/
-void deq_hdr_init(deq_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag, const uint64_t
- rid, const uint64_t deq_rid, const uint64_t xidsize) {
- rec_hdr_init(&dest->_rhdr, magic, version, uflag, rid);
+void deq_hdr_init(deq_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag,
+ const uint64_t serial, const uint64_t rid, const uint64_t deq_rid, const uint64_t xidsize) {
+ rec_hdr_init(&dest->_rhdr, magic, version, uflag, serial, rid);
dest->_deq_rid = deq_rid;
dest->_xidsize = xidsize;
}
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/deq_hdr.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/deq_hdr.h?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/deq_hdr.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/deq_hdr.h Thu Nov 14 20:39:32 2013
@@ -1,5 +1,5 @@
-#ifndef QPID_LINEARSTORE_JRNL_UTILS_DEQ_HDR_H
-#define QPID_LINEARSTORE_JRNL_UTILS_DEQ_HDR_H
+#ifndef QPID_LINEARSTORE_JOURNAL_UTILS_DEQ_HDR_H
+#define QPID_LINEARSTORE_JOURNAL_UTILS_DEQ_HDR_H
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -42,12 +42,14 @@ extern "C"{
* The rid field below is the rid of the dequeue record itself; the deq-rid field is the rid of a
* previous enqueue record being dequeued by this record.
*
- * Record header info in binary format (32 bytes):
+ * Record header info in binary format (40 bytes):
* <pre>
* 0 7
* +---+---+---+---+---+---+---+---+ -+
* | magic | ver | flags | |
- * +---+---+---+---+---+---+---+---+ | struct rec_hdr_t
+ * +---+---+---+---+---+---+---+---+ |
+ * | serial | | struct rec_hdr_t
+ * +---+---+---+---+---+---+---+---+ |
* | rid | |
* +---+---+---+---+---+---+---+---+ -+
* | deq-rid |
@@ -67,7 +69,7 @@ typedef struct deq_hdr_t {
static const uint16_t DEQ_HDR_TXNCMPLCOMMIT_MASK = 0x10;
void deq_hdr_init(deq_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag,
- const uint64_t rid, const uint64_t deq_rid, const uint64_t xidsize);
+ const uint64_t serial, const uint64_t rid, const uint64_t deq_rid, const uint64_t xidsize);
void deq_hdr_copy(deq_hdr_t* dest, const deq_hdr_t* src);
bool is_txn_coml_commit(const deq_hdr_t *dh);
void set_txn_coml_commit(deq_hdr_t *dh, const bool commit);
@@ -78,4 +80,4 @@ void set_txn_coml_commit(deq_hdr_t *dh,
}
#endif
-#endif /* ifndef QPID_LINEARSTORE_JRNL_UTILS_DEQ_HDR_H */
+#endif /* ifndef QPID_LINEARSTORE_JOURNAL_UTILS_DEQ_HDR_H */
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.c?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.c (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.c Thu Nov 14 20:39:32 2013
@@ -25,8 +25,8 @@
//static const uint16_t ENQ_HDR_EXTERNAL_MASK = 0x20;
void enq_hdr_init(enq_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag,
- const uint64_t rid, const uint64_t xidsize, const uint64_t dsize) {
- rec_hdr_init(&dest->_rhdr, magic, version, uflag, rid);
+ const uint64_t serial, const uint64_t rid, const uint64_t xidsize, const uint64_t dsize) {
+ rec_hdr_init(&dest->_rhdr, magic, version, uflag, serial, rid);
dest->_xidsize = xidsize;
dest->_dsize = dsize;
}
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.h?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.h Thu Nov 14 20:39:32 2013
@@ -1,5 +1,5 @@
-#ifndef QPID_LINEARSTORE_JRNL_UTILS_ENQ_HDR_H
-#define QPID_LINEARSTORE_JRNL_UTILS_ENQ_HDR_H
+#ifndef QPID_LINEARSTORE_JOURNAL_UTILS_ENQ_HDR_H
+#define QPID_LINEARSTORE_JOURNAL_UTILS_ENQ_HDR_H
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -38,12 +38,14 @@ extern "C"{
*
* This header precedes all enqueue data in journal files.
*
- * Record header info in binary format (32 bytes):
+ * Record header info in binary format (40 bytes):
* <pre>
* 0 7
* +---+---+---+---+---+---+---+---+ -+
* | magic | ver | flags | |
- * +---+---+---+---+---+---+---+---+ | struct rec_hdr_t
+ * +---+---+---+---+---+---+---+---+ |
+ * | serial | | struct rec_hdr_t
+ * +---+---+---+---+---+---+---+---+ |
* | rid | |
* +---+---+---+---+---+---+---+---+ -+
* | xidsize |
@@ -64,7 +66,7 @@ static const uint16_t ENQ_HDR_TRANSIENT_
static const uint16_t ENQ_HDR_EXTERNAL_MASK = 0x20;
void enq_hdr_init(enq_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag,
- const uint64_t rid, const uint64_t xidsize, const uint64_t dsize);
+ const uint64_t serial, const uint64_t rid, const uint64_t xidsize, const uint64_t dsize);
void enq_hdr_copy(enq_hdr_t* dest, const enq_hdr_t* src);
bool is_enq_transient(const enq_hdr_t *eh);
void set_enq_transient(enq_hdr_t *eh, const bool transient);
@@ -78,4 +80,4 @@ bool validate_enq_hdr(enq_hdr_t *eh, con
}
#endif
-#endif /* ifndef QPID_LINEARSTORE_JRNL_UTILS_ENQ_HDR_H */
+#endif /* ifndef QPID_LINEARSTORE_JOURNAL_UTILS_ENQ_HDR_H */
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c Thu Nov 14 20:39:32 2013
@@ -23,8 +23,8 @@
#include <string.h>
void file_hdr_create(file_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t fhdr_size_sblks,
- const uint16_t efp_partition, const uint64_t file_size) {
- rec_hdr_init(&dest->_rhdr, magic, version, 0, 0);
+ const uint16_t efp_partition, const uint64_t file_size) {
+ rec_hdr_init(&dest->_rhdr, magic, version, 0, 0, 0);
dest->_fhdr_size_sblks = fhdr_size_sblks;
dest->_efp_partition = efp_partition;
dest->_reserved = 0;
@@ -36,10 +36,11 @@ void file_hdr_create(file_hdr_t* dest, c
dest->_queue_name_len = 0;
}
-int file_hdr_init(void* dest, const uint64_t dest_len, const uint16_t uflag, const uint64_t rid, const uint64_t fro,
- const uint64_t file_number, const uint16_t queue_name_len, const char* queue_name) {
+int file_hdr_init(void* dest, const uint64_t dest_len, const uint16_t uflag, const uint64_t serial, const uint64_t rid,
+ const uint64_t fro, const uint64_t file_number, const uint16_t queue_name_len, const char* queue_name) {
file_hdr_t* fhp = (file_hdr_t*)dest;
fhp->_rhdr._uflag = uflag;
+ fhp->_rhdr._serial = serial;
fhp->_rhdr._rid = rid;
fhp->_fro = fro;
fhp->_file_number = file_number;
@@ -54,6 +55,13 @@ int file_hdr_init(void* dest, const uint
return set_time_now(dest);
}
+int file_hdr_check(file_hdr_t* hdr, const uint32_t magic, const uint16_t version, const uint64_t data_size_kib) { /* Returns 0 if hdr is valid; otherwise the code of the first failed check */
+ int res = rec_hdr_check_base(&hdr->_rhdr, magic, version); /* 1 = magic mismatch, 2 = version mismatch, 0 = ok */
+ if (res != 0) return res; /* Propagate the base-check error code; returning 0 here would report a bad header as valid */
+ if (hdr->_data_size_kib != data_size_kib) return 3; /* data size recorded in the header differs from the expected value */
+ return 0; /* all checks passed */
+}
+
void file_hdr_copy(file_hdr_t* dest, const file_hdr_t* src) {
rec_hdr_copy(&dest->_rhdr, &src->_rhdr);
dest->_fhdr_size_sblks = src->_fhdr_size_sblks; // Should this be copied?
@@ -84,6 +92,23 @@ int is_file_hdr_reset(file_hdr_t* target
target->_queue_name_len == 0;
}
+/*
+uint64_t random_64() {
+ int randomData = open("/dev/random", O_RDONLY);
+ if (randomData < 0) {
+ return 0ULL;
+ }
+ uint64_t randomNumber;
+ size_t size = sizeof(randomNumber);
+ ssize_t result = read(randomData, (char*)&randomNumber, size);
+ if (result != size) {
+ randomNumber = 0ULL;
+ }
+ close(randomData);
+ return randomNumber;
+}
+*/
+
int set_time_now(file_hdr_t *fh)
{
struct timespec ts;
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h Thu Nov 14 20:39:32 2013
@@ -1,5 +1,5 @@
-#ifndef QPID_LINEARSTORE_JRNL_UTILS_FILE_HDR_H
-#define QPID_LINEARSTORE_JRNL_UTILS_FILE_HDR_H
+#ifndef QPID_LINEARSTORE_JOURNAL_UTILS_FILE_HDR_H
+#define QPID_LINEARSTORE_JOURNAL_UTILS_FILE_HDR_H
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -41,17 +41,19 @@ extern "C"{
* block in the file. The record ID and offset are updated on each overwrite of the
* file.
*
- * File header info in binary format (66 bytes + size of file name in octets):
+ * File header info in binary format (74 bytes + size of file name in octets):
* <pre>
* 0 7
* +---+---+---+---+---+---+---+---+ -+
* | magic | ver | flags | |
- * +---+---+---+---+---+---+---+---+ | struct rec_hdr_t
- * | first rid in file | |
+ * +---+---+---+---+---+---+---+---+ |
+ * | serial | | struct rec_hdr_t
+ * +---+---+---+---+---+---+---+---+ |
+ * | rid | |
* +---+---+---+---+---+---+---+---+ -+
* | fs | partn | reserved |
* +---+---+---+---+---+---+---+---+
- * | file-size |
+ * | data-size |
* +---+---+---+---+---+---+---+---+
* | fro |
* +---+---+---+---+---+---+---+---+
@@ -87,10 +89,11 @@ typedef struct file_hdr_t {
uint16_t _queue_name_len; /**< Length of the queue name in octets, which follows this struct in the header */
} file_hdr_t;
-void file_hdr_create(file_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t fhdr_size_sblks,
- const uint16_t efp_partition, const uint64_t file_size);
-int file_hdr_init(void* dest, const uint64_t dest_len, const uint16_t uflag, const uint64_t rid, const uint64_t fro,
- const uint64_t file_number, const uint16_t queue_name_len, const char* queue_name);
+void file_hdr_create(file_hdr_t* dest, const uint32_t magic, const uint16_t version,
+ const uint16_t fhdr_size_sblks, const uint16_t efp_partition, const uint64_t file_size);
+int file_hdr_init(void* dest, const uint64_t dest_len, const uint16_t uflag, const uint64_t serial, const uint64_t rid,
+ const uint64_t fro, const uint64_t file_number, const uint16_t queue_name_len, const char* queue_name);
+int file_hdr_check(file_hdr_t* hdr, const uint32_t magic, const uint16_t version, const uint64_t data_size_kib);
void file_hdr_reset(file_hdr_t* target);
int is_file_hdr_reset(file_hdr_t* target);
void file_hdr_copy(file_hdr_t* dest, const file_hdr_t* src);
@@ -103,4 +106,4 @@ void set_time(file_hdr_t *fh, struct tim
}
#endif
-#endif /* ifndef QPID_LINEARSTORE_JRNL_UTILS_FILE_HDR_H */
+#endif /* ifndef QPID_LINEARSTORE_JOURNAL_UTILS_FILE_HDR_H */
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_hdr.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_hdr.c?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_hdr.c (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_hdr.c Thu Nov 14 20:39:32 2013
@@ -1,9 +1,10 @@
#include "rec_hdr.h"
-void rec_hdr_init(rec_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag, const uint64_t rid) {
+void rec_hdr_init(rec_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag, const uint64_t serial, const uint64_t rid) {
dest->_magic = magic;
dest->_version = version;
dest->_uflag = uflag;
+ dest->_serial = serial;
dest->_rid = rid;
}
@@ -11,5 +12,19 @@ void rec_hdr_copy(rec_hdr_t* dest, const
dest->_magic = src->_magic;
dest->_version = src->_version;
dest->_uflag = src->_uflag;
+ dest->_serial = src->_serial;
dest->_rid = src->_rid;
}
+
+int rec_hdr_check_base(rec_hdr_t* header, const uint32_t magic, const uint16_t version) { /* Checks magic and version only; returns 0 if both match, else the code of the first mismatch */
+ if (header->_magic != magic) return 1; /* magic number mismatch */
+ if (header->_version != version) return 2; /* version mismatch */
+ return 0; /* magic and version both match */
+}
+
+int rec_hdr_check(rec_hdr_t* header, const uint32_t magic, const uint16_t version, const uint64_t serial) { /* Checks magic, version and serial; returns 0 if all match, else the code of the first mismatch */
+ int res = rec_hdr_check_base(header, magic, version); /* magic and version are checked first */
+ if (res != 0) return res; /* pass the base-check error code straight through to the caller */
+ if (header->_serial != serial) return 3; /* serial number mismatch */
+ return 0; /* all three fields match */
+}
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_hdr.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_hdr.h?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_hdr.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_hdr.h Thu Nov 14 20:39:32 2013
@@ -1,5 +1,5 @@
-#ifndef QPID_LINEARSTORE_JRNL_UTILS_REC_HDR_H
-#define QPID_LINEARSTORE_JRNL_UTILS_REC_HDR_H
+#ifndef QPID_LINEARSTORE_JOURNAL_UTILS_REC_HDR_H
+#define QPID_LINEARSTORE_JOURNAL_UTILS_REC_HDR_H
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -34,11 +34,13 @@ extern "C"{
* This includes identification for the file type, the encoding version, endian
* indicator and a record ID.
*
- * File header info in binary format (16 bytes):
+ * File header info in binary format (24 bytes):
* <pre>
* 0 7
* +---+---+---+---+---+---+---+---+
- * | magic | ver | flags |
+ * | magic | ver | uflag |
+ * +---+---+---+---+---+---+---+---+
+ * | serial |
* +---+---+---+---+---+---+---+---+
* | rid |
* +---+---+---+---+---+---+---+---+
@@ -52,11 +54,14 @@ typedef struct rec_hdr_t {
uint32_t _magic; /**< File type identifier (magic number) */
uint16_t _version; /**< File encoding version */
uint16_t _uflag; /**< User-defined flags */
+ uint64_t _serial; /**< Serial number for this journal file */
uint64_t _rid; /**< Record ID (rotating 64-bit counter) */
} rec_hdr_t;
-void rec_hdr_init(rec_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag, const uint64_t rid);
+void rec_hdr_init(rec_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag, const uint64_t serial, const uint64_t rid);
void rec_hdr_copy(rec_hdr_t* dest, const rec_hdr_t* src);
+int rec_hdr_check_base(rec_hdr_t* header, const uint32_t magic, const uint16_t version);
+int rec_hdr_check(rec_hdr_t* header, const uint32_t magic, const uint16_t version, const uint64_t serial);
#pragma pack()
@@ -64,4 +69,4 @@ void rec_hdr_copy(rec_hdr_t* dest, const
}
#endif
-#endif /* ifndef QPID_LINEARSTORE_JRNL_UTILS_REC_HDR_H */
+#endif /* ifndef QPID_LINEARSTORE_JOURNAL_UTILS_REC_HDR_H */
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_tail.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_tail.c?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_tail.c (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_tail.c Thu Nov 14 20:39:32 2013
@@ -21,14 +21,25 @@
#include "rec_tail.h"
-void rec_tail_init(rec_tail_t* dest, const uint32_t xmagic, const uint32_t checksum, const uint64_t rid) {
+void rec_tail_init(rec_tail_t* dest, const uint32_t xmagic, const uint32_t checksum, const uint64_t serial,
+ const uint64_t rid) {
dest->_xmagic = xmagic;
dest->_checksum = checksum;
+ dest->_serial = serial;
dest->_rid = rid;
}
void rec_tail_copy(rec_tail_t* dest, const rec_hdr_t* src, const uint32_t checksum) {
dest->_xmagic = ~(src->_magic);
dest->_checksum = checksum;
+ dest->_serial = src->_serial;
dest->_rid = src->_rid;
}
+
+int rec_tail_check(const rec_tail_t* tail, const rec_hdr_t* header, const uint32_t checksum) { /* Returns 0 if the tail agrees with the header and the supplied checksum, else the code of the first failed check */
+ if (tail->_xmagic != ~header->_magic) return 1; /* tail magic is not the bitwise inverse of the header magic */
+ if (tail->_serial != header->_serial) return 2; /* serial differs between tail and header */
+ if (tail->_rid != header->_rid) return 3; /* record id differs between tail and header */
+ if (tail->_checksum != checksum) return 4; /* checksum stored in the tail differs from the supplied checksum */
+ return 0; /* tail is consistent with header and checksum */
+}
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_tail.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_tail.h?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_tail.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_tail.h Thu Nov 14 20:39:32 2013
@@ -1,5 +1,5 @@
-#ifndef QPID_LEGACYSTORE_JRNL_UTILS_REC_TAIL_H
-#define QPID_LEGACYSTORE_JRNL_UTILS_REC_TAIL_H
+#ifndef QPID_LINEARSTORE_JOURNAL_UTILS_REC_TAIL_H
+#define QPID_LINEARSTORE_JOURNAL_UTILS_REC_TAIL_H
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -41,26 +41,32 @@ extern "C"{
* The checksum is used to verify the xid and/or data portion of the record
* on recovery, and excludes the header and tail.
*
- * Record header info in binary format (16 bytes):
+ * Record tail info in binary format (24 bytes):
* <pre>
* 0 7
* +---+---+---+---+---+---+---+---+
* | ~(magic) | checksum |
* +---+---+---+---+---+---+---+---+
+ * | serial |
+ * +---+---+---+---+---+---+---+---+
* | rid |
* +---+---+---+---+---+---+---+---+
*
- * rid = Record ID
+ * ~(magic) = 1's complement of magic of matching record header
+ * rid = Record ID of matching record header
* </pre>
*/
typedef struct rec_tail_t {
uint32_t _xmagic; /**< Binary inverse (1's complement) of hdr magic number */
- uint32_t _checksum; /**< Checksum of xid and data */
- uint64_t _rid; /**< ID (rotating 64-bit counter) */
+ uint32_t _checksum; /**< Checksum of xid and data (excluding header itself) */
+ uint64_t _serial; /**< Serial number for this journal file */
+ uint64_t _rid; /**< Record ID (rotating 64-bit counter) */
} rec_tail_t;
-void rec_tail_init(rec_tail_t* dest, const uint32_t xmagic, const uint32_t checksum, const uint64_t rid);
+void rec_tail_init(rec_tail_t* dest, const uint32_t xmagic, const uint32_t checksum, const uint64_t serial,
+ const uint64_t rid);
void rec_tail_copy(rec_tail_t* dest, const rec_hdr_t* src, const uint32_t checksum);
+int rec_tail_check(const rec_tail_t* tail, const rec_hdr_t* header, const uint32_t checksum);
#pragma pack()
@@ -68,4 +74,4 @@ void rec_tail_copy(rec_tail_t* dest, con
}
#endif
-#endif /* ifnedf QPID_LEGACYSTORE_JRNL_UTILS_REC_TAIL_H */
+#endif /* ifndef QPID_LINEARSTORE_JOURNAL_UTILS_REC_TAIL_H */
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/txn_hdr.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/txn_hdr.c?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/txn_hdr.c (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/txn_hdr.c Thu Nov 14 20:39:32 2013
@@ -22,8 +22,8 @@
#include "txn_hdr.h"
void txn_hdr_init(txn_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag,
- const uint64_t rid, const uint64_t xidsize) {
- rec_hdr_init(&dest->_rhdr, magic, version, uflag, rid);
+ const uint64_t serial, const uint64_t rid, const uint64_t xidsize) {
+ rec_hdr_init(&dest->_rhdr, magic, version, uflag, serial, rid);
dest->_xidsize = xidsize;
}
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/txn_hdr.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/txn_hdr.h?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/txn_hdr.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/txn_hdr.h Thu Nov 14 20:39:32 2013
@@ -1,5 +1,5 @@
-#ifndef QPID_LINEARSTORE_JRNL_UTILS_TXN_HDR_H
-#define QPID_LINEARSTORE_JRNL_UTILS_TXN_HDR_H
+#ifndef QPID_LINEARSTORE_JOURNAL_UTILS_TXN_HDR_H
+#define QPID_LINEARSTORE_JOURNAL_UTILS_TXN_HDR_H
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -40,12 +40,14 @@ extern "C"{
 * Note that this record has its own rid distinct from the rids of the record(s) making up the
* transaction it is committing or aborting.
*
- * Record header info in binary format (24 bytes):
+ * Record header info in binary format (32 bytes):
* <pre>
* 0 7
* +---+---+---+---+---+---+---+---+ -+
- * | magic | v | e | flags | |
- * +---+---+---+---+---+---+---+---+ | struct rec_hdr_t
+ * | magic | ver | flags | |
+ * +---+---+---+---+---+---+---+---+ |
+ * | serial | | struct rec_hdr_t
+ * +---+---+---+---+---+---+---+---+ |
* | rid | |
* +---+---+---+---+---+---+---+---+ -+
* | xidsize |
@@ -58,7 +60,7 @@ typedef struct txn_hdr_t {
} txn_hdr_t;
void txn_hdr_init(txn_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag,
- const uint64_t rid, const uint64_t xidsize);
+ const uint64_t serial, const uint64_t rid, const uint64_t xidsize);
void txn_hdr_copy(txn_hdr_t* dest, const txn_hdr_t* src);
#pragma pack()
@@ -67,4 +69,4 @@ void txn_hdr_copy(txn_hdr_t* dest, const
}
#endif
-#endif /* ifndef QPID_LINEARSTORE_JRNL_UTILS_TXN_HDR_H */
+#endif /* ifndef QPID_LINEARSTORE_JOURNAL_UTILS_TXN_HDR_H */
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp Thu Nov 14 20:39:32 2013
@@ -35,10 +35,9 @@
//#include <iostream> // DEBUG
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
wmgr::wmgr(jcntl* jc,
enq_map& emap,
@@ -150,7 +149,7 @@ wmgr::enqueue(const void* const data_buf
}
uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : _lfc.getNextRecordId();
- _enq_rec.reset(rid, data_buff, tot_data_len, xid_ptr, xid_len, transient, external);
+ _enq_rec.reset(_lfc.getCurrentSerial(), rid, data_buff, tot_data_len, xid_ptr, xid_len, transient, external);
if (!cont)
{
dtokp->set_rid(rid);
@@ -265,7 +264,7 @@ wmgr::dequeue(data_tok* dtokp,
const bool ext_rid = dtokp->external_rid();
uint64_t rid = (ext_rid | cont) ? dtokp->rid() : _lfc.getNextRecordId();
uint64_t dequeue_rid = (ext_rid | cont) ? dtokp->dequeue_rid() : dtokp->rid();
- _deq_rec.reset(rid, dequeue_rid, xid_ptr, xid_len/*, _wrfc.owi()*/, txn_coml_commit);
+ _deq_rec.reset(_lfc.getCurrentSerial(), rid, dequeue_rid, xid_ptr, xid_len, txn_coml_commit);
if (!cont)
{
if (!ext_rid)
@@ -391,7 +390,7 @@ wmgr::abort(data_tok* dtokp,
}
uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : _lfc.getNextRecordId();
- _txn_rec.reset(QLS_TXA_MAGIC, rid, xid_ptr, xid_len/*, _wrfc.owi()*/);
+ _txn_rec.reset(false, _lfc.getCurrentSerial(), rid, xid_ptr, xid_len);
if (!cont)
{
dtokp->set_rid(rid);
@@ -489,7 +488,7 @@ wmgr::commit(data_tok* dtokp,
}
uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : _lfc.getNextRecordId();
- _txn_rec.reset(QLS_TXC_MAGIC, rid, xid_ptr, xid_len/*, _wrfc.owi()*/);
+ _txn_rec.reset(true, _lfc.getCurrentSerial(), rid, xid_ptr, xid_len);
if (!cont)
{
dtokp->set_rid(rid);
@@ -1033,4 +1032,4 @@ wmgr::status_str() const
const char* wmgr::_op_str[] = {"enqueue", "dequeue", "abort", "commit"};
-}}
+}}}
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h?rev=1542066&r1=1542065&r2=1542066&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h Thu Nov 14 20:39:32 2013
@@ -19,15 +19,14 @@
*
*/
-#ifndef QPID_LEGACYSTORE_JRNL_WMGR_H
-#define QPID_LEGACYSTORE_JRNL_WMGR_H
+#ifndef QPID_LINEARSTORE_JOURNAL_WMGR_H
+#define QPID_LINEARSTORE_JOURNAL_WMGR_H
-namespace qpid
-{
-namespace qls_jrnl
-{
+namespace qpid {
+namespace linearstore {
+namespace journal {
class wmgr;
-}}
+}}}
#include <cstring>
#include "qpid/linearstore/jrnl/EmptyFilePoolTypes.h"
@@ -37,123 +36,123 @@ class wmgr;
class file_hdr_t;
-namespace qpid
+namespace qpid {
+namespace linearstore {
+namespace journal {
+
+class LinearFileController;
+
+/**
+* \brief Class for managing a write page cache of arbitrary size and number of pages.
+*
+* The write page cache works on the principle of caching the write data within a page until
+* that page is either full or flushed; this initiates a single AIO write operation to store
+* the data on disk.
+*
+* The maximum disk throughput is achieved by keeping the write operations of uniform size.
+* Waiting for a page cache to fill achieves this; and in high data volume/throughput situations
+* achieves the optimal disk throughput. Calling flush() forces a write of the current page cache
+* no matter how full it is, and disrupts the uniformity of the write operations. This should
+* normally only be done if throughput drops and there is a danger of a page of unwritten data
+* waiting around for excessive time.
+*
+* The usual tradeoff between data storage latency and throughput performance applies.
+*/
+class wmgr : public pmgr
{
-namespace qls_jrnl
-{
- class LinearFileController;
-
- /**
- * \brief Class for managing a write page cache of arbitrary size and number of pages.
- *
- * The write page cache works on the principle of caching the write data within a page until
- * that page is either full or flushed; this initiates a single AIO write operation to store
- * the data on disk.
- *
- * The maximum disk throughput is achieved by keeping the write operations of uniform size.
- * Waiting for a page cache to fill achieves this; and in high data volume/throughput situations
- * achieves the optimal disk throughput. Calling flush() forces a write of the current page cache
- * no matter how full it is, and disrupts the uniformity of the write operations. This should
- * normally only be done if throughput drops and there is a danger of a page of unwritten data
- * waiting around for excessive time.
- *
- * The usual tradeoff between data storage latency and throughput performance applies.
- */
- class wmgr : public pmgr
- {
- private:
- LinearFileController& _lfc; ///< Linear File Controller ref
- uint32_t _max_dtokpp; ///< Max data writes per page
- uint32_t _max_io_wait_us; ///< Max wait in microseconds till submit
- uint32_t _cached_offset_dblks; ///< Amount of unwritten data in page (dblocks)
- std::deque<data_tok*> _ddtokl; ///< Deferred dequeue data_tok list
-
- // TODO: Convert _enq_busy etc into a proper threadsafe lock
- // TODO: Convert to enum? Are these encodes mutually exclusive?
- bool _enq_busy; ///< Flag true if enqueue is in progress
- bool _deq_busy; ///< Flag true if dequeue is in progress
- bool _abort_busy; ///< Flag true if abort is in progress
- bool _commit_busy; ///< Flag true if commit is in progress
-
- enum _op_type { WMGR_ENQUEUE = 0, WMGR_DEQUEUE, WMGR_ABORT, WMGR_COMMIT };
- static const char* _op_str[];
-
- enq_rec _enq_rec; ///< Enqueue record used for encoding/decoding
- deq_rec _deq_rec; ///< Dequeue record used for encoding/decoding
- txn_rec _txn_rec; ///< Transaction record used for encoding/decoding
- std::set<std::string> _txn_pending_set; ///< Set containing xids of pending commits/aborts
-
- public:
- wmgr(jcntl* jc,
- enq_map& emap,
- txn_map& tmap,
- LinearFileController& lfc);
- wmgr(jcntl* jc,
- enq_map& emap,
- txn_map& tmap,
- LinearFileController& lfc,
- const uint32_t max_dtokpp,
- const uint32_t max_iowait_us);
- virtual ~wmgr();
-
- void initialize(aio_callback* const cbp,
- const uint32_t wcache_pgsize_sblks,
- const uint16_t wcache_num_pages,
- const uint32_t max_dtokpp,
- const uint32_t max_iowait_us,
- std::size_t eo = 0);
- iores enqueue(const void* const data_buff,
- const std::size_t tot_data_len,
- const std::size_t this_data_len,
- data_tok* dtokp,
- const void* const xid_ptr,
- const std::size_t xid_len,
- const bool transient,
- const bool external);
- iores dequeue(data_tok* dtokp,
- const void* const xid_ptr,
- const std::size_t xid_len,
- const bool txn_coml_commit);
- iores abort(data_tok* dtokp,
- const void* const xid_ptr,
- const std::size_t xid_len);
- iores commit(data_tok* dtokp,
- const void* const xid_ptr,
- const std::size_t xid_len);
- iores flush();
- int32_t get_events(timespec* const timeout,
- bool flush);
- bool is_txn_synced(const std::string& xid);
- inline bool curr_pg_blocked() const { return _page_cb_arr[_pg_index]._state != UNUSED; }
- inline uint32_t unflushed_dblks() { return _cached_offset_dblks; }
-
- // Debug aid
- const std::string status_str() const;
-
- private:
- void initialize(aio_callback* const cbp,
- const uint32_t wcache_pgsize_sblks,
- const uint16_t wcache_num_pages);
- iores pre_write_check(const _op_type op,
- const data_tok* const dtokp,
- const std::size_t xidsize = 0,
- const std::size_t dsize = 0,
- const bool external = false) const;
- void dequeue_check(const std::string& xid,
- const uint64_t drid);
- void file_header_check(const uint64_t rid,
- const bool cont,
- const uint32_t rec_dblks_rem);
- void flush_check(iores& res,
- bool& cont,
- bool& done, const uint64_t rid);
- iores write_flush();
- void get_next_file();
- void dblk_roundup();
- void rotate_page();
- void clean();
- };
+private:
+ LinearFileController& _lfc; ///< Linear File Controller ref
+ uint32_t _max_dtokpp; ///< Max data writes per page
+ uint32_t _max_io_wait_us; ///< Max wait in microseconds till submit
+ uint32_t _cached_offset_dblks; ///< Amount of unwritten data in page (dblocks)
+ std::deque<data_tok*> _ddtokl; ///< Deferred dequeue data_tok list
+
+ // TODO: Convert _enq_busy etc into a proper threadsafe lock
+ // TODO: Convert to enum? Are these encodes mutually exclusive?
+ bool _enq_busy; ///< Flag true if enqueue is in progress
+ bool _deq_busy; ///< Flag true if dequeue is in progress
+ bool _abort_busy; ///< Flag true if abort is in progress
+ bool _commit_busy; ///< Flag true if commit is in progress
+
+ enum _op_type { WMGR_ENQUEUE = 0, WMGR_DEQUEUE, WMGR_ABORT, WMGR_COMMIT };
+ static const char* _op_str[];
+
+ enq_rec _enq_rec; ///< Enqueue record used for encoding/decoding
+ deq_rec _deq_rec; ///< Dequeue record used for encoding/decoding
+ txn_rec _txn_rec; ///< Transaction record used for encoding/decoding
+ std::set<std::string> _txn_pending_set; ///< Set containing xids of pending commits/aborts
+
+public:
+ wmgr(jcntl* jc,
+ enq_map& emap,
+ txn_map& tmap,
+ LinearFileController& lfc);
+ wmgr(jcntl* jc,
+ enq_map& emap,
+ txn_map& tmap,
+ LinearFileController& lfc,
+ const uint32_t max_dtokpp,
+ const uint32_t max_iowait_us);
+ virtual ~wmgr();
+
+ void initialize(aio_callback* const cbp,
+ const uint32_t wcache_pgsize_sblks,
+ const uint16_t wcache_num_pages,
+ const uint32_t max_dtokpp,
+ const uint32_t max_iowait_us,
+ std::size_t eo = 0);
+ iores enqueue(const void* const data_buff,
+ const std::size_t tot_data_len,
+ const std::size_t this_data_len,
+ data_tok* dtokp,
+ const void* const xid_ptr,
+ const std::size_t xid_len,
+ const bool transient,
+ const bool external);
+ iores dequeue(data_tok* dtokp,
+ const void* const xid_ptr,
+ const std::size_t xid_len,
+ const bool txn_coml_commit);
+ iores abort(data_tok* dtokp,
+ const void* const xid_ptr,
+ const std::size_t xid_len);
+ iores commit(data_tok* dtokp,
+ const void* const xid_ptr,
+ const std::size_t xid_len);
+ iores flush();
+ int32_t get_events(timespec* const timeout,
+ bool flush);
+ bool is_txn_synced(const std::string& xid);
+ inline bool curr_pg_blocked() const { return _page_cb_arr[_pg_index]._state != UNUSED; }
+ inline uint32_t unflushed_dblks() { return _cached_offset_dblks; }
+
+ // Debug aid
+ const std::string status_str() const;
+
+private:
+ void initialize(aio_callback* const cbp,
+ const uint32_t wcache_pgsize_sblks,
+ const uint16_t wcache_num_pages);
+ iores pre_write_check(const _op_type op,
+ const data_tok* const dtokp,
+ const std::size_t xidsize = 0,
+ const std::size_t dsize = 0,
+ const bool external = false) const;
+ void dequeue_check(const std::string& xid,
+ const uint64_t drid);
+ void file_header_check(const uint64_t rid,
+ const bool cont,
+ const uint32_t rec_dblks_rem);
+ void flush_check(iores& res,
+ bool& cont,
+ bool& done, const uint64_t rid);
+ iores write_flush();
+ void get_next_file();
+ void dblk_roundup();
+ void rotate_page();
+ void clean();
+};
-}}
+}}}
-#endif // ifndef QPID_LEGACYSTORE_JRNL_WMGR_H
+#endif // ifndef QPID_LINEARSTORE_JOURNAL_WMGR_H
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org