You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/10/21 23:26:11 UTC

svn commit: r1534383 [4/4] - in /qpid/branches/linearstore/qpid/cpp/src: ./ qpid/broker/ qpid/legacystore/jrnl/ qpid/linearstore/ qpid/linearstore/jrnl/ qpid/linearstore/jrnl/utils/

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h Mon Oct 21 21:26:10 2013
@@ -37,7 +37,6 @@ namespace qls_jrnl
 #include "qpid/linearstore/jrnl/deq_rec.h"
 #include "qpid/linearstore/jrnl/enq_map.h"
 #include "qpid/linearstore/jrnl/enq_rec.h"
-//#include "qpid/linearstore/jrnl/fcntl.h"
 #include "qpid/linearstore/jrnl/txn_map.h"
 #include "qpid/linearstore/jrnl/txn_rec.h"
 
@@ -61,8 +60,7 @@ class JournalFile;
         {
             UNUSED,                     ///< A page is uninitialized, contains no data.
             IN_USE,                     ///< Page is in use.
-            AIO_PENDING,                ///< An AIO request outstanding.
-            AIO_COMPLETE                ///< An AIO request is complete.
+            AIO_PENDING                 ///< An AIO request outstanding.
         };
 
         /**
@@ -77,8 +75,6 @@ class JournalFile;
             uint32_t _wdblks;           ///< Total number of dblks in page so far
             uint32_t _rdblks;           ///< Total number of dblks in page
             std::deque<data_tok*>* _pdtokl; ///< Page message tokens list
-            //fcntl* _wfh;                ///< File handle for incrementing write compl counts
-            //fcntl* _rfh;                ///< File handle for incrementing read compl counts
             JournalFile* _jfp;          ///< Journal file for incrementing compl counts
             void* _pbuff;               ///< Page buffer
 
@@ -113,7 +109,7 @@ class JournalFile;
         pmgr(jcntl* jc, enq_map& emap, txn_map& tmap);
         virtual ~pmgr();
 
-        virtual int32_t get_events(page_state state, timespec* const timeout, bool flush = false) = 0;
+        virtual int32_t get_events(timespec* const timeout, bool flush) = 0;
         inline uint32_t get_aio_evt_rem() const { return _aio_evt_rem; }
         static const char* page_state_str(page_state ps);
         inline uint32_t cache_pgsize_sblks() const { return _cache_pgsize_sblks; }

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp Mon Oct 21 21:26:10 2013
@@ -94,8 +94,8 @@ txn_rec::encode(void* wptr, uint32_t rec
     assert(max_size_dblks > 0);
     assert(_xidp != 0 && _txn_hdr._xidsize > 0);
 
-    std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE_BYTES;
-    std::size_t rem = max_size_dblks * JRNL_DBLK_SIZE_BYTES;
+    std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES;
+    std::size_t rem = max_size_dblks * QLS_DBLK_SIZE_BYTES;
     std::size_t wr_cnt = 0;
     if (rec_offs_dblks) // Continuation of split dequeue record (over 2 or more pages)
     {
@@ -145,10 +145,10 @@ txn_rec::encode(void* wptr, uint32_t rec
             {
                 std::memcpy((char*)wptr + wr_cnt, (char*)&_txn_tail + rec_offs, wsize);
                 wr_cnt += wsize;
-#ifdef RHM_CLEAN
-                std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE_BYTES;
-                std::size_t dblk_rec_size = size_dblks(rec_size() - rec_offs) * JRNL_DBLK_SIZE_BYTES;
-                std::memset((char*)wptr + wr_cnt, RHM_CLEAN_CHAR, dblk_rec_size - wr_cnt);
+#ifdef QLS_CLEAN
+                std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES;
+                std::size_t dblk_rec_size = size_dblks(rec_size() - rec_offs) * QLS_DBLK_SIZE_BYTES;
+                std::memset((char*)wptr + wr_cnt, QLS_CLEAN_CHAR, dblk_rec_size - wr_cnt);
 #endif
             }
             rec_offs -= sizeof(_txn_tail) - wsize;
@@ -186,9 +186,9 @@ txn_rec::encode(void* wptr, uint32_t rec
             wr_cnt += _txn_hdr._xidsize;
             std::memcpy((char*)wptr + wr_cnt, (void*)&_txn_tail, sizeof(_txn_tail));
             wr_cnt += sizeof(_txn_tail);
-#ifdef RHM_CLEAN
-            std::size_t dblk_rec_size = size_dblks(rec_size()) * JRNL_DBLK_SIZE_BYTES;
-            std::memset((char*)wptr + wr_cnt, RHM_CLEAN_CHAR, dblk_rec_size - wr_cnt);
+#ifdef QLS_CLEAN
+            std::size_t dblk_rec_size = size_dblks(rec_size()) * QLS_DBLK_SIZE_BYTES;
+            std::memset((char*)wptr + wr_cnt, QLS_CLEAN_CHAR, dblk_rec_size - wr_cnt);
 #endif
         }
     }
@@ -206,7 +206,7 @@ txn_rec::decode(rec_hdr_t& h, void* rptr
     {
         const uint32_t hdr_xid_dblks = size_dblks(sizeof(txn_hdr_t) + _txn_hdr._xidsize);
         const uint32_t hdr_xid_tail_dblks = size_dblks(sizeof(txn_hdr_t) +  _txn_hdr._xidsize + sizeof(rec_tail_t));
-        const std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE_BYTES;
+        const std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES;
 
         if (hdr_xid_tail_dblks - rec_offs_dblks <= max_size_dblks)
         {
@@ -239,7 +239,7 @@ txn_rec::decode(rec_hdr_t& h, void* rptr
             const std::size_t xid_rem = _txn_hdr._xidsize - xid_offs;
             std::memcpy((char*)_buff + xid_offs, rptr, xid_rem);
             rd_cnt += xid_rem;
-            const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE_BYTES) - rd_cnt;
+            const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
             if (tail_rem)
             {
                 std::memcpy((void*)&_txn_tail, ((char*)rptr + xid_rem), tail_rem);
@@ -249,7 +249,7 @@ txn_rec::decode(rec_hdr_t& h, void* rptr
         else
         {
             // Remainder of xid split
-            const std::size_t xid_cp_size = (max_size_dblks * JRNL_DBLK_SIZE_BYTES);
+            const std::size_t xid_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES);
             std::memcpy((char*)_buff + rec_offs - sizeof(txn_hdr_t), rptr, xid_cp_size);
             rd_cnt += xid_cp_size;
         }
@@ -288,7 +288,7 @@ txn_rec::decode(rec_hdr_t& h, void* rptr
             // Entire header and xid fit within this page, tail split
             std::memcpy(_buff, (char*)rptr + rd_cnt, _txn_hdr._xidsize);
             rd_cnt += _txn_hdr._xidsize;
-            const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE_BYTES) - rd_cnt;
+            const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
             if (tail_rem)
             {
                 std::memcpy((void*)&_txn_tail, (char*)rptr + rd_cnt, tail_rem);
@@ -298,7 +298,7 @@ txn_rec::decode(rec_hdr_t& h, void* rptr
         else
         {
             // Header fits within this page, xid split
-            const std::size_t xid_cp_size = (max_size_dblks * JRNL_DBLK_SIZE_BYTES) - rd_cnt;
+            const std::size_t xid_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
             std::memcpy(_buff, (char*)rptr + rd_cnt, xid_cp_size);
             rd_cnt += xid_cp_size;
         }
@@ -357,7 +357,7 @@ txn_rec::rcv_decode(rec_hdr_t h, std::if
             return false;
         }
     }
-    ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE_BYTES - rec_size());
+    ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size());
     chk_tail(); // Throws if tail invalid or record incomplete
     assert(!ifsp->fail() && !ifsp->bad());
     return true;

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c Mon Oct 21 21:26:10 2013
@@ -73,7 +73,6 @@ void file_hdr_reset(file_hdr_t* target) 
     target->_ts_nsec = 0;
     target->_file_number = 0;
     target->_queue_name_len = 0;
-    memset(target + sizeof(file_hdr_t), 0, MAX_FILE_HDR_LEN - sizeof(file_hdr_t));
 }
 
 int is_file_hdr_reset(file_hdr_t* target) {

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp Mon Oct 21 21:26:10 2013
@@ -26,20 +26,24 @@
 #include <cstdlib>
 #include <cstring>
 #include "qpid/linearstore/jrnl/utils/file_hdr.h"
+#include "qpid/linearstore/jrnl/jcfg.h"
 #include "qpid/linearstore/jrnl/jcntl.h"
 #include "qpid/linearstore/jrnl/jerrno.h"
 #include "qpid/linearstore/jrnl/JournalFile.h"
 #include <sstream>
 #include <stdint.h>
 
-//#include <iostream> // DEBUG
+#include <iostream> // DEBUG
 
 namespace qpid
 {
 namespace qls_jrnl
 {
 
-wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, LinearFileController& lfc):
+wmgr::wmgr(jcntl* jc,
+           enq_map& emap,
+           txn_map& tmap,
+           LinearFileController& lfc):
         pmgr(jc, emap, tmap),
         _lfc(lfc),
         _max_dtokpp(0),
@@ -52,8 +56,13 @@ wmgr::wmgr(jcntl* jc, enq_map& emap, txn
         _txn_pending_set()
 {}
 
-wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, LinearFileController& lfc, const uint32_t max_dtokpp, const uint32_t max_iowait_us):
-        pmgr(jc, emap, tmap /* , dtoklp */),
+wmgr::wmgr(jcntl* jc,
+           enq_map& emap,
+           txn_map& tmap,
+           LinearFileController& lfc,
+           const uint32_t max_dtokpp,
+           const uint32_t max_iowait_us):
+        pmgr(jc, emap, tmap),
         _lfc(lfc),
         _max_dtokpp(max_dtokpp),
         _max_io_wait_us(max_iowait_us),
@@ -71,9 +80,12 @@ wmgr::~wmgr()
 }
 
 void
-wmgr::initialize(aio_callback* const cbp, const uint32_t wcache_pgsize_sblks,
-        const uint16_t wcache_num_pages, const uint32_t max_dtokpp, const uint32_t max_iowait_us,
-        std::size_t eo)
+wmgr::initialize(aio_callback* const cbp,
+                 const uint32_t wcache_pgsize_sblks,
+                 const uint16_t wcache_num_pages,
+                 const uint32_t max_dtokpp,
+                 const uint32_t max_iowait_us,
+                 std::size_t eo)
 {
     _enq_busy = false;
     _deq_busy = false;
@@ -86,26 +98,38 @@ wmgr::initialize(aio_callback* const cbp
 
     if (eo)
     {
-        const uint32_t wr_pg_size_dblks = _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS;
-        uint32_t data_dblks = (eo / JRNL_DBLK_SIZE_BYTES) - 4; // 4 dblks for file hdr
+        const uint32_t wr_pg_size_dblks = _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS;
+        uint32_t data_dblks = (eo / QLS_DBLK_SIZE_BYTES) - 4; // 4 dblks for file hdr
         _pg_cntr = data_dblks / wr_pg_size_dblks;
         _pg_offset_dblks = data_dblks - (_pg_cntr * wr_pg_size_dblks);
     }
 }
 
 iores
-wmgr::enqueue(const void* const data_buff, const std::size_t tot_data_len,
-        const std::size_t this_data_len, data_tok* dtokp, const void* const xid_ptr,
-        const std::size_t xid_len, const bool transient, const bool external)
+wmgr::enqueue(const void* const data_buff,
+              const std::size_t tot_data_len,
+              const std::size_t this_data_len,
+              data_tok* dtokp,
+              const void* const xid_ptr,
+              const std::size_t xid_len,
+              const bool transient,
+              const bool external)
 {
     if (xid_len)
         assert(xid_ptr != 0);
 
-    if (_deq_busy || _abort_busy || _commit_busy)
-        return RHM_IORES_BUSY;
+    if (_deq_busy || _abort_busy || _commit_busy) {
+        std::ostringstream oss;
+        oss << "RHM_IORES_BUSY: enqueue while part way through another op:";
+        oss << " _deq_busy=" << (_deq_busy?"T":"F");
+        oss << " _abort_busy=" << (_abort_busy?"T":"F");
+        oss << " _commit_busy=" << (_commit_busy?"T":"F");
+        throw jexception(oss.str()); // TODO: complete exception
+    }
 
-    if (this_data_len != tot_data_len && !external)
-        return RHM_IORES_NOTIMPL;
+    if (this_data_len != tot_data_len && !external) {
+        throw jexception("RHM_IORES_NOTIMPL: partial enqueues not implemented"); // TODO: complete exception;
+    }
 
     iores res = pre_write_check(WMGR_ENQUEUE, dtokp, xid_len, tot_data_len, external);
     if (res != RHM_IORES_SUCCESS)
@@ -124,9 +148,8 @@ wmgr::enqueue(const void* const data_buf
         }
     }
 
-    uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : _lfc.getNextRecordId();/*_wrfc.get_incr_rid()*/
-    _enq_rec.reset(rid, data_buff, tot_data_len, xid_ptr, xid_len/*, _wrfc.owi()*/, transient,
-            external);
+    uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : _lfc.getNextRecordId();
+    _enq_rec.reset(rid, data_buff, tot_data_len, xid_ptr, xid_len, transient, external);
     if (!cont)
     {
         dtokp->set_rid(rid);
@@ -137,14 +160,16 @@ wmgr::enqueue(const void* const data_buf
             dtokp->clear_xid();
         _enq_busy = true;
     }
+//std::cout << "---+++ wmgr::enqueue() ENQ rid=0x" << std::hex << rid << " " << std::dec << std::flush; // DEBUG
     bool done = false;
     while (!done)
     {
-        assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS);
-        void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE_BYTES);
+//std::cout << "*" << std::flush; // DEBUG
+        assert(_pg_offset_dblks < _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS);
+        void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * QLS_DBLK_SIZE_BYTES);
         uint32_t data_offs_dblks = dtokp->dblocks_written();
         uint32_t ret = _enq_rec.encode(wptr, data_offs_dblks,
-                (_cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) - _pg_offset_dblks);
+                (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks);
 
         // Remember fid which contains the record header in case record is split over several files
         if (data_offs_dblks == 0) {
@@ -159,6 +184,7 @@ wmgr::enqueue(const void* const data_buf
         // Is the encoding of this record complete?
         if (dtokp->dblocks_written() >= _enq_rec.rec_size_dblks())
         {
+//std::cout << "!" << std::flush; // DEBUG
             // TODO: Incorrect - must set state to ENQ_CACHED; ENQ_SUBM is set when AIO returns.
             dtokp->set_wstate(data_tok::ENQ_SUBM);
             dtokp->set_dsize(tot_data_len);
@@ -166,7 +192,8 @@ wmgr::enqueue(const void* const data_buf
             // long multi-page messages have their token on the page containing the END of the
             // message. AIO callbacks will then only process this token when entire message is
             // enqueued.
-            _lfc.incrEnqueuedRecordCount();
+            _lfc.incrEnqueuedRecordCount(dtokp->fid());
+//std::cout << "[0x" << std::hex << _lfc.getEnqueuedRecordCount() << std::dec << std::flush; // DEBUG
 
             if (xid_len) // If part of transaction, add to transaction map
             {
@@ -185,26 +212,37 @@ wmgr::enqueue(const void* const data_buf
             }
 
             done = true;
-        }
-        else
+        } else {
+//std::cout << "$" << std::endl << std::flush; // DEBUG
             dtokp->set_wstate(data_tok::ENQ_PART);
+        }
 
         file_header_check(rid, cont, _enq_rec.rec_size_dblks() - data_offs_dblks);
-        flush_check(res, cont, done);
+        flush_check(res, cont, done, rid);
     }
     if (dtokp->wstate() >= data_tok::ENQ_SUBM)
         _enq_busy = false;
+//std::cout << " res=" << iores_str(res) << " _enq_busy=" << (_enq_busy?"T":"F") << std::endl << std::flush; // DEBUG
     return res;
 }
 
 iores
-wmgr::dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len, const bool txn_coml_commit)
+wmgr::dequeue(data_tok* dtokp,
+              const void* const xid_ptr,
+              const std::size_t xid_len,
+              const bool txn_coml_commit)
 {
     if (xid_len)
         assert(xid_ptr != 0);
 
-    if (_enq_busy || _abort_busy || _commit_busy)
-        return RHM_IORES_BUSY;
+    if (_enq_busy || _abort_busy || _commit_busy) {
+        std::ostringstream oss;
+        oss << "RHM_IORES_BUSY: dequeue while part way through another op:";
+        oss << " _enq_busy=" << (_enq_busy?"T":"F");
+        oss << " _abort_busy=" << (_abort_busy?"T":"F");
+        oss << " _commit_busy=" << (_commit_busy?"T":"F");
+        throw jexception(oss.str()); // TODO: complete exception
+    }
 
     iores res = pre_write_check(WMGR_DEQUEUE, dtokp);
     if (res != RHM_IORES_SUCCESS)
@@ -224,7 +262,7 @@ wmgr::dequeue(data_tok* dtokp, const voi
     }
 
     const bool ext_rid = dtokp->external_rid();
-    uint64_t rid = (ext_rid | cont) ? dtokp->rid() : /*_wrfc.get_incr_rid()*/0; // TODO: replace for linearstore: _wrfc
+    uint64_t rid = (ext_rid | cont) ? dtokp->rid() : _lfc.getNextRecordId();
     uint64_t dequeue_rid = (ext_rid | cont) ? dtokp->dequeue_rid() : dtokp->rid();
     _deq_rec.reset(rid, dequeue_rid, xid_ptr, xid_len/*, _wrfc.owi()*/, txn_coml_commit);
     if (!cont)
@@ -242,14 +280,16 @@ wmgr::dequeue(data_tok* dtokp, const voi
         dtokp->set_dblocks_written(0); // Reset dblks_written from previous op
         _deq_busy = true;
     }
+//std::cout << "---+++ wmgr::dequeue() DEQ rid=0x" << std::hex << rid << " drid=0x" << dequeue_rid << " " << std::dec << std::flush; // DEBUG
     bool done = false;
     while (!done)
     {
-        assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS);
-        void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE_BYTES);
+//std::cout << "*" << std::flush; // DEBUG
+        assert(_pg_offset_dblks < _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS);
+        void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * QLS_DBLK_SIZE_BYTES);
         uint32_t data_offs_dblks = dtokp->dblocks_written();
         uint32_t ret = _deq_rec.encode(wptr, data_offs_dblks,
-                (_cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) - _pg_offset_dblks);
+                (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks);
 
         // Remember fid which contains the record header in case record is split over several files
         if (data_offs_dblks == 0) {
@@ -264,6 +304,7 @@ wmgr::dequeue(data_tok* dtokp, const voi
         // Is the encoding of this record complete?
         if (dtokp->dblocks_written() >= _deq_rec.rec_size_dblks())
         {
+//std::cout << "!" << std::flush; // DEBUG
             // TODO: Incorrect - must set state to DEQ_CACHED; DEQ_SUBM is set when AIO returns.
             dtokp->set_wstate(data_tok::DEQ_SUBM);
 
@@ -276,7 +317,7 @@ wmgr::dequeue(data_tok* dtokp, const voi
             }
             else
             {
-                int16_t fid;
+                uint64_t fid;
                 short eres = _emap.get_remove_pfid(dtokp->dequeue_rid(), fid);
                 if (eres < enq_map::EMAP_OK) // fail
                 {
@@ -293,30 +334,43 @@ wmgr::dequeue(data_tok* dtokp, const voi
                         throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "dequeue");
                     }
                 }
-                _lfc.decrEnqueuedRecordCount();
+//std::cout << "[0x" << std::hex << _lfc.getEnqueuedRecordCount(fid) << std::dec << std::flush; // DEBUG
+//try {
+                _lfc.decrEnqueuedRecordCount(fid);
+//} catch (std::exception& e) { std::cout << "***OOPS*** " << e.what() << " cfid=" << _lfc.getCurrentFileSeqNum() << " fid=" << fid << std::flush; throw; }
             }
 
             done = true;
-        }
-        else
+        } else {
+//std::cout << "$" << std::flush; // DEBUG
             dtokp->set_wstate(data_tok::DEQ_PART);
+        }
 
         file_header_check(rid, cont, _deq_rec.rec_size_dblks() - data_offs_dblks);
-        flush_check(res, cont, done);
+        flush_check(res, cont, done, rid);
     }
     if (dtokp->wstate() >= data_tok::DEQ_SUBM)
         _deq_busy = false;
+//std::cout << " res=" << iores_str(res) << " _deq_busy=" << (_deq_busy?"T":"F") << std::endl << std::flush; // DEBUG
     return res;
 }
 
 iores
-wmgr::abort(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len)
+wmgr::abort(data_tok* dtokp,
+            const void* const xid_ptr,
+            const std::size_t xid_len)
 {
     // commit and abort MUST have a valid xid
     assert(xid_ptr != 0 && xid_len > 0);
 
-    if (_enq_busy || _deq_busy || _commit_busy)
-        return RHM_IORES_BUSY;
+    if (_enq_busy || _deq_busy || _commit_busy) {
+        std::ostringstream oss;
+        oss << "RHM_IORES_BUSY: abort while part way through another op:";
+        oss << " _enq_busy=" << (_enq_busy?"T":"F");
+        oss << " _deq_busy=" << (_deq_busy?"T":"F");
+        oss << " _commit_busy=" << (_commit_busy?"T":"F");
+        throw jexception(oss.str()); // TODO: complete exception
+    }
 
     iores res = pre_write_check(WMGR_ABORT, dtokp);
     if (res != RHM_IORES_SUCCESS)
@@ -348,11 +402,11 @@ wmgr::abort(data_tok* dtokp, const void*
     bool done = false;
     while (!done)
     {
-        assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS);
-        void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE_BYTES);
+        assert(_pg_offset_dblks < _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS);
+        void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * QLS_DBLK_SIZE_BYTES);
         uint32_t data_offs_dblks = dtokp->dblocks_written();
         uint32_t ret = _txn_rec.encode(wptr, data_offs_dblks,
-                (_cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) - _pg_offset_dblks);
+                (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks);
 
         // Remember fid which contains the record header in case record is split over several files
         if (data_offs_dblks == 0)
@@ -392,7 +446,7 @@ wmgr::abort(data_tok* dtokp, const void*
             dtokp->set_wstate(data_tok::ABORT_PART);
 
         file_header_check(rid, cont, _txn_rec.rec_size_dblks() - data_offs_dblks);
-        flush_check(res, cont, done);
+        flush_check(res, cont, done, rid);
     }
     if (dtokp->wstate() >= data_tok::ABORT_SUBM)
         _abort_busy = false;
@@ -400,13 +454,21 @@ wmgr::abort(data_tok* dtokp, const void*
 }
 
 iores
-wmgr::commit(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len)
+wmgr::commit(data_tok* dtokp,
+             const void* const xid_ptr,
+             const std::size_t xid_len)
 {
     // commit and abort MUST have a valid xid
     assert(xid_ptr != 0 && xid_len > 0);
 
-    if (_enq_busy || _deq_busy || _abort_busy)
-        return RHM_IORES_BUSY;
+    if (_enq_busy || _deq_busy || _abort_busy) {
+        std::ostringstream oss;
+        oss << "RHM_IORES_BUSY: commit while part way through another op:";
+        oss << " _enq_busy=" << (_enq_busy?"T":"F");
+        oss << " _deq_busy=" << (_deq_busy?"T":"F");
+        oss << " _abort_busy=" << (_abort_busy?"T":"F");
+        throw jexception(oss.str()); // TODO: complete exception
+    }
 
     iores res = pre_write_check(WMGR_COMMIT, dtokp);
     if (res != RHM_IORES_SUCCESS)
@@ -438,11 +500,11 @@ wmgr::commit(data_tok* dtokp, const void
     bool done = false;
     while (!done)
     {
-        assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS);
-        void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE_BYTES);
+        assert(_pg_offset_dblks < _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS);
+        void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * QLS_DBLK_SIZE_BYTES);
         uint32_t data_offs_dblks = dtokp->dblocks_written();
         uint32_t ret = _txn_rec.encode(wptr, data_offs_dblks,
-                (_cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) - _pg_offset_dblks);
+                (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks);
 
         // Remember fid which contains the record header in case record is split over several files
         if (data_offs_dblks == 0)
@@ -475,7 +537,7 @@ wmgr::commit(data_tok* dtokp, const void
                 }
                 else // txn dequeue
                 {
-                    int16_t fid;
+                    uint64_t fid;
                     short eres = _emap.get_remove_pfid(itr->_drid, fid, true);
                     if (eres < enq_map::EMAP_OK) // fail
                     {
@@ -509,7 +571,7 @@ wmgr::commit(data_tok* dtokp, const void
             dtokp->set_wstate(data_tok::COMMIT_PART);
 
         file_header_check(rid, cont, _txn_rec.rec_size_dblks() - data_offs_dblks);
-        flush_check(res, cont, done);
+        flush_check(res, cont, done, rid);
     }
     if (dtokp->wstate() >= data_tok::COMMIT_SUBM)
         _commit_busy = false;
@@ -517,29 +579,34 @@ wmgr::commit(data_tok* dtokp, const void
 }
 
 void
-wmgr::file_header_check(const uint64_t rid, const bool cont, const uint32_t rec_dblks_rem)
+wmgr::file_header_check(const uint64_t rid,
+                        const bool cont,
+                        const uint32_t rec_dblks_rem)
 {
     if (_lfc.isEmpty()) // File never written (i.e. no header or data)
     {
         std::size_t fro = 0;
         if (cont) {
-            bool file_fit = rec_dblks_rem <= _lfc.dataSize_sblks() * JRNL_SBLK_SIZE_DBLKS; // Will fit within this journal file
-            bool file_full = rec_dblks_rem == _lfc.dataSize_sblks() * JRNL_SBLK_SIZE_DBLKS; // Will exactly fill this journal file
+            bool file_fit = rec_dblks_rem <= _lfc.dataSize_sblks() * QLS_SBLK_SIZE_DBLKS; // Will fit within this journal file
+            bool file_full = rec_dblks_rem == _lfc.dataSize_sblks() * QLS_SBLK_SIZE_DBLKS; // Will exactly fill this journal file
             if (file_fit && !file_full) {
-                fro = (rec_dblks_rem + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_DBLKS)) * JRNL_DBLK_SIZE_BYTES;
+                fro = (rec_dblks_rem + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_DBLKS)) * QLS_DBLK_SIZE_BYTES;
             }
         } else {
-            fro = QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_BYTES;
+            fro = QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES;
         }
         _lfc.asyncFileHeaderWrite(_ioctx, 0, rid, fro);
+        _aio_evt_rem++;
     }
 }
 
 void
-wmgr::flush_check(iores& res, bool& cont, bool& done)
+wmgr::flush_check(iores& res,
+                  bool& cont,
+                  bool& done, const uint64_t /*rid*/) // DEBUG
 {
     // If page is full, flush
-    if (_pg_offset_dblks >= _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS)
+    if (_pg_offset_dblks >= _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS)
     {
         res = write_flush();
         assert(res == RHM_IORES_SUCCESS);
@@ -558,6 +625,7 @@ wmgr::flush_check(iores& res, bool& cont
             if (!done) {
                 cont = true;
             }
+//std::cout << "***** wmgr::flush_check(): GET NEXT FILE: rid=0x" << std::hex << rid << std::dec << " res=" << iores_str(res) << " cont=" << (cont?"T":"F") << " done=" << (done?"T":"F") << std::endl; // DEBUG
         }
     }
 }
@@ -580,28 +648,28 @@ wmgr::write_flush()
     // Don't bother flushing an empty page or one that is still in state AIO_PENDING
     if (_cached_offset_dblks)
     {
-        if (_page_cb_arr[_pg_index]._state == AIO_PENDING)
+        if (_page_cb_arr[_pg_index]._state == AIO_PENDING) {
+//std::cout << "#"; // DEBUG
             res = RHM_IORES_PAGE_AIOWAIT;
-        else
-        {
+        } else {
             if (_page_cb_arr[_pg_index]._state != IN_USE)
             {
                 std::ostringstream oss;
                 oss << "pg_index=" << _pg_index << " state=" << _page_cb_arr[_pg_index].state_str();
-                throw jexception(jerrno::JERR_WMGR_BADPGSTATE, oss.str(), "wmgr",
-                        "write_flush");
+                throw jexception(jerrno::JERR_WMGR_BADPGSTATE, oss.str(), "wmgr", "write_flush");
             }
 
             // Send current page using AIO
 
-            // In manual flushes, dblks may not coincide with sblks, add filler records ("RHMx")
-            // if necessary.
+            // In manual flushes, dblks may not coincide with sblks, add filler records ("RHMx") if necessary.
             dblk_roundup();
 
-            std::size_t pg_offs = (_pg_offset_dblks - _cached_offset_dblks) * JRNL_DBLK_SIZE_BYTES;
+            std::size_t pg_offs = (_pg_offset_dblks - _cached_offset_dblks) * QLS_DBLK_SIZE_BYTES;
             aio_cb* aiocbp = &_aio_cb_arr[_pg_index];
             _lfc.asyncPageWrite(_ioctx, aiocbp, (char*)_page_ptr_arr[_pg_index] + pg_offs, _cached_offset_dblks);
+            _page_cb_arr[_pg_index]._state = AIO_PENDING;
             _aio_evt_rem++;
+//std::cout << "." << _aio_evt_rem << std::flush; // DEBUG
             _cached_offset_dblks = 0;
             _jc->instr_incr_outstanding_aio_cnt();
 
@@ -610,7 +678,7 @@ wmgr::write_flush()
                _page_cb_arr[_pg_index]._state = IN_USE;
         }
     }
-    get_events(UNUSED, 0);
+    get_events(0, false);
     if (_page_cb_arr[_pg_index]._state == UNUSED)
         _page_cb_arr[_pg_index]._state = IN_USE;
     return res;
@@ -620,17 +688,19 @@ void
 wmgr::get_next_file()
 {
     _pg_cntr = 0;
+//std::cout << "&&&&& wmgr::get_next_file(): " << status_str() << std::endl; // DEBUG
     _lfc.pullEmptyFileFromEfp();
 }
 
 int32_t
-wmgr::get_events(page_state state, timespec* const timeout, bool flush)
+wmgr::get_events(timespec* const timeout,
+                 bool flush)
 {
     if (_aio_evt_rem == 0) // no events to get
         return 0;
 
     int ret = 0;
-    if ((ret = aio::getevents(_ioctx, flush ? _aio_evt_rem : 1, _aio_evt_rem/*_cache_num_pages + _jc->num_jfiles()*/, _aio_event_arr, timeout)) < 0)
+    if ((ret = aio::getevents(_ioctx, flush ? _aio_evt_rem : 1, _aio_evt_rem, _aio_event_arr, timeout)) < 0)
     {
         if (ret == -EINTR) // Interrupted by signal
             return 0;
@@ -652,6 +722,7 @@ wmgr::get_events(page_state state, times
             throw jexception(jerrno::JERR__UNDERFLOW, oss.str(), "wmgr", "get_events");
         }
         _aio_evt_rem--;
+//std::cout << "'" << _aio_evt_rem; // DEBUG
         aio_cb* aiocbp = _aio_event_arr[i].obj; // This I/O control block (iocb)
         page_cb* pcbp = (page_cb*)(aiocbp->data); // This page control block (pcb)
         long aioret = (long)_aio_event_arr[i].res;
@@ -671,6 +742,7 @@ wmgr::get_events(page_state state, times
         }
         if (pcbp) // Page writes have pcb
         {
+//std::cout << "p"; // DEBUG
             uint32_t s = pcbp->_pdtokl->size();
             std::vector<data_tok*> dtokl;
             dtokl.reserve(s);
@@ -754,7 +826,8 @@ wmgr::get_events(page_state state, times
 
             // Clean up this pcb's data_tok list
             pcbp->_pdtokl->clear();
-            pcbp->_state = state;
+            pcbp->_state = UNUSED;
+//std::cout << "c" << pcbp->_index << pcbp->state_str(); // DEBUG
 
             // Perform AIO return callback
             if (_cbp && tot_data_toks)
@@ -762,10 +835,10 @@ wmgr::get_events(page_state state, times
         }
         else // File header writes have no pcb
         {
+//std::cout << "f"; // DEBUG
             file_hdr_t* fhp = (file_hdr_t*)aiocbp->u.c.buf;
-            _lfc.addWriteCompletedDblkCount(fhp->_file_number, QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_DBLKS);
+            _lfc.addWriteCompletedDblkCount(fhp->_file_number, QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_DBLKS);
             _lfc.decrOutstandingAioOperationCount(fhp->_file_number);
-            //fcntlp->set_wr_fhdr_aio_outstanding(false); // TODO: Do we need this?
         }
     }
 
@@ -784,7 +857,9 @@ wmgr::is_txn_synced(const std::string& x
 }
 
 void
-wmgr::initialize(aio_callback* const cbp, const uint32_t wcache_pgsize_sblks, const uint16_t wcache_num_pages)
+wmgr::initialize(aio_callback* const cbp,
+                 const uint32_t wcache_pgsize_sblks,
+                 const uint16_t wcache_num_pages)
 {
 
     pmgr::initialize(cbp, wcache_pgsize_sblks, wcache_num_pages);
@@ -796,9 +871,11 @@ wmgr::initialize(aio_callback* const cbp
 }
 
 iores
-wmgr::pre_write_check(const _op_type op, const data_tok* const dtokp,
-        const std::size_t /*xidsize*/, const std::size_t /*dsize*/, const bool /*external*/
-        ) const
+wmgr::pre_write_check(const _op_type op,
+                      const data_tok* const dtokp,
+                      const std::size_t /*xidsize*/,
+                      const std::size_t /*dsize*/,
+                      const bool /*external*/) const
 {
     // Check status of current file
     // TODO: Replace for LFC
@@ -861,11 +938,12 @@ wmgr::pre_write_check(const _op_type op,
 }
 
 void
-wmgr::dequeue_check(const std::string& xid, const uint64_t drid)
+wmgr::dequeue_check(const std::string& xid,
+                    const uint64_t drid)
 {
     // First check emap
     bool found = false;
-    int16_t fid;
+    uint64_t fid;
     short eres = _emap.get_pfid(drid, fid);
     if (eres < enq_map::EMAP_OK) { // fail
         if (eres == enq_map::EMAP_RID_NOT_FOUND) {
@@ -891,13 +969,13 @@ void
 wmgr::dblk_roundup()
 {
     const uint32_t xmagic = QLS_EMPTY_MAGIC;
-    uint32_t wdblks = jrec::size_blks(_cached_offset_dblks, JRNL_SBLK_SIZE_DBLKS) * JRNL_SBLK_SIZE_DBLKS;
+    uint32_t wdblks = jrec::size_blks(_cached_offset_dblks, QLS_SBLK_SIZE_DBLKS) * QLS_SBLK_SIZE_DBLKS;
     while (_cached_offset_dblks < wdblks)
     {
-        void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE_BYTES);
+        void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * QLS_DBLK_SIZE_BYTES);
         std::memcpy(wptr, (const void*)&xmagic, sizeof(xmagic));
-#ifdef RHM_CLEAN
-        std::memset((char*)wptr + sizeof(xmagic), RHM_CLEAN_CHAR, JRNL_DBLK_SIZE_BYTES - sizeof(xmagic));
+#ifdef QLS_CLEAN
+        std::memset((char*)wptr + sizeof(xmagic), QLS_CLEAN_CHAR, QLS_DBLK_SIZE_BYTES - sizeof(xmagic));
 #endif
         _pg_offset_dblks++;
         _cached_offset_dblks++;
@@ -907,14 +985,15 @@ wmgr::dblk_roundup()
 void
 wmgr::rotate_page()
 {
-    _page_cb_arr[_pg_index]._state = AIO_PENDING;
-    if (_pg_offset_dblks >= _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS)
+//std::cout << "^^^^^ wmgr::rotate_page() " << status_str() << " pi=" << _pg_index; // DEBUG
+    if (_pg_offset_dblks >= _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS)
     {
         _pg_offset_dblks = 0;
         _pg_cntr++;
     }
     if (++_pg_index >= _cache_num_pages)
         _pg_index = 0;
+//std::cout << "->" << _pg_index << std::endl; // DEBUG
 }
 
 void
@@ -928,7 +1007,7 @@ wmgr::status_str() const
     std::ostringstream oss;
     oss << "wmgr: pi=" << _pg_index << " pc=" << _pg_cntr;
     oss << " po=" << _pg_offset_dblks << " aer=" << _aio_evt_rem;
-    oss << " edac:" << (_enq_busy?"T":"F") << (_deq_busy?"T":"F");
+    oss << " edac=" << (_enq_busy?"T":"F") << (_deq_busy?"T":"F");
     oss << (_abort_busy?"T":"F") << (_commit_busy?"T":"F");
     oss << " ps=[";
     for (int i=0; i<_cache_num_pages; i++)
@@ -938,11 +1017,10 @@ wmgr::status_str() const
             case UNUSED:        oss << "-"; break;
             case IN_USE:        oss << "U"; break;
             case AIO_PENDING:   oss << "A"; break;
-            case AIO_COMPLETE:  oss << "*"; break;
             default:            oss << _page_cb_arr[i]._state;
         }
     }
-    oss << "] " << _lfc.status(0);
+    oss << "] ";
     return oss.str();
 }
 

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h Mon Oct 21 21:26:10 2013
@@ -84,22 +84,45 @@ namespace qls_jrnl
         std::set<std::string> _txn_pending_set; ///< Set containing xids of pending commits/aborts
 
     public:
-        wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, LinearFileController& lfc);
-        wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, LinearFileController& lfc, const uint32_t max_dtokpp, const uint32_t max_iowait_us);
+        wmgr(jcntl* jc,
+             enq_map& emap,
+             txn_map& tmap,
+             LinearFileController& lfc);
+        wmgr(jcntl* jc,
+             enq_map& emap,
+             txn_map& tmap,
+             LinearFileController& lfc,
+             const uint32_t max_dtokpp,
+             const uint32_t max_iowait_us);
         virtual ~wmgr();
 
-        void initialize(aio_callback* const cbp, const uint32_t wcache_pgsize_sblks,
-                const uint16_t wcache_num_pages, const uint32_t max_dtokpp,
-                const uint32_t max_iowait_us, std::size_t eo = 0);
-        iores enqueue(const void* const data_buff, const std::size_t tot_data_len,
-                const std::size_t this_data_len, data_tok* dtokp, const void* const xid_ptr,
-                const std::size_t xid_len, const bool transient, const bool external);
-        iores dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len,
-                const bool txn_coml_commit);
-        iores abort(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len);
-        iores commit(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len);
+        void initialize(aio_callback* const cbp,
+                        const uint32_t wcache_pgsize_sblks,
+                        const uint16_t wcache_num_pages,
+                        const uint32_t max_dtokpp,
+                        const uint32_t max_iowait_us,
+                        std::size_t eo = 0);
+        iores enqueue(const void* const data_buff,
+                      const std::size_t tot_data_len,
+                      const std::size_t this_data_len,
+                      data_tok* dtokp,
+                      const void* const xid_ptr,
+                      const std::size_t xid_len,
+                      const bool transient,
+                      const bool external);
+        iores dequeue(data_tok* dtokp,
+                      const void* const xid_ptr,
+                      const std::size_t xid_len,
+                      const bool txn_coml_commit);
+        iores abort(data_tok* dtokp,
+                    const void* const xid_ptr,
+                    const std::size_t xid_len);
+        iores commit(data_tok* dtokp,
+                     const void* const xid_ptr,
+                     const std::size_t xid_len);
         iores flush();
-        int32_t get_events(page_state state, timespec* const timeout, bool flush = false);
+        int32_t get_events(timespec* const timeout,
+                           bool flush);
         bool is_txn_synced(const std::string& xid);
         inline bool curr_pg_blocked() const { return _page_cb_arr[_pg_index]._state != UNUSED; }
         inline uint32_t unflushed_dblks() { return _cached_offset_dblks; }
@@ -108,14 +131,22 @@ namespace qls_jrnl
         const std::string status_str() const;
 
     private:
-        void initialize(aio_callback* const cbp, const uint32_t wcache_pgsize_sblks,
-                const uint16_t wcache_num_pages);
-        iores pre_write_check(const _op_type op, const data_tok* const dtokp,
-                const std::size_t xidsize = 0, const std::size_t dsize = 0, const bool external = false)
-                const;
-        void dequeue_check(const std::string& xid, const uint64_t drid);
-        void file_header_check(const uint64_t rid, const bool cont, const uint32_t rec_dblks_rem);
-        void flush_check(iores& res, bool& cont, bool& done);
+        void initialize(aio_callback* const cbp,
+                        const uint32_t wcache_pgsize_sblks,
+                        const uint16_t wcache_num_pages);
+        iores pre_write_check(const _op_type op,
+                              const data_tok* const dtokp,
+                              const std::size_t xidsize = 0,
+                              const std::size_t dsize = 0,
+                              const bool external = false) const;
+        void dequeue_check(const std::string& xid,
+                           const uint64_t drid);
+        void file_header_check(const uint64_t rid,
+                               const bool cont,
+                               const uint32_t rec_dblks_rem);
+        void flush_check(iores& res,
+                         bool& cont,
+                         bool& done, const uint64_t rid);
         iores write_flush();
         void get_next_file();
         void dblk_roundup();



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org