You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/10/21 23:26:11 UTC

svn commit: r1534383 [3/4] - in /qpid/branches/linearstore/qpid/cpp/src: ./ qpid/broker/ qpid/legacystore/jrnl/ qpid/linearstore/ qpid/linearstore/jrnl/ qpid/linearstore/jrnl/utils/

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp Mon Oct 21 21:26:10 2013
@@ -110,8 +110,8 @@ deq_rec::encode(void* wptr, uint32_t rec
     if (_xidp == 0)
         assert(_deq_hdr._xidsize == 0);
 
-    std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE_BYTES;
-    std::size_t rem = max_size_dblks * JRNL_DBLK_SIZE_BYTES;
+    std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES;
+    std::size_t rem = max_size_dblks * QLS_DBLK_SIZE_BYTES;
     std::size_t wr_cnt = 0;
     if (rec_offs_dblks) // Continuation of split dequeue record (over 2 or more pages)
     {
@@ -161,10 +161,10 @@ deq_rec::encode(void* wptr, uint32_t rec
             {
                 std::memcpy((char*)wptr + wr_cnt, (char*)&_deq_tail + rec_offs, wsize);
                 wr_cnt += wsize;
-#ifdef RHM_CLEAN
-                std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE_BYTES;
-                std::size_t dblk_rec_size = size_dblks(rec_size() - rec_offs) * JRNL_DBLK_SIZE_BYTES;
-                std::memset((char*)wptr + wr_cnt, RHM_CLEAN_CHAR, dblk_rec_size - wr_cnt);
+#ifdef QLS_CLEAN
+                std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES;
+                std::size_t dblk_rec_size = size_dblks(rec_size() - rec_offs) * QLS_DBLK_SIZE_BYTES;
+                std::memset((char*)wptr + wr_cnt, QLS_CLEAN_CHAR, dblk_rec_size - wr_cnt);
 #endif
             }
             rec_offs -= sizeof(_deq_tail) - wsize;
@@ -205,9 +205,9 @@ deq_rec::encode(void* wptr, uint32_t rec
                 std::memcpy((char*)wptr + wr_cnt, (void*)&_deq_tail, sizeof(_deq_tail));
                 wr_cnt += sizeof(_deq_tail);
             }
-#ifdef RHM_CLEAN
-            std::size_t dblk_rec_size = size_dblks(rec_size()) * JRNL_DBLK_SIZE_BYTES;
-            std::memset((char*)wptr + wr_cnt, RHM_CLEAN_CHAR, dblk_rec_size - wr_cnt);
+#ifdef QLS_CLEAN
+            std::size_t dblk_rec_size = size_dblks(rec_size()) * QLS_DBLK_SIZE_BYTES;
+            std::memset((char*)wptr + wr_cnt, QLS_CLEAN_CHAR, dblk_rec_size - wr_cnt);
 #endif
         }
     }
@@ -226,7 +226,7 @@ deq_rec::decode(rec_hdr_t& h, void* rptr
         const uint32_t hdr_xid_dblks = size_dblks(sizeof(deq_hdr_t) + _deq_hdr._xidsize);
         const uint32_t hdr_xid_tail_dblks = size_dblks(sizeof(deq_hdr_t) + _deq_hdr._xidsize +
                 sizeof(rec_tail_t));
-        const std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE_BYTES;
+        const std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES;
 
         if (hdr_xid_tail_dblks - rec_offs_dblks <= max_size_dblks)
         {
@@ -259,7 +259,7 @@ deq_rec::decode(rec_hdr_t& h, void* rptr
             const std::size_t xid_rem = _deq_hdr._xidsize - xid_offs;
             std::memcpy((char*)_buff + xid_offs, rptr, xid_rem);
             rd_cnt += xid_rem;
-            const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE_BYTES) - rd_cnt;
+            const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
             if (tail_rem)
             {
                 std::memcpy((void*)&_deq_tail, ((char*)rptr + xid_rem), tail_rem);
@@ -269,7 +269,7 @@ deq_rec::decode(rec_hdr_t& h, void* rptr
         else
         {
             // Remainder of xid split
-            const std::size_t xid_cp_size = (max_size_dblks * JRNL_DBLK_SIZE_BYTES);
+            const std::size_t xid_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES);
             std::memcpy((char*)_buff + rec_offs - sizeof(deq_hdr_t), rptr, xid_cp_size);
             rd_cnt += xid_cp_size;
         }
@@ -309,7 +309,7 @@ deq_rec::decode(rec_hdr_t& h, void* rptr
                 // Entire header and xid fit within this page, tail split
                 std::memcpy(_buff, (char*)rptr + rd_cnt, _deq_hdr._xidsize);
                 rd_cnt += _deq_hdr._xidsize;
-                const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE_BYTES) - rd_cnt;
+                const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
                 if (tail_rem)
                 {
                     std::memcpy((void*)&_deq_tail, (char*)rptr + rd_cnt, tail_rem);
@@ -319,7 +319,7 @@ deq_rec::decode(rec_hdr_t& h, void* rptr
             else
             {
                 // Header fits within this page, xid split
-                const std::size_t xid_cp_size = (max_size_dblks * JRNL_DBLK_SIZE_BYTES) - rd_cnt;
+                const std::size_t xid_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
                 std::memcpy(_buff, (char*)rptr + rd_cnt, xid_cp_size);
                 rd_cnt += xid_cp_size;
             }
@@ -381,7 +381,7 @@ deq_rec::rcv_decode(rec_hdr_t h, std::if
             return false;
         }
     }
-    ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE_BYTES - rec_size());
+    ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size());
     if (_deq_hdr._xidsize)
         chk_tail(); // Throws if tail invalid or record incomplete
     assert(!ifsp->fail() && !ifsp->bad());

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp Mon Oct 21 21:26:10 2013
@@ -47,13 +47,13 @@ enq_map::~enq_map() {}
 
 
 short
-enq_map::insert_pfid(const uint64_t rid, const uint16_t pfid, const std::streampos file_posn)
+enq_map::insert_pfid(const uint64_t rid, const uint64_t pfid, const std::streampos file_posn)
 {
     return insert_pfid(rid, pfid, file_posn, false);
 }
 
 short
-enq_map::insert_pfid(const uint64_t rid, const uint16_t pfid, const std::streampos file_posn, const bool locked)
+enq_map::insert_pfid(const uint64_t rid, const uint64_t pfid, const std::streampos file_posn, const bool locked)
 {
     std::pair<emap_itr, bool> ret;
     emap_data_struct_t rec(pfid, file_posn, locked);
@@ -67,7 +67,7 @@ enq_map::insert_pfid(const uint64_t rid,
 }
 
 short
-enq_map::get_pfid(const uint64_t rid, int16_t& pfid)
+enq_map::get_pfid(const uint64_t rid, uint64_t& pfid)
 {
     slock s(_mutex);
     emap_itr itr = _map.find(rid);
@@ -80,7 +80,7 @@ enq_map::get_pfid(const uint64_t rid, in
 }
 
 short
-enq_map::get_remove_pfid(const uint64_t rid, int16_t& pfid, const bool txn_flag)
+enq_map::get_remove_pfid(const uint64_t rid, uint64_t& pfid, const bool txn_flag)
 {
     slock s(_mutex);
     emap_itr itr = _map.find(rid);
@@ -173,7 +173,7 @@ enq_map::rid_list(std::vector<uint64_t>&
 }
 
 void
-enq_map::pfid_list(std::vector<uint16_t>& fv)
+enq_map::pfid_list(std::vector<uint64_t>& fv)
 {
     fv.clear();
     {

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h Mon Oct 21 21:26:10 2013
@@ -72,11 +72,11 @@ namespace qls_jrnl
         static short EMAP_TRUE;
 
         typedef struct emap_data_struct_t {
-            uint16_t        _pfid;
+            uint64_t        _pfid;
             std::streampos  _file_posn;
             bool            _lock;
             emap_data_struct_t() : _pfid(0), _file_posn(0), _lock(false) {}
-            emap_data_struct_t(const uint16_t pfid, const std::streampos file_posn, const bool lock) : _pfid(pfid), _file_posn(file_posn), _lock(lock) {}
+            emap_data_struct_t(const uint64_t pfid, const std::streampos file_posn, const bool lock) : _pfid(pfid), _file_posn(file_posn), _lock(lock) {}
         } emqp_data_struct_t;
         typedef std::pair<uint64_t, emap_data_struct_t> emap_param;
         typedef std::map<uint64_t, emap_data_struct_t> emap;
@@ -85,19 +85,15 @@ namespace qls_jrnl
     private:
         emap _map;
         smutex _mutex;
-//        std::vector<uint32_t> _pfid_enq_cnt;
 
     public:
         enq_map();
         virtual ~enq_map();
 
-//        void set_num_jfiles(const uint16_t num_jfiles);
-//        inline uint32_t get_enq_cnt(const uint16_t pfid) const { return _pfid_enq_cnt.at(pfid); };
-
-        short insert_pfid(const uint64_t rid, const uint16_t pfid, const std::streampos file_posn); // 0=ok; -3=duplicate rid;
-        short insert_pfid(const uint64_t rid, const uint16_t pfid, const std::streampos file_posn, const bool locked); // 0=ok; -3=duplicate rid;
-        short get_pfid(const uint64_t rid, int16_t& pfid); // >=0=pfid; -1=rid not found; -2=locked
-        short get_remove_pfid(const uint64_t rid, int16_t& pfid, const bool txn_flag = false); // >=0=pfid; -1=rid not found; -2=locked
+        short insert_pfid(const uint64_t rid, const uint64_t pfid, const std::streampos file_posn); // 0=ok; -3=duplicate rid;
+        short insert_pfid(const uint64_t rid, const uint64_t pfid, const std::streampos file_posn, const bool locked); // 0=ok; -3=duplicate rid;
+        short get_pfid(const uint64_t rid, uint64_t& pfid); // >=0=pfid; -1=rid not found; -2=locked
+        short get_remove_pfid(const uint64_t rid, uint64_t& pfid, const bool txn_flag = false); // >=0=pfid; -1=rid not found; -2=locked
         short get_file_posn(const uint64_t rid, std::streampos& file_posn); // -1=rid not found; -2=locked
         short get_data(const uint64_t rid, emap_data_struct_t& eds);
         bool is_enqueued(const uint64_t rid, bool ignore_lock = false);
@@ -108,7 +104,7 @@ namespace qls_jrnl
         inline bool empty() const { return _map.empty(); }
         inline uint32_t size() const { return uint32_t(_map.size()); }
         void rid_list(std::vector<uint64_t>& rv);
-        void pfid_list(std::vector<uint16_t>& fv);
+        void pfid_list(std::vector<uint64_t>& fv);
     };
 
 }}

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp Mon Oct 21 21:26:10 2013
@@ -109,8 +109,8 @@ enq_rec::encode(void* wptr, uint32_t rec
     if (_xidp == 0)
         assert(_enq_hdr._xidsize == 0);
 
-    std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE_BYTES;
-    std::size_t rem = max_size_dblks * JRNL_DBLK_SIZE_BYTES;
+    std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES;
+    std::size_t rem = max_size_dblks * QLS_DBLK_SIZE_BYTES;
     std::size_t wr_cnt = 0;
     if (rec_offs_dblks) // Continuation of split data record (over 2 or more pages)
     {
@@ -181,10 +181,10 @@ enq_rec::encode(void* wptr, uint32_t rec
             {
                 std::memcpy((char*)wptr + wr_cnt, (char*)&_enq_tail + rec_offs, wsize);
                 wr_cnt += wsize;
-#ifdef RHM_CLEAN
-                std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE_BYTES;
-                std::size_t dblk_rec_size = size_dblks(rec_size() - rec_offs) * JRNL_DBLK_SIZE_BYTES;
-                std::memset((char*)wptr + wr_cnt, RHM_CLEAN_CHAR, dblk_rec_size - wr_cnt);
+#ifdef QLS_CLEAN
+                std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES;
+                std::size_t dblk_rec_size = size_dblks(rec_size() - rec_offs) * QLS_DBLK_SIZE_BYTES;
+                std::memset((char*)wptr + wr_cnt, QLS_CLEAN_CHAR, dblk_rec_size - wr_cnt);
 #endif
             }
             rec_offs -= sizeof(_enq_tail) - wsize;
@@ -237,9 +237,9 @@ enq_rec::encode(void* wptr, uint32_t rec
             }
             std::memcpy((char*)wptr + wr_cnt, (void*)&_enq_tail, sizeof(_enq_tail));
             wr_cnt += sizeof(_enq_tail);
-#ifdef RHM_CLEAN
-            std::size_t dblk_rec_size = size_dblks(rec_size()) * JRNL_DBLK_SIZE_BYTES;
-            std::memset((char*)wptr + wr_cnt, RHM_CLEAN_CHAR, dblk_rec_size - wr_cnt);
+#ifdef QLS_CLEAN
+            std::size_t dblk_rec_size = size_dblks(rec_size()) * QLS_DBLK_SIZE_BYTES;
+            std::memset((char*)wptr + wr_cnt, QLS_CLEAN_CHAR, dblk_rec_size - wr_cnt);
 #endif
         }
     }
@@ -260,7 +260,7 @@ enq_rec::decode(rec_hdr_t& h, void* rptr
         const uint32_t hdr_xid_data_tail_size = hdr_xid_data_size + sizeof(rec_tail_t);
         const uint32_t hdr_data_dblks = size_dblks(hdr_xid_data_size);
         const uint32_t hdr_tail_dblks = size_dblks(hdr_xid_data_tail_size);
-        const std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE_BYTES;
+        const std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES;
         const std::size_t offs = rec_offs - sizeof(enq_hdr_t);
 
         if (hdr_tail_dblks - rec_offs_dblks <= max_size_dblks)
@@ -331,7 +331,7 @@ enq_rec::decode(rec_hdr_t& h, void* rptr
                 std::memcpy((char*)_buff + offs, rptr, data_rem);
                 rd_cnt += data_rem;
             }
-            const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE_BYTES) - rd_cnt;
+            const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
             if (tail_rem)
             {
                 std::memcpy((void*)&_enq_tail, ((char*)rptr + rd_cnt), tail_rem);
@@ -341,7 +341,7 @@ enq_rec::decode(rec_hdr_t& h, void* rptr
         else
         {
             // Since xid and data are contiguous, both fit within current page - copy whole page
-            const std::size_t data_cp_size = (max_size_dblks * JRNL_DBLK_SIZE_BYTES);
+            const std::size_t data_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES);
             std::memcpy((char*)_buff + offs, rptr, data_cp_size);
             rd_cnt += data_cp_size;
         }
@@ -405,7 +405,7 @@ enq_rec::decode(rec_hdr_t& h, void* rptr
                             _enq_hdr._dsize);
                     rd_cnt += _enq_hdr._dsize;
                 }
-                const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE_BYTES) - rd_cnt;
+                const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
                 if (tail_rem)
                 {
                     std::memcpy((void*)&_enq_tail, (char*)rptr + rd_cnt, tail_rem);
@@ -422,7 +422,7 @@ enq_rec::decode(rec_hdr_t& h, void* rptr
                 }
                 if (_enq_hdr._dsize && !::is_enq_external(&_enq_hdr))
                 {
-                    const std::size_t data_cp_size = (max_size_dblks * JRNL_DBLK_SIZE_BYTES) - rd_cnt;
+                    const std::size_t data_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
                     std::memcpy((char*)_buff + _enq_hdr._xidsize, (char*)rptr + rd_cnt, data_cp_size);
                     rd_cnt += data_cp_size;
                 }
@@ -430,7 +430,7 @@ enq_rec::decode(rec_hdr_t& h, void* rptr
             else
             {
                 // Header fits within this page, xid split or separated
-                const std::size_t data_cp_size = (max_size_dblks * JRNL_DBLK_SIZE_BYTES) - rd_cnt;
+                const std::size_t data_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
                 std::memcpy(_buff, (char*)rptr + rd_cnt, data_cp_size);
                 rd_cnt += data_cp_size;
             }
@@ -516,7 +516,7 @@ enq_rec::rcv_decode(rec_hdr_t h, std::if
             return false;
         }
     }
-    ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE_BYTES - rec_size());
+    ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size());
     chk_tail(); // Throws if tail invalid or record incomplete
     assert(!ifsp->fail() && !ifsp->bad());
     return true;

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enums.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enums.h?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enums.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enums.h Mon Oct 21 21:26:10 2013
@@ -40,9 +40,9 @@ namespace qls_jrnl
 //        RHM_IORES_RCINVALID,    ///< Read page cache is invalid (ie obsolete or uninitialized)
 //        RHM_IORES_ENQCAPTHRESH, ///< Enqueue capacity threshold (limit) reached.
 //        RHM_IORES_FULL,         ///< During write operations, the journal files are full.
-        RHM_IORES_BUSY,         ///< Another blocking operation is in progress.
-        RHM_IORES_TXPENDING,    ///< Operation blocked by pending transaction.
-        RHM_IORES_NOTIMPL       ///< Function is not implemented.
+//        RHM_IORES_BUSY,         ///< Another blocking operation is in progress.
+        RHM_IORES_TXPENDING     ///< Operation blocked by pending transaction.
+//        RHM_IORES_NOTIMPL       ///< Function is not implemented.
     };
     typedef _iores iores;
 
@@ -57,9 +57,9 @@ namespace qls_jrnl
 //            case RHM_IORES_RCINVALID: return "RHM_IORES_RCINVALID";
 //            case RHM_IORES_ENQCAPTHRESH: return "RHM_IORES_ENQCAPTHRESH";
 //            case RHM_IORES_FULL: return "RHM_IORES_FULL";
-            case RHM_IORES_BUSY: return "RHM_IORES_BUSY";
+//            case RHM_IORES_BUSY: return "RHM_IORES_BUSY";
             case RHM_IORES_TXPENDING: return "RHM_IORES_TXPENDING";
-            case RHM_IORES_NOTIMPL: return "RHM_IORES_NOTIMPL";
+//            case RHM_IORES_NOTIMPL: return "RHM_IORES_NOTIMPL";
         }
         return "<iores unknown>";
     }

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h Mon Oct 21 21:26:10 2013
@@ -22,48 +22,37 @@
 #ifndef QPID_LEGACYSTORE_JRNL_JCFG_H
 #define QPID_LEGACYSTORE_JRNL_JCFG_H
 
+#define QLS_SBLK_SIZE_BYTES             4096        /**< Disk softblock size in bytes, should match size used on disk media */
+#define QLS_AIO_ALIGN_BOUNDARY_BYTES    QLS_SBLK_SIZE_BYTES /**< Memory alignment boundary used for DMA */
 /**
-* <b>Rule:</b> Data block size (JRNL_DBLK_SIZE_BYTES) MUST be a power of 2 AND
-* a power of 2 factor of the disk softblock size (JRNL_SBLK_SIZE_BYTES):
+* <b>Rule:</b> Data block size (QLS_DBLK_SIZE_BYTES) MUST be a power of 2 AND
+* a power of 2 factor of the disk softblock size (QLS_SBLK_SIZE_BYTES):
 * <pre>
-* n * JRNL_DBLK_SIZE_BYTES == JRNL_SBLK_SIZE_BYTES (n = 1,2,4,8...)
+* n * QLS_DBLK_SIZE_BYTES == QLS_SBLK_SIZE_BYTES (n = 1,2,4,8...)
 * </pre>
 */
-#define JRNL_SBLK_SIZE_BYTES           4096        /**< Disk softblock size in bytes */
-#define QLS_AIO_ALIGN_BOUNDARY         JRNL_SBLK_SIZE_BYTES
-#define JRNL_DBLK_SIZE_BYTES           128         /**< Data block size in bytes (CANNOT BE LESS THAN 32!) */
-#define JRNL_SBLK_SIZE_DBLKS           (JRNL_SBLK_SIZE_BYTES / JRNL_DBLK_SIZE_BYTES)          /**< Disk softblock size in multiples of JRNL_DBLK_SIZE */
-#define JRNL_SBLK_SIZE_KIB             (JRNL_SBLK_SIZE_BYTES / 1024) /**< Disk softblock size in KiB */
-//#define JRNL_MIN_FILE_SIZE      128         ///< Min. jrnl file size in sblks (excl. file_hdr)
-//#define JRNL_MAX_FILE_SIZE      4194176     ///< Max. jrnl file size in sblks (excl. file_hdr)
-//#define JRNL_MIN_NUM_FILES      4           ///< Min. number of journal files
-//#define JRNL_MAX_NUM_FILES      64          ///< Max. number of journal files
-//#define JRNL_ENQ_THRESHOLD      80          ///< Percent full when enqueue connection will be closed
-//
-//#define JRNL_RMGR_PAGE_SIZE     128         ///< Journal page size in softblocks
-//#define JRNL_RMGR_PAGES         16          ///< Number of pages to use in wmgr
-//
-#define JRNL_WMGR_DEF_PAGE_SIZE_KIB    32
-#define JRNL_WMGR_DEF_PAGE_SIZE_SBLKS  (JRNL_WMGR_DEF_PAGE_SIZE_KIB / JRNL_SBLK_SIZE_KIB)          ///< Journal write page size in softblocks (default)
-#define JRNL_WMGR_DEF_PAGES            32          ///< Number of pages to use in wmgr (default)
-//
-#define JRNL_WMGR_MAXDTOKPP            1024        ///< Max. dtoks (data blocks) per page in wmgr
-#define JRNL_WMGR_MAXWAITUS            100         ///< Max. wait time (us) before submitting AIO
-//
-//#define JRNL_INFO_EXTENSION     "jinf"      ///< Extension for journal info files
-//#define JRNL_DATA_EXTENSION     "jdat"      ///< Extension for journal data files
-#define QLS_JRNL_FILE_EXTENSION        ".jrnl"     /**< Extension for journal data files */
-#define QLS_TXA_MAGIC                  0x61534c51  /**< ("RHMa" in little endian) Magic for dtx abort hdrs */
-#define QLS_TXC_MAGIC                  0x63534c51  /**< ("RHMc" in little endian) Magic for dtx commit hdrs */
-#define QLS_DEQ_MAGIC                  0x64534c51  /**< ("QLSd" in little endian) Magic for deq rec hdrs */
-#define QLS_ENQ_MAGIC                  0x65534c51  /**< ("QLSe" in little endian) Magic for enq rec hdrs */
-#define QLS_FILE_MAGIC                 0x66534c51  /**< ("QLSf" in little endian) Magic for file hdrs */
-#define QLS_EMPTY_MAGIC                0x78534c51  /**< ("QLSx" in little endian) Magic for empty dblk */
-#define QLS_JRNL_VERSION               2           /**< Version (of file layout) */
-#define QLS_JRNL_FHDR_RES_SIZE_SBLKS   1           /**< Journal file header reserved size in sblks (as defined by JRNL_SBLK_SIZE_BYTES) */
-#define QLS_CLEAN_CHAR                 0xff        /**< Char used to clear empty space on disk */
-//
-//#define RHM_LENDIAN_FLAG 0      ///< Value of little endian flag on disk
-//#define RHM_BENDIAN_FLAG 1      ///< Value of big endian flag on disk
+#define QLS_DBLK_SIZE_BYTES             128         /**< Data block size in bytes (CANNOT BE LESS THAN 32!) */
+#define QLS_SBLK_SIZE_DBLKS             (QLS_SBLK_SIZE_BYTES / QLS_DBLK_SIZE_BYTES) /**< Disk softblock size in multiples of QLS_DBLK_SIZE_BYTES */
+#define QLS_SBLK_SIZE_KIB               (QLS_SBLK_SIZE_BYTES / 1024) /**< Disk softblock size in KiB */
 
-#endif // ifndef QPID_LEGACYSTORE_JRNL_JCFG_H
+#define QLS_WMGR_DEF_PAGE_SIZE_KIB      32          /**< Journal write page size in KiB (default) */
+#define QLS_WMGR_DEF_PAGE_SIZE_SBLKS    (QLS_WMGR_DEF_PAGE_SIZE_KIB / QLS_SBLK_SIZE_KIB) /**< Journal write page size in softblocks (default) */
+#define QLS_WMGR_DEF_PAGES              32          /**< Number of pages to use in wmgr (default) */
+
+#define QLS_WMGR_MAXDTOKPP              1024        /**< Max. dtoks (data blocks) per page in wmgr */
+#define QLS_WMGR_MAXWAITUS              100         /**< Max. wait time (us) before submitting AIO */
+
+#define QLS_JRNL_FILE_EXTENSION         ".jrnl"     /**< Extension for journal data files */
+#define QLS_TXA_MAGIC                   0x61534c51  /**< ("QLSa" in little endian) Magic for dtx abort hdrs */
+#define QLS_TXC_MAGIC                   0x63534c51  /**< ("QLSc" in little endian) Magic for dtx commit hdrs */
+#define QLS_DEQ_MAGIC                   0x64534c51  /**< ("QLSd" in little endian) Magic for deq rec hdrs */
+#define QLS_ENQ_MAGIC                   0x65534c51  /**< ("QLSe" in little endian) Magic for enq rec hdrs */
+#define QLS_FILE_MAGIC                  0x66534c51  /**< ("QLSf" in little endian) Magic for file hdrs */
+#define QLS_EMPTY_MAGIC                 0x78534c51  /**< ("QLSx" in little endian) Magic for empty dblk */
+#define QLS_JRNL_VERSION                2           /**< Version (of file layout) */
+#define QLS_JRNL_FHDR_RES_SIZE_SBLKS    1           /**< Journal file header reserved size in sblks (as defined by QLS_SBLK_SIZE_BYTES) */
+
+#define QLS_CLEAN                                   /**< If defined, writes QLS_CLEAN_CHAR to unused (padding) areas on disk */
+#define QLS_CLEAN_CHAR                  0xff        /**< Char used to clear empty space on disk */
+
+#endif /* ifndef QPID_LEGACYSTORE_JRNL_JCFG_H */

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp Mon Oct 21 21:26:10 2013
@@ -31,10 +31,8 @@
 #include <iostream>
 #include <qpid/linearstore/jrnl/EmptyFilePool.h>
 #include <qpid/linearstore/jrnl/EmptyFilePoolManager.h>
-//#include "qpid/linearstore/jrnl/file_hdr.h"
 #include "qpid/linearstore/jrnl/jerrno.h"
-//#include "qpid/linearstore/jrnl/jinf.h"
-//#include "qpid/linearstore/jrnl/JournalFileController.h"
+#include "qpid/linearstore/jrnl/JournalLog.h"
 #include "qpid/linearstore/jrnl/utils/enq_hdr.h"
 #include <limits>
 #include <sstream>
@@ -66,25 +64,21 @@ bool jcntl::init_statics()
 
 // Functions
 
-jcntl::jcntl(const std::string& jid, const std::string& jdir/*, const std::string& base_filename*/):
+jcntl::jcntl(const std::string& jid,
+             const std::string& jdir,
+             JournalLog& jrnl_log):
     _jid(jid),
-    _jdir(jdir/*, base_filename*/),
-//    _base_filename(base_filename),
+    _jdir(jdir),
     _init_flag(false),
     _stop_flag(false),
     _readonly_flag(false),
-//    _autostop(true),
+    _jrnl_log(jrnl_log),
     _linearFileController(*this),
     _emptyFilePoolPtr(0),
-//    _jfsize_sblks(0),
-//    _lpmgr(),
     _emap(),
     _tmap(),
-//    _rrfc(&_lpmgr),
-//    _wrfc(&_lpmgr),
-//    _rmgr(this, _emap, _tmap/*, _rrfc*/),
-    _wmgr(this, _emap, _tmap, _linearFileController/*, _wrfc*/),
-    _rcvdat()
+    _wmgr(this, _emap, _tmap, _linearFileController),
+    _recoveryManager(_jdir.dirname(), _jid, _emap, _tmap, jrnl_log)
 {}
 
 jcntl::~jcntl()
@@ -92,14 +86,14 @@ jcntl::~jcntl()
     if (_init_flag && !_stop_flag)
         try { stop(true); }
         catch (const jexception& e) { std::cerr << e << std::endl; }
-//    _lpmgr.finalize();
     _linearFileController.finalize();
 }
 
 void
-jcntl::initialize(/*const uint16_t num_jfiles, const bool ae, const uint16_t ae_max_jfiles,
-        const uint32_t jfsize_sblks,*/ EmptyFilePool* efpp, const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks,
-        aio_callback* const cbp)
+jcntl::initialize(EmptyFilePool* efpp,
+                  const uint16_t wcache_num_pages,
+                  const uint32_t wcache_pgsize_sblks,
+                  aio_callback* const cbp)
 {
     _init_flag = false;
     _stop_flag = false;
@@ -126,14 +120,14 @@ jcntl::initialize(/*const uint16_t num_j
     _jdir.clear_dir();
 //    _lpmgr.initialize(num_jfiles, ae, ae_max_jfiles, this, &new_fcntl); // Creates new journal files
 
-    _linearFileController.initialize(_jdir.dirname(), efpp);
+    _linearFileController.initialize(_jdir.dirname(), efpp, 0ULL);
     _linearFileController.pullEmptyFileFromEfp();
-    std::cout << _linearFileController.status(2);
+//    std::cout << _linearFileController.status(2);
 //    _wrfc.initialize(_jfsize_sblks);
 //    _rrfc.initialize();
 //    _rrfc.set_findex(0);
 //    _rmgr.initialize(cbp);
-    _wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, JRNL_WMGR_MAXDTOKPP, JRNL_WMGR_MAXWAITUS);
+    _wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, QLS_WMGR_MAXDTOKPP, QLS_WMGR_MAXWAITUS);
 
     // Write info file (<basename>.jinf) to disk
 //    write_infofile();
@@ -142,7 +136,7 @@ jcntl::initialize(/*const uint16_t num_j
 }
 
 void
-jcntl::recover(EmptyFilePoolManager* efpm,
+jcntl::recover(EmptyFilePoolManager* efpmp,
                const uint16_t wcache_num_pages,
                const uint32_t wcache_pgsize_sblks,
                aio_callback* const cbp,
@@ -170,21 +164,26 @@ jcntl::recover(EmptyFilePoolManager* efp
     _jdir.verify_dir();
 //    _rcvdat.reset(num_jfiles/*, ae, ae_max_jfiles*/);
 
-    rcvr_janalyze(prep_txn_list_ptr, efpm);
-    highest_rid = _rcvdat._h_rid;
+//    rcvr_janalyze(prep_txn_list_ptr, efpm);
+    efpIdentity_t efpIdentity;
+    _recoveryManager.analyzeJournals(prep_txn_list_ptr, efpmp, &_emptyFilePoolPtr);
+
+    highest_rid = _recoveryManager.getHighestRecordId();
 //    if (_rcvdat._jfull)
 //        throw jexception(jerrno::JERR_JCNTL_RECOVERJFULL, "jcntl", "recover");
-    this->log(/*LOG_DEBUG*/LOG_INFO, _jid, _rcvdat.to_log(_jid));
+    _jrnl_log.log(/*LOG_DEBUG*/JournalLog::LOG_INFO, _jid, _recoveryManager.toString(_jid, true));
 
 //    _lpmgr.recover(_rcvdat, this, &new_fcntl);
 
-    _linearFileController.initialize(_jdir.dirname(), _emptyFilePoolPtr);
+    _linearFileController.initialize(_jdir.dirname(), _emptyFilePoolPtr, _recoveryManager.getHighestFileNumber());
+//    _linearFileController.setFileNumberCounter(_recoveryManager.getHighestFileNumber());
+    _recoveryManager.setLinearFileControllerJournals(&qpid::qls_jrnl::LinearFileController::addJournalFile, &_linearFileController);
 //    _wrfc.initialize(_jfsize_sblks, &_rcvdat);
 //    _rrfc.initialize();
 //    _rrfc.set_findex(_rcvdat.ffid());
 //    _rmgr.initialize(cbp);
-    _wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, JRNL_WMGR_MAXDTOKPP, JRNL_WMGR_MAXWAITUS,
-            (_rcvdat._lffull ? 0 : _rcvdat._eo));
+    _wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, QLS_WMGR_MAXDTOKPP, QLS_WMGR_MAXWAITUS,
+            (_recoveryManager.isLastFileFull() ? 0 : _recoveryManager.getEndOffset()));
 
     _readonly_flag = true;
     _init_flag = true;
@@ -214,8 +213,11 @@ jcntl::delete_jrnl_files()
 
 
 iores
-jcntl::enqueue_data_record(const void* const data_buff, const std::size_t tot_data_len,
-        const std::size_t this_data_len, data_tok* dtokp, const bool transient)
+jcntl::enqueue_data_record(const void* const data_buff,
+                           const std::size_t tot_data_len,
+                           const std::size_t this_data_len,
+                           data_tok* dtokp,
+                           const bool transient)
 {
     iores r;
     check_wstatus("enqueue_data_record");
@@ -228,7 +230,9 @@ jcntl::enqueue_data_record(const void* c
 }
 
 iores
-jcntl::enqueue_extern_data_record(const std::size_t tot_data_len, data_tok* dtokp, const bool transient)
+jcntl::enqueue_extern_data_record(const std::size_t tot_data_len,
+                                  data_tok* dtokp,
+                                  const bool transient)
 {
     iores r;
     check_wstatus("enqueue_extern_data_record");
@@ -240,9 +244,12 @@ jcntl::enqueue_extern_data_record(const 
 }
 
 iores
-jcntl::enqueue_txn_data_record(const void* const data_buff, const std::size_t tot_data_len,
-        const std::size_t this_data_len, data_tok* dtokp, const std::string& xid,
-        const bool transient)
+jcntl::enqueue_txn_data_record(const void* const data_buff,
+                               const std::size_t tot_data_len,
+                               const std::size_t this_data_len,
+                               data_tok* dtokp,
+                               const std::string& xid,
+                               const bool transient)
 {
     iores r;
     check_wstatus("enqueue_tx_data_record");
@@ -255,8 +262,10 @@ jcntl::enqueue_txn_data_record(const voi
 }
 
 iores
-jcntl::enqueue_extern_txn_data_record(const std::size_t tot_data_len, data_tok* dtokp,
-        const std::string& xid, const bool transient)
+jcntl::enqueue_extern_txn_data_record(const std::size_t tot_data_len,
+                                      data_tok* dtokp,
+                                      const std::string& xid,
+                                      const bool transient)
 {
     iores r;
     check_wstatus("enqueue_extern_txn_data_record");
@@ -268,27 +277,21 @@ jcntl::enqueue_extern_txn_data_record(co
     return r;
 }
 
-/* TODO
-iores
-jcntl::get_data_record(const uint64_t& rid, const std::size_t& dsize, const std::size_t& dsize_avail,
-        const void** const data, bool auto_discard)
-{
-    check_rstatus("get_data_record");
-    return _rmgr.get(rid, dsize, dsize_avail, data, auto_discard);
-} */
-
-/* TODO
 iores
-jcntl::discard_data_record(data_tok* const dtokp)
-{
-    check_rstatus("discard_data_record");
-    return _rmgr.discard(dtokp);
-} */
-
-iores
-jcntl::read_data_record(void** const datapp, std::size_t& dsize, void** const xidpp, std::size_t& xidsize,
-        bool& transient, bool& external, data_tok* const dtokp, bool /*ignore_pending_txns*/)
+jcntl::read_data_record(void** const datapp,
+                        std::size_t& dsize,
+                        void** const xidpp,
+                        std::size_t& xidsize,
+                        bool& transient,
+                        bool& external,
+                        data_tok* const dtokp,
+                        bool ignore_pending_txns)
 {
+    check_rstatus("read_data");
+    if (_recoveryManager.readNextRemainingRecord(datapp, dsize, xidpp, xidsize, transient, external, dtokp, ignore_pending_txns))
+        return RHM_IORES_SUCCESS;
+    return RHM_IORES_EMPTY;
+/*
     if (!dtokp->is_readable()) {
         std::ostringstream oss;
         oss << std::hex << std::setfill('0');
@@ -303,18 +306,18 @@ jcntl::read_data_record(void** const dat
     for (std::vector<uint64_t>::const_iterator i=ridl.begin(); i!=ridl.end(); ++i) {
         short res = _emap.get_data(*i, eds);
         if (res == enq_map::EMAP_OK) {
-            std::ifstream ifs(_rcvdat._fm[eds._pfid].c_str(), std::ifstream::in | std::ifstream::binary);
+            std::ifstream ifs(_recoveryManager._fm[eds._pfid].c_str(), std::ifstream::in | std::ifstream::binary);
             if (!ifs.good()) {
                 std::ostringstream oss;
-                oss << "rid=" << (*i) << " pfid=" << eds._pfid << " file=" << _rcvdat._fm[eds._pfid] << " file_posn=" << eds._file_posn;
-                throw jexception(jerrno::JERR_JCNTL_OPENRD, oss.str(), "jcntl", "read_data_record");
+                oss << "rid=" << (*i) << " pfid=" << eds._pfid << " file=" << _recoveryManager._fm[eds._pfid] << " file_posn=" << eds._file_posn;
+                throw jexception(jerrno::JERR_RCVM_OPENRD, oss.str(), "jcntl", "read_data_record");
             }
             ifs.seekg(eds._file_posn, std::ifstream::beg);
             ::enq_hdr_t eh;
             ifs.read((char*)&eh, sizeof(::enq_hdr_t));
             if (!::validate_enq_hdr(&eh, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, *i)) {
                 std::ostringstream oss;
-                oss << "rid=" << (*i) << " pfid=" << eds._pfid << " file=" << _rcvdat._fm[eds._pfid] << " file_posn=" << eds._file_posn;
+                oss << "rid=" << (*i) << " pfid=" << eds._pfid << " file=" << _recoveryManager._fm[eds._pfid] << " file_posn=" << eds._file_posn;
                 throw jexception(jerrno::JERR_JCNTL_INVALIDENQHDR, oss.str(), "jcntl", "read_data_record");
             }
             dsize = eh._dsize;
@@ -335,6 +338,7 @@ jcntl::read_data_record(void** const dat
             }
         }
     }
+*/
 /*
     check_rstatus("read_data");
     iores res = _rmgr.read(datapp, dsize, xidpp, xidsize, transient, external, dtokp, ignore_pending_txns);
@@ -354,7 +358,8 @@ jcntl::read_data_record(void** const dat
 }
 
 iores
-jcntl::dequeue_data_record(data_tok* const dtokp, const bool txn_coml_commit)
+jcntl::dequeue_data_record(data_tok* const dtokp,
+                           const bool txn_coml_commit)
 {
     iores r;
     check_wstatus("dequeue_data");
@@ -366,7 +371,9 @@ jcntl::dequeue_data_record(data_tok* con
 }
 
 iores
-jcntl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit)
+jcntl::dequeue_txn_data_record(data_tok* const dtokp,
+                               const std::string& xid,
+                               const bool txn_coml_commit)
 {
     iores r;
     check_wstatus("dequeue_data");
@@ -378,7 +385,8 @@ jcntl::dequeue_txn_data_record(data_tok*
 }
 
 iores
-jcntl::txn_abort(data_tok* const dtokp, const std::string& xid)
+jcntl::txn_abort(data_tok* const dtokp,
+                 const std::string& xid)
 {
     iores r;
     check_wstatus("txn_abort");
@@ -390,7 +398,8 @@ jcntl::txn_abort(data_tok* const dtokp, 
 }
 
 iores
-jcntl::txn_commit(data_tok* const dtokp, const std::string& xid)
+jcntl::txn_commit(data_tok* const dtokp,
+                  const std::string& xid)
 {
     iores r;
     check_wstatus("txn_commit");
@@ -415,18 +424,9 @@ jcntl::get_wr_events(timespec* const tim
     stlock t(_wr_mutex);
     if (!t.locked())
         return jerrno::LOCK_TAKEN;
-    int32_t res = _wmgr.get_events(pmgr::UNUSED, timeout);
-    return res;
+    return _wmgr.get_events(timeout, false);
 }
 
-/*
-int32_t
-jcntl::get_rd_events(timespec* const timeout)
-{
-    return _rmgr.get_events(pmgr::AIO_COMPLETE, timeout);
-}
-*/
-
 void
 jcntl::stop(const bool block_till_aio_cmpl)
 {
@@ -462,56 +462,6 @@ jcntl::flush(const bool block_till_aio_c
     return res;
 }
 
-/*
-void
-jcntl::log(log_level_t ll, const std::string& log_stmt) const
-{
-    log(ll, log_stmt.c_str());
-}
-
-void
-jcntl::log(log_level_t ll, const char* const log_stmt) const
-{
-    if (ll > LOG_INFO)
-    {
-        std::cout << log_level_str(ll) << ": Journal \"" << _jid << "\": " << log_stmt << std::endl;
-    }
-}
-*/
-
-/*
-void
-jcntl::chk_wr_frot()
-{
-    if (_wrfc.index() == _rrfc.index())
-        _rmgr.invalidate();
-}
-*/
-
-void
-jcntl::fhdr_wr_sync(const uint16_t /*lid*/)
-{
-/*
-    fcntl* fcntlp = _lpmgr.get_fcntlp(lid);
-    while (fcntlp->wr_fhdr_aio_outstanding())
-    {
-        if (get_wr_events(&_aio_cmpl_timeout) == jerrno::AIO_TIMEOUT)
-            throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl", "fhdr_wr_sync");
-    }
-*/
-}
-
-/*
-fcntl*
-jcntl::new_fcntl(jcntl* const jcp, const uint16_t lid, const uint16_t fid, const rcvdat* const rdp)
-{
-    if (!jcp) return 0;
-    std::ostringstream oss;
-    oss << jcp->jrnl_dir() << "/" << jcp->base_filename();
-    return new fcntl(oss.str(), fid, lid, jcp->jfsize_sblks(), rdp);
-}
-*/
-
 // Protected/Private functions
 
 void
@@ -561,11 +511,15 @@ jcntl::handle_aio_wait(const iores res, 
     {
         while (_wmgr.curr_pg_blocked())
         {
-            if (_wmgr.get_events(pmgr::UNUSED, &_aio_cmpl_timeout) == jerrno::AIO_TIMEOUT)
+            if (_wmgr.get_aio_evt_rem() == 0) {
+std::cout << "&&&&&& jcntl::handle_aio_wait() " << _wmgr.status_str() << std::endl; // DEBUG
+                throw jexception("_wmgr.curr_pg_blocked() with no events remaining"); // TODO - complete exception
+            }
+            if (_wmgr.get_events(&_aio_cmpl_timeout, false) == jerrno::AIO_TIMEOUT)
             {
                 std::ostringstream oss;
                 oss << "get_events() returned JERR_JCNTL_AIOCMPLWAIT; wmgr_status: " << _wmgr.status_str();
-                this->log(LOG_CRITICAL, _jid, oss.str());
+                _jrnl_log.log(JournalLog::LOG_CRITICAL, _jid, oss.str());
                 throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl", "handle_aio_wait");
             }
         }
@@ -592,417 +546,4 @@ jcntl::handle_aio_wait(const iores res, 
     return false;
 }
 
-
-// static
-void
-jcntl::rcvr_read_jfile(const std::string& jfn, ::file_hdr_t* fh, std::string& queueName) {
-    const std::size_t headerBlockSize = QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_KIB * 1024;
-    char buffer[headerBlockSize];
-    std::ifstream ifs(jfn.c_str(), std::ifstream::in | std::ifstream::binary);
-    if (!ifs.good()) {
-        std::ostringstream oss;
-        oss << "File=" << jfn;
-        throw jexception(jerrno::JERR_JCNTL_OPENRD, oss.str(), "jcntl", "rcvr_read_jfile");
-    }
-    ifs.read(buffer, headerBlockSize);
-    if (!ifs) {
-        std::streamsize s = ifs.gcount();
-        ifs.close();
-        std::ostringstream oss;
-        oss << "File=" << jfn << "; attempted_read_size=" << headerBlockSize << "; actual_read_size=" << s;
-        throw jexception(jerrno::JERR_JCNTL_READ, oss.str(), "jcntl", "rcvr_read_jfile");
-    }
-    ifs.close();
-    ::memcpy(fh, buffer, sizeof(::file_hdr_t));
-    queueName.assign(buffer + sizeof(::file_hdr_t), fh->_queue_name_len);
-}
-
-
-void jcntl::rcvr_analyze_fhdrs(EmptyFilePoolManager* efpmp) {
-    std::string headerQueueName;
-    ::file_hdr_t fh;
-    efpIdentity_t efpid;
-//    std::map<uint64_t, std::string> fileMap;
-    std::vector<std::string> dirList;
-    jdir::read_dir(_jdir.dirname(), dirList, false, true, false, true);
-    for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
-        rcvr_read_jfile(*i, &fh, headerQueueName);
-        if (headerQueueName.compare(_jid) != 0) {
-            std::ostringstream oss;
-            oss << "Journal file " << (*i) << " belongs to queue \"" << headerQueueName << "\": ignoring";
-            log(LOG_WARN, _jid, oss.str());
-        } else {
-            _rcvdat._fm[fh._file_number] = *i;
-            efpid.first = fh._efp_partition;
-            efpid.second = fh._file_size_kib;
-        }
-    }
-    _rcvdat._jfl.clear();
-    for (std::map<uint64_t, std::string>::iterator i=_rcvdat._fm.begin(); i!=_rcvdat._fm.end(); ++i) {
-        _rcvdat._jfl.push_back(i->second);
-    }
-    _rcvdat._enq_cnt_list.resize(_rcvdat._jfl.size(), 0);
-    _emptyFilePoolPtr = efpmp->getEmptyFilePool(efpid);
-}
-
-
-void jcntl::rcvr_janalyze(const std::vector<std::string>* prep_txn_list_ptr, EmptyFilePoolManager* efpmp) {
-    // Analyze file headers of existing journal files
-    rcvr_analyze_fhdrs(efpmp);
-
-    // Restore all read and write pointers and transactions
-    if (!_rcvdat._jempty)
-    {
-        uint16_t fid = 0;
-        std::ifstream ifs;
-        //bool lowi = rd._owi; // local copy of owi to be used during analysis
-        while (rcvr_get_next_record(fid, &ifs)) ;
-        if (ifs.is_open()) ifs.close();
-
-        // Remove all txns from tmap that are not in the prepared list
-        if (prep_txn_list_ptr)
-        {
-            std::vector<std::string> xid_list;
-            _tmap.xid_list(xid_list);
-            for (std::vector<std::string>::iterator itr = xid_list.begin(); itr != xid_list.end(); itr++)
-            {
-                std::vector<std::string>::const_iterator pitr =
-                        std::find(prep_txn_list_ptr->begin(), prep_txn_list_ptr->end(), *itr);
-                if (pitr == prep_txn_list_ptr->end()) // not found in prepared list
-                {
-                    txn_data_list tdl = _tmap.get_remove_tdata_list(*itr); // tdl will be empty if xid not found
-                    // Unlock any affected enqueues in emap
-                    for (tdl_itr i=tdl.begin(); i<tdl.end(); i++)
-                    {
-                        if (i->_enq_flag) // enq op - decrement enqueue count
-                            _rcvdat._enq_cnt_list[i->_pfid]--;
-                        else if (_emap.is_enqueued(i->_drid, true)) // deq op - unlock enq record
-                        {
-                            int16_t ret = _emap.unlock(i->_drid);
-                            if (ret < enq_map::EMAP_OK) // fail
-                            {
-                                // enq_map::unlock()'s only error is enq_map::EMAP_RID_NOT_FOUND
-                                std::ostringstream oss;
-                                oss << std::hex << "_emap.unlock(): drid=0x\"" << i->_drid;
-                                throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "jcntl", "rcvr_janalyze");
-                            }
-                        }
-                    }
-                }
-            }
-        }
-
-        // Check for file full condition
-        _rcvdat._lffull = _rcvdat._eo == _emptyFilePoolPtr->fileSize_kib() * 1024;
-    }
-}
-
-
-bool
-jcntl::rcvr_get_next_record(uint16_t& fid, std::ifstream* ifsp)
-{
-    std::size_t cum_size_read = 0;
-    void* xidp = 0;
-    rec_hdr_t h;
-
-    bool hdr_ok = false;
-    std::streampos file_pos;
-    while (!hdr_ok)
-    {
-        if (!ifsp->is_open())
-        {
-            if (!jfile_cycle(fid, ifsp, true))
-                return false;
-        }
-        file_pos = ifsp->tellg();
-        ifsp->read((char*)&h, sizeof(rec_hdr_t));
-        if (ifsp->gcount() == sizeof(rec_hdr_t))
-            hdr_ok = true;
-        else
-        {
-            if (!jfile_cycle(fid, ifsp, true))
-                return false;
-        }
-    }
-
-    switch(h._magic)
-    {
-        case QLS_ENQ_MAGIC:
-            {
-                std::cout << " e" << std::flush;
-                enq_rec er;
-                uint16_t start_fid = fid; // fid may increment in decode() if record folds over file boundary
-                if (!decode(er, fid, ifsp, cum_size_read, h, file_pos))
-                    return false;
-                if (!er.is_transient()) // Ignore transient msgs
-                {
-                    _rcvdat._enq_cnt_list[start_fid]++;
-                    if (er.xid_size())
-                    {
-                        er.get_xid(&xidp);
-                        assert(xidp != 0);
-                        std::string xid((char*)xidp, er.xid_size());
-                        _tmap.insert_txn_data(xid, txn_data(h._rid, 0, start_fid, true));
-                        if (_tmap.set_aio_compl(xid, h._rid) < txn_map::TMAP_OK) // fail - xid or rid not found
-                        {
-                            std::ostringstream oss;
-                            oss << std::hex << "_tmap.set_aio_compl: txn_enq xid=\"" << xid << "\" rid=0x" << h._rid;
-                            throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "jcntl", "rcvr_get_next_record");
-                        }
-                        std::free(xidp);
-                    }
-                    else
-                    {
-                        if (_emap.insert_pfid(h._rid, start_fid, file_pos) < enq_map::EMAP_OK) // fail
-                        {
-                            // The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID.
-                            std::ostringstream oss;
-                            oss << std::hex << "rid=0x" << h._rid << " _pfid=0x" << start_fid;
-                            throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "jcntl", "rcvr_get_next_record");
-                        }
-                    }
-                }
-            }
-            break;
-        case QLS_DEQ_MAGIC:
-            {
-                std::cout << " d" << std::flush;
-                deq_rec dr;
-                uint16_t start_fid = fid; // fid may increment in decode() if record folds over file boundary
-                if (!decode(dr, fid, ifsp, cum_size_read, h, file_pos))
-                    return false;
-                if (dr.xid_size())
-                {
-                    // If the enqueue is part of a pending txn, it will not yet be in emap
-                    _emap.lock(dr.deq_rid()); // ignore not found error
-                    dr.get_xid(&xidp);
-                    assert(xidp != 0);
-                    std::string xid((char*)xidp, dr.xid_size());
-                    _tmap.insert_txn_data(xid, txn_data(dr.rid(), dr.deq_rid(), start_fid, false,
-                            dr.is_txn_coml_commit()));
-                    if (_tmap.set_aio_compl(xid, dr.rid()) < txn_map::TMAP_OK) // fail - xid or rid not found
-                    {
-                        std::ostringstream oss;
-                        oss << std::hex << "_tmap.set_aio_compl: txn_deq xid=\"" << xid << "\" rid=0x" << dr.rid();
-                        throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "jcntl", "rcvr_get_next_record");
-                    }
-                    std::free(xidp);
-                }
-                else
-                {
-                    int16_t enq_fid;
-                    if (_emap.get_remove_pfid(dr.deq_rid(), enq_fid, true) == enq_map::EMAP_OK) // ignore not found error
-                        _rcvdat._enq_cnt_list[enq_fid]--;
-                }
-            }
-            break;
-        case QLS_TXA_MAGIC:
-            {
-                std::cout << " a" << std::flush;
-                txn_rec ar;
-                if (!decode(ar, fid, ifsp, cum_size_read, h, file_pos))
-                    return false;
-                // Delete this txn from tmap, unlock any locked records in emap
-                ar.get_xid(&xidp);
-                assert(xidp != 0);
-                std::string xid((char*)xidp, ar.xid_size());
-                txn_data_list tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
-                for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
-                {
-                    if (itr->_enq_flag)
-                        _rcvdat._enq_cnt_list[itr->_pfid]--;
-                    else
-                        _emap.unlock(itr->_drid); // ignore not found error
-                }
-                std::free(xidp);
-            }
-            break;
-        case QLS_TXC_MAGIC:
-            {
-                std::cout << " t" << std::flush;
-                txn_rec cr;
-                if (!decode(cr, fid, ifsp, cum_size_read, h, file_pos))
-                    return false;
-                // Delete this txn from tmap, process records into emap
-                cr.get_xid(&xidp);
-                assert(xidp != 0);
-                std::string xid((char*)xidp, cr.xid_size());
-                txn_data_list tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
-                for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
-                {
-                    if (itr->_enq_flag) // txn enqueue
-                    {
-                        if (_emap.insert_pfid(itr->_rid, itr->_pfid, file_pos) < enq_map::EMAP_OK) // fail
-                        {
-                            // The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID.
-                            std::ostringstream oss;
-                            oss << std::hex << "rid=0x" << itr->_rid << " _pfid=0x" << itr->_pfid;
-                            throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "jcntl", "rcvr_get_next_record");
-                        }
-                    }
-                    else // txn dequeue
-                    {
-                        int16_t enq_fid;
-                        if (_emap.get_remove_pfid(itr->_drid, enq_fid, true) == enq_map::EMAP_OK) // ignore not found error
-                            _rcvdat._enq_cnt_list[enq_fid]--;
-                    }
-                }
-                std::free(xidp);
-            }
-            break;
-        case QLS_EMPTY_MAGIC:
-            {
-                std::cout << " x" << std::flush;
-                uint32_t rec_dblks = jrec::size_dblks(sizeof(rec_hdr_t));
-                ifsp->ignore(rec_dblks * JRNL_DBLK_SIZE_BYTES - sizeof(rec_hdr_t));
-                assert(!ifsp->fail() && !ifsp->bad());
-                if (!jfile_cycle(fid, ifsp, false))
-                    return false;
-            }
-            break;
-        case 0:
-            std::cout << " 0" << std::endl << std::flush;
-            check_journal_alignment(fid, file_pos);
-            return false;
-        default:
-            std::cout << " ?" << std::endl << std::flush;
-            // Stop as this is the overwrite boundary.
-            check_journal_alignment(fid, file_pos);
-            return false;
-    }
-    return true;
-}
-
-
-bool
-jcntl::decode(jrec& rec, uint16_t& fid, std::ifstream* ifsp, std::size_t& cum_size_read,
-        rec_hdr_t& h, std::streampos& file_offs)
-{
-    uint16_t start_fid = fid;
-    std::streampos start_file_offs = file_offs;
-
-    if (_rcvdat._h_rid == 0)
-        _rcvdat._h_rid = h._rid;
-    else if (h._rid - _rcvdat._h_rid < 0x8000000000000000ULL) // RFC 1982 comparison for unsigned 64-bit
-        _rcvdat._h_rid = h._rid;
-
-    bool done = false;
-    while (!done)
-    {
-        try { done = rec.rcv_decode(h, ifsp, cum_size_read); }
-        catch (const jexception& e)
-        {
-// TODO - review this logic and tidy up how rd._lfid is assigned. See new jinf.get_end_file() fn.
-// Original
-//             if (e.err_code() != jerrno::JERR_JREC_BADRECTAIL ||
-//                     fid != (rd._ffid ? rd._ffid - 1 : _num_jfiles - 1)) throw;
-// Tried this, but did not work
-//             if (e.err_code() != jerrno::JERR_JREC_BADRECTAIL || h._magic != 0) throw;
-            check_journal_alignment(start_fid, start_file_offs);
-//             rd._lfid = start_fid;
-            return false;
-        }
-        if (!done && !jfile_cycle(fid, ifsp, /*lowi, rd,*/ false))
-        {
-            check_journal_alignment(start_fid, start_file_offs);
-            return false;
-        }
-    }
-    return true;
-}
-
-
-bool
-jcntl::jfile_cycle(uint16_t& fid, std::ifstream* ifsp, const bool jump_fro)
-{
-    if (ifsp->is_open())
-    {
-        if (ifsp->eof() || !ifsp->good())
-        {
-            ifsp->clear();
-            _rcvdat._eo = ifsp->tellg(); // remember file offset before closing
-            assert(_rcvdat._eo != std::numeric_limits<std::size_t>::max()); // Check for error code -1
-            ifsp->close();
-            if (++fid == _rcvdat._jfl.size()) // used up all known journal files
-                return false;
-        }
-    }
-    if (!ifsp->is_open())
-    {
-        ifsp->clear(); // clear eof flag, req'd for older versions of c++
-        ifsp->open(_rcvdat._jfl[fid].c_str(), std::ios_base::in | std::ios_base::binary);
-        if (!ifsp->good())
-            throw jexception(jerrno::JERR__FILEIO, _rcvdat._jfl[fid], "jcntl", "jfile_cycle");
-
-        // Read file header
-        std::cout << " F" << fid << std::flush;
-        file_hdr_t fhdr;
-        ifsp->read((char*)&fhdr, sizeof(fhdr));
-        assert(ifsp->good());
-        if (fhdr._rhdr._magic == QLS_FILE_MAGIC)
-        {
-            if (!_rcvdat._fro)
-                _rcvdat._fro = fhdr._fro;
-            std::streamoff foffs = jump_fro ? fhdr._fro : JRNL_SBLK_SIZE_BYTES;
-            ifsp->seekg(foffs);
-        }
-        else
-        {
-            ifsp->close();
-            if (fid == 0) {
-                _rcvdat._jempty = true;
-            }
-            return false;
-        }
-    }
-    return true;
-}
-
-
-void
-jcntl::check_journal_alignment(const uint16_t fid, std::streampos& file_pos/*, rcvdat& rd*/)
-{
-    unsigned sblk_offs = file_pos % JRNL_SBLK_SIZE_BYTES;
-    if (sblk_offs)
-    {
-        {
-            std::ostringstream oss;
-            oss << std::hex << "Bad record alignment found at fid=0x" << fid;
-            oss << " offs=0x" << file_pos << " (likely journal overwrite boundary); " << std::dec;
-            oss << (JRNL_SBLK_SIZE_DBLKS - (sblk_offs/JRNL_DBLK_SIZE_BYTES)) << " filler record(s) required.";
-            this->log(LOG_WARN, _jid, oss.str());
-        }
-        const uint32_t xmagic = QLS_EMPTY_MAGIC;
-        std::ostringstream oss;
-        oss << _jdir.dirname() << "/" /*<< _base_filename*/ << "."; // TODO linear journal name
-        oss << std::hex << std::setfill('0') << std::setw(4) << fid << QLS_JRNL_FILE_EXTENSION;
-        std::ofstream ofsp(oss.str().c_str(),
-                std::ios_base::in | std::ios_base::out | std::ios_base::binary);
-        if (!ofsp.good())
-            throw jexception(jerrno::JERR__FILEIO, oss.str(), "jcntl", "check_journal_alignment");
-        ofsp.seekp(file_pos);
-        void* buff = std::malloc(JRNL_DBLK_SIZE_BYTES);
-        assert(buff != 0);
-        std::memcpy(buff, (const void*)&xmagic, sizeof(xmagic));
-        // Normally, RHM_CLEAN must be set before these fills are done, but this is a recover
-        // situation (i.e. performance is not an issue), and it makes the location of the write
-        // clear should inspection of the file be required.
-        std::memset((char*)buff + sizeof(xmagic), QLS_CLEAN_CHAR, JRNL_DBLK_SIZE_BYTES - sizeof(xmagic));
-
-        while (file_pos % JRNL_SBLK_SIZE_BYTES)
-        {
-            ofsp.write((const char*)buff, JRNL_DBLK_SIZE_BYTES);
-            assert(!ofsp.fail());
-            std::ostringstream oss;
-            oss << std::hex << "Recover phase write: Wrote filler record: fid=0x" << fid << " offs=0x" << file_pos;
-            this->log(LOG_NOTICE, _jid, oss.str());
-            file_pos = ofsp.tellp();
-        }
-        ofsp.close();
-        std::free(buff);
-        this->log(LOG_INFO, _jid, "Bad record alignment fixed.");
-    }
-    _rcvdat._eo = file_pos;
-}
-
 }}

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h Mon Oct 21 21:26:10 2013
@@ -32,16 +32,11 @@ namespace qls_jrnl
 #include <cstddef>
 #include <deque>
 #include <qpid/linearstore/jrnl/LinearFileController.h>
-#include <qpid/linearstore/jrnl/JournalLog.h>
 #include "qpid/linearstore/jrnl/jdir.h"
-//#include "qpid/linearstore/jrnl/fcntl.h"
-//#include "qpid/linearstore/jrnl/lpmgr.h"
-#include "qpid/linearstore/jrnl/rcvdat.h"
+#include "qpid/linearstore/jrnl/RecoveryManager.h"
 #include "qpid/linearstore/jrnl/slock.h"
 #include "qpid/linearstore/jrnl/smutex.h"
-#include "qpid/linearstore/jrnl/rmgr.h"
 #include "qpid/linearstore/jrnl/wmgr.h"
-//#include "qpid/linearstore/jrnl/wrfc.h"
 
 namespace qpid
 {
@@ -60,7 +55,7 @@ namespace qls_jrnl
     * which is used per data block written to the journal, and is used to track its status through
     * the AIO enqueue, read and dequeue process.
     */
-    class jcntl : public JournalLog
+    class jcntl
     {
     protected:
         /**
@@ -82,16 +77,6 @@ namespace qls_jrnl
         jdir _jdir;
 
         /**
-        * \brief Base filename
-        *
-        * This string contains the base filename used for the journal files. The filenames will
-        * start with this base, and have various sections added to it to derive the final file names
-        * that will be written to disk. No file separator characters should be included here, but
-        * all other legal filename characters are valid.
-        */
-//        std::string _base_filename;
-
-        /**
         * \brief Initialized flag
         *
         * This flag starts out set to false, is set to true once this object has been initialized,
@@ -120,21 +105,14 @@ namespace qls_jrnl
         */
         bool _readonly_flag;
 
-        /**
-        * \brief If set, calls stop() if the jouranl write pointer overruns dequeue low water
-        *     marker. If not set, then attempts to write will throw exceptions until the journal
-        *     file low water marker moves to the next journal file.
-        */
-        //bool _autostop;             ///< Autostop flag - stops journal when overrun occurs
-
         // Journal control structures
+        JournalLog& _jrnl_log;                      ///< Ref to Journal Log instance
         LinearFileController _linearFileController; ///< Linear File Controller
         EmptyFilePool* _emptyFilePoolPtr;           ///< Pointer to Empty File Pool for this queue
         enq_map _emap;                              ///< Enqueue map for low water mark management
         txn_map _tmap;                              ///< Transaction map open transactions
-        //rmgr _rmgr;                                 ///< Read page manager which manages AIO
         wmgr _wmgr;                                 ///< Write page manager which manages AIO
-        rcvdat _rcvdat;                             ///< Recovery data used for recovery
+        RecoveryManager _recoveryManager;           ///< Recovery manager used to recover journal state
         smutex _wr_mutex;                           ///< Mutex for journal writes
 
     public:
@@ -150,7 +128,9 @@ namespace qls_jrnl
         * \param jdir The directory which will contain the journal files.
         * \param base_filename The string which will be used to start all journal filenames.
         */
-        jcntl(const std::string& jid, const std::string& jdir/*, const std::string& base_filename*/);
+        jcntl(const std::string& jid,
+              const std::string& jdir,
+              JournalLog& jrnl_log);
 
         /**
         * \brief Destructor.
@@ -158,6 +138,7 @@ namespace qls_jrnl
         virtual ~jcntl();
 
         inline const std::string& id() const { return _jid; }
+
         inline const std::string& jrnl_dir() const { return _jdir.dirname(); }
 
         /**
@@ -191,9 +172,10 @@ namespace qls_jrnl
         *
         * \exception TODO
         */
-        void initialize(/*const uint16_t num_jfiles, const bool auto_expand, const uint16_t ae_max_jfiles,
-                const uint32_t jfsize_sblks,*/EmptyFilePool* efpp, const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks,
-                aio_callback* const cbp);
+        void initialize(EmptyFilePool* efpp,
+                        const uint16_t wcache_num_pages,
+                        const uint32_t wcache_pgsize_sblks,
+                        aio_callback* const cbp);
 
         /**
         * /brief Initialize journal by recovering state from previously written journal.
@@ -294,11 +276,15 @@ namespace qls_jrnl
         *
         * \exception TODO
         */
-        iores enqueue_data_record(const void* const data_buff, const std::size_t tot_data_len,
-                const std::size_t this_data_len, data_tok* dtokp, const bool transient = false);
-
-        iores enqueue_extern_data_record(const std::size_t tot_data_len, data_tok* dtokp,
-                const bool transient = false);
+        iores enqueue_data_record(const void* const data_buff,
+                                  const std::size_t tot_data_len,
+                                  const std::size_t this_data_len,
+                                  data_tok* dtokp,
+                                  const bool transient = false);
+
+        iores enqueue_extern_data_record(const std::size_t tot_data_len,
+                                         data_tok* dtokp,
+                                         const bool transient = false);
 
         /**
         * \brief Enqueue data.
@@ -313,84 +299,17 @@ namespace qls_jrnl
         *
         * \exception TODO
         */
-        iores enqueue_txn_data_record(const void* const data_buff, const std::size_t tot_data_len,
-                const std::size_t this_data_len, data_tok* dtokp, const std::string& xid,
-                const bool transient = false);
-        iores enqueue_extern_txn_data_record(const std::size_t tot_data_len, data_tok* dtokp,
-                const std::string& xid, const bool transient = false);
-
-        /* TODO
-        **
-        * \brief Retrieve details of next record to be read without consuming the record.
-        *
-        * Retrieve information about current read record. A pointer to the data is returned, along
-        * with the data size and available data size. Data is considered "available" when the AIO
-        * operations to fill page-cache pages from disk have returned, and is ready for consumption.
-        *
-        * If <i>dsize_avail</i> &lt; <i>dsize</i>, then not all of the data is available or part of
-        * the data is in non-contiguous memory, and a subsequent call will update both the pointer
-        * and <i>dsize_avail</i> if more pages have returned from AIO.
-        *
-        * The <i>dsize_avail</i> parameter will return the amount of data from this record that is
-        * available in the page cache as contiguous memory, even if it spans page cache boundaries.
-        * However, if a record spans the end of the page cache and continues at the beginning, even
-        * if both parts are ready for consumption, then this must be divided into at least two
-        * get_data_record() operations, as the data is contained in at least two non-contiguous
-        * segments of the page cache.
-        *
-        * Once all the available data for a record is exposed, it can not be read again using
-        * this function. It must be consumed prior to getting the next record. This can be done by
-        * calling discard_data_record() or read_data_record(). However, if parameter
-        * <i>auto_discard</i> is set to <b><i>true</i></b>, then this record will be automatically
-        * consumed when the entire record has become available without having to explicitly call
-        * discard_next_data_record() or read_data_record().
-        *
-        * If the current record is an open transactional record, then it cannot be read until it is
-        * committed. If it is aborted, it can never be read. Under this condition, get_data_record()
-        * will return RHM_IORES_TXPENDING, the data pointer will be set to NULL and all data
-        * lengths will be set to 0.
-        *
-        * Example: Read a record of 30k. Assume a read page cache of 10 pages of size 10k starting
-        * at address base_ptr (page0 = base_ptr, page1 = page_ptr+10k, etc.). The first 15k of
-        * the record falls at the end of the page cache, the remaining 15k folded to the beginning.
-        * The current page (page 8) containing 5k is available, the remaining pages which contain
-        * this record are pending AIO return:
-        * <pre>
-        * call       dsize
-        * no.  dsize avail data ptr     Return   Comment
-        * ----+-----+-----+------------+--------+--------------------------------------------------
-        * 1    30k   5k    base_ptr+85k SUCCESS  Initial call, read first 5k
-        * 2    30k   0k    base_ptr+90k AIO_WAIT AIO still pending; no further pages avail
-        * 3    30k   10k   base_ptr+90k SUCCESS  AIO now returned; now read till end of page cache
-        * 4    30k   15k   base_ptr     SUCCESS  data_ptr now pointing to start of page cache
-        * </pre>
-        *
-        * \param rid Reference that returns the record ID (rid)
-        * \param dsize Reference that returns the total data size of the record data .
-        * \param dsize_avail Reference that returns the amount of the data that is available for
-        *     consumption.
-        * \param data Pointer to data pointer which will point to the first byte of the next record
-        *     data.
-        * \param auto_discard If <b><i>true</i></b>, automatically discard the record being read if
-        *     the entire record is available (i.e. dsize == dsize_avail). Otherwise
-        *     discard_next_data_record() must be explicitly called.
-        *
-        * \exception TODO
-        *
-        // *** NOT YET IMPLEMENTED ***
-        iores get_data_record(const uint64_t& rid, const std::size_t& dsize,
-                const std::size_t& dsize_avail, const void** const data, bool auto_discard = false);
-        */
-
-        /* TODO
-        **
-        * \brief Discard (skip) next record to be read without reading or retrieving it.
-        *
-        * \exception TODO
-        *
-        // *** NOT YET IMPLEMENTED ***
-        iores discard_data_record(data_tok* const dtokp);
-        */
+        iores enqueue_txn_data_record(const void* const data_buff,
+                                      const std::size_t tot_data_len,
+                                      const std::size_t this_data_len,
+                                      data_tok* dtokp,
+                                      const std::string& xid,
+                                      const bool transient = false);
+
+        iores enqueue_extern_txn_data_record(const std::size_t tot_data_len,
+                                             data_tok* dtokp,
+                                             const std::string& xid,
+                                             const bool transient = false);
 
         /**
         * \brief Reads data from the journal. It is the responsibility of the reader to free
@@ -434,9 +353,14 @@ namespace qls_jrnl
         *
         * \exception TODO
         */
-        iores read_data_record(void** const datapp, std::size_t& dsize, void** const xidpp,
-                std::size_t& xidsize, bool& transient, bool& external, data_tok* const dtokp,
-                bool ignore_pending_txns = false);
+        iores read_data_record(void** const datapp,
+                               std::size_t& dsize,
+                               void** const xidpp,
+                               std::size_t& xidsize,
+                               bool& transient,
+                               bool& external,
+                               data_tok* const dtokp,
+                               bool ignore_pending_txns = false);
 
         /**
         * \brief Dequeues (marks as no longer needed) data record in journal.
@@ -455,7 +379,8 @@ namespace qls_jrnl
         *
         * \exception TODO
         */
-        iores dequeue_data_record(data_tok* const dtokp, const bool txn_coml_commit = false);
+        iores dequeue_data_record(data_tok* const dtokp,
+                                  const bool txn_coml_commit = false);
 
         /**
         * \brief Dequeues (marks as no longer needed) data record in journal.
@@ -476,7 +401,9 @@ namespace qls_jrnl
         *
         * \exception TODO
         */
-        iores dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit = false);
+        iores dequeue_txn_data_record(data_tok* const dtokp,
+                                      const std::string& xid,
+                                      const bool txn_coml_commit = false);
 
         /**
         * \brief Abort the transaction for all records enqueued or dequeued with the matching xid.
@@ -491,7 +418,8 @@ namespace qls_jrnl
         *
         * \exception TODO
         */
-        iores txn_abort(data_tok* const dtokp, const std::string& xid);
+        iores txn_abort(data_tok* const dtokp,
+                        const std::string& xid);
 
         /**
         * \brief Commit the transaction for all records enqueued or dequeued with the matching xid.
@@ -506,7 +434,8 @@ namespace qls_jrnl
         *
         * \exception TODO
         */
-        iores txn_commit(data_tok* const dtokp, const std::string& xid);
+        iores txn_commit(data_tok* const dtokp,
+                         const std::string& xid);
 
         /**
         * \brief Check whether all the enqueue records for the given xid have reached disk.
@@ -527,15 +456,6 @@ namespace qls_jrnl
         int32_t get_wr_events(timespec* const timeout);
 
         /**
-        * \brief Forces a check for returned AIO read events.
-        *
-        * Forces a check for returned AIO read events. This is normally performed by read_data()
-        * operations, but if these operations cease, then this call needs to be made to force the
-        * processing of any outstanding AIO operations.
-        */
-//        int32_t get_rd_events(timespec* const timeout);
-
-        /**
         * \brief Stop the journal from accepting any further requests to read or write data.
         *
         * This operation is used to stop the journal. This is the normal mechanism for bringing the
@@ -555,27 +475,14 @@ namespace qls_jrnl
         */
         iores flush(const bool block_till_aio_cmpl = false);
 
-        inline uint32_t get_enq_cnt() const { return _emap.size(); } // TODO: Thread safe?
+        inline uint32_t get_enq_cnt() const { return _emap.size(); } // TODO: _emap: Thread safe?
 
         inline uint32_t get_wr_aio_evt_rem() const { slock l(_wr_mutex); return _wmgr.get_aio_evt_rem(); }
 
-//        inline uint32_t get_rd_aio_evt_rem() const { return _rmgr.get_aio_evt_rem(); }
-
-        inline uint32_t get_wr_outstanding_aio_dblks() const;
-                /*{ return _wrfc.aio_outstanding_dblks(); }*/
+        uint32_t get_wr_outstanding_aio_dblks() const;
 
-//        inline uint32_t get_wr_outstanding_aio_dblks(uint16_t lfid) const;
-//                { return _lpmgr.get_fcntlp(lfid)->wr_aio_outstanding_dblks(); }
+        uint32_t get_rd_outstanding_aio_dblks() const;
 
-        inline uint32_t get_rd_outstanding_aio_dblks() const;
-//                { return _rrfc.aio_outstanding_dblks(); }
-
-//        inline uint32_t get_rd_outstanding_aio_dblks(uint16_t lfid) const;
-//                { return _lpmgr.get_fcntlp(lfid)->rd_aio_outstanding_dblks(); }
-
-//        inline uint16_t get_rd_fid() const { return _rrfc.index(); }
-//        inline uint16_t get_wr_fid() const { return _wrfc.index(); }
-//        uint16_t get_earliest_fid();
         LinearFileController& getLinearFileControllerRef();
 
         /**
@@ -583,13 +490,20 @@ namespace qls_jrnl
         *     false if the rid is transactionally enqueued and is not committed, or if it is
         *     locked (i.e. transactionally dequeued, but the dequeue has not been committed).
         */
-        inline bool is_enqueued(const uint64_t rid, bool ignore_lock = false)
-                { return _emap.is_enqueued(rid, ignore_lock); }
-        inline bool is_locked(const uint64_t rid)
-                { if (_emap.is_enqueued(rid, true) < enq_map::EMAP_OK) return false; return _emap.is_locked(rid) == enq_map::EMAP_TRUE; }
+        inline bool is_enqueued(const uint64_t rid, bool ignore_lock = false) { return _emap.is_enqueued(rid, ignore_lock); }
+
+        inline bool is_locked(const uint64_t rid) {
+            // Report "not locked" when the enqueue-map lookup yields a status below EMAP_OK.
+            const bool lookup_ok = !(_emap.is_enqueued(rid, true) < enq_map::EMAP_OK);
+            return lookup_ok && (_emap.is_locked(rid) == enq_map::EMAP_TRUE);
+        }
+
         inline void enq_rid_list(std::vector<uint64_t>& rids) { _emap.rid_list(rids); }
+
         inline void enq_xid_list(std::vector<std::string>& xids) { _tmap.xid_list(xids); }
+
         inline uint32_t get_open_txn_cnt() const { return _tmap.size(); }
+
         // TODO Make this a const, but txn_map must support const first.
         inline txn_map& get_txn_map() { return _tmap; }
 
@@ -626,41 +540,10 @@ namespace qls_jrnl
         */
         inline const std::string& dirname() const { return _jdir.dirname(); }
 
-        /**
-        * \brief Get the journal base filename.
-        *
-        * Get the journal base filename as set during initialization. This is the prefix used in all
-        * journal files of this instance. Note that if more than one instance of the journal shares
-        * the same directory, their base filenames <b>MUST</b> be different or else the instances
-        * will overwrite one another.
-        */
-//        inline const std::string& base_filename() const { return _base_filename; }
-
-//        inline uint16_t num_jfiles() const; { return _lpmgr.num_jfiles(); }
-
-//        inline fcntl* get_fcntlp(const uint16_t lfid) const { return _lpmgr.get_fcntlp(lfid); }
-
-//        inline uint32_t jfsize_sblks() const { return _jfsize_sblks; }
-
-        // Logging
-//        virtual void log(log_level_t level, const std::string& log_stmt) const;
-//        virtual void log(log_level_t level, const char* const log_stmt) const;
-
-        // FIXME these are _rmgr to _wmgr interactions, remove when _rmgr contains ref to _wmgr:
-        //void chk_wr_frot();
-        inline uint32_t unflushed_dblks() { return _wmgr.unflushed_dblks(); }
-        void fhdr_wr_sync(const uint16_t lid);
-        inline uint32_t wr_subm_cnt_dblks(const uint16_t lfid) const; /*{ return _lpmgr.get_fcntlp(lfid)->wr_subm_cnt_dblks(); }*/
-
         // Management instrumentation callbacks
         inline virtual void instr_incr_outstanding_aio_cnt() {}
         inline virtual void instr_decr_outstanding_aio_cnt() {}
 
-        /**
-        * /brief Static function for creating new fcntl objects for use with obj_arr.
-        */
-//        static fcntl* new_fcntl(jcntl* const jcp, const uint16_t lid, const uint16_t fid, const rcvdat* const rdp);
-
     protected:
         static bool _init;
         static bool init_statics();
@@ -676,11 +559,6 @@ namespace qls_jrnl
         void check_rstatus(const char* fn_name) const;
 
         /**
-        * \brief Write info file &lt;basefilename&gt;.jinf to disk
-        */
-//        void write_infofile() const;
-
-        /**
         * \brief Call that blocks while waiting for all outstanding AIOs to complete
         */
         void aio_cmpl_wait();
@@ -690,27 +568,6 @@ namespace qls_jrnl
         *     AIO wait conditions to clear.
         */
         bool handle_aio_wait(const iores res, iores& resout, const data_tok* dtp);
-
-        /**
-        * \brief Analyze journal for recovery.
-        */
-        static void rcvr_read_jfile(const std::string& jfn, ::file_hdr_t* fh, std::string& queueName);
-
-        void rcvr_analyze_fhdrs(EmptyFilePoolManager* efpmp);
-
-        void rcvr_janalyze(const std::vector<std::string>* prep_txn_list_ptr, EmptyFilePoolManager* efpmp);
-
-        bool rcvr_get_next_record(uint16_t& fid, std::ifstream* ifsp/*, bool& lowi, rcvdat& rd*/);
-
-        bool decode(jrec& rec, uint16_t& fid, std::ifstream* ifsp, std::size_t& cum_size_read,
-                rec_hdr_t& h, /*bool& lowi, rcvdat& rd,*/ std::streampos& rec_offset);
-
-        bool jfile_cycle(uint16_t& fid, std::ifstream* ifsp, /*bool& lowi, rcvdat& rd,*/ const bool jump_fro);
-
-        //bool check_owi(const uint16_t fid, rec_hdr_t& h, bool& lowi, rcvdat& rd,
-        //        std::streampos& read_pos);
-
-        void check_journal_alignment(const uint16_t fid, std::streampos& rec_offset/*, rcvdat& rd*/);
     };
 
 }}

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp Mon Oct 21 21:26:10 2013
@@ -50,8 +50,6 @@ const uint32_t jerrno::JERR_JCNTL_READON
 const uint32_t jerrno::JERR_JCNTL_AIOCMPLWAIT    = 0x0202;
 const uint32_t jerrno::JERR_JCNTL_UNKNOWNMAGIC   = 0x0203;
 const uint32_t jerrno::JERR_JCNTL_NOTRECOVERED   = 0x0204;
-const uint32_t jerrno::JERR_JCNTL_OPENRD         = 0x0205;
-const uint32_t jerrno::JERR_JCNTL_READ           = 0x0206;
 const uint32_t jerrno::JERR_JCNTL_ENQSTATE       = 0x0207;
 const uint32_t jerrno::JERR_JCNTL_INVALIDENQHDR  = 0x0208;
 
@@ -90,6 +88,11 @@ const uint32_t jerrno::JERR_WMGR_DEQDISC
 const uint32_t jerrno::JERR_WMGR_DEQRIDNOTENQ    = 0x0805;
 const uint32_t jerrno::JERR_WMGR_BADFH           = 0x0806;
 
+// class RecoveryManager
+const uint32_t jerrno::JERR_RCVM_OPENRD          = 0x0900;
+const uint32_t jerrno::JERR_RCVM_READ            = 0x0901;
+const uint32_t jerrno::JERR_RCVM_WRITE           = 0x0902;
+
 //// class rmgr
 //const uint32_t jerrno::JERR_RMGR_UNKNOWNMAGIC    = 0x0900;
 //const uint32_t jerrno::JERR_RMGR_RIDMISMATCH     = 0x0901;
@@ -143,8 +146,6 @@ jerrno::__init()
     _err_map[JERR_JCNTL_AIOCMPLWAIT] = "JERR_JCNTL_AIOCMPLWAIT: Timeout waiting for AIOs to complete.";
     _err_map[JERR_JCNTL_UNKNOWNMAGIC] = "JERR_JCNTL_UNKNOWNMAGIC: Found record with unknown magic.";
     _err_map[JERR_JCNTL_NOTRECOVERED] = "JERR_JCNTL_NOTRECOVERED: Operation requires recover() to be run first.";
-    _err_map[JERR_JCNTL_OPENRD] = "JERR_JCNTL_OPENRD: Unable to open file for write";
-    _err_map[JERR_JCNTL_READ] = "JERR_JCNTL_READ: Read error: no or insufficient data to read";
     _err_map[JERR_JCNTL_ENQSTATE] = "JERR_JCNTL_ENQSTATE: Read error: Record not in ENQ state";
     _err_map[JERR_JCNTL_INVALIDENQHDR] = "JERR_JCNTL_INVALIDENQHDR: Invalid ENQ header";
 
@@ -182,6 +183,10 @@ jerrno::__init()
     _err_map[JERR_WMGR_DEQRIDNOTENQ] = "JERR_WMGR_DEQRIDNOTENQ: Dequeue rid is not enqueued.";
     _err_map[JERR_WMGR_BADFH] = "JERR_WMGR_BADFH: Bad file handle.";
 
+    // class RecoveryManager (each message is prefixed with the name of its own constant)
+    _err_map[JERR_RCVM_OPENRD] = "JERR_RCVM_OPENRD: Unable to open file for read";
+    _err_map[JERR_RCVM_READ] = "JERR_RCVM_READ: Read error: no or insufficient data to read";
+    _err_map[JERR_RCVM_WRITE] = "JERR_RCVM_WRITE: Write error";
 //    // class rmgr
 //    _err_map[JERR_RMGR_UNKNOWNMAGIC] = "JERR_RMGR_UNKNOWNMAGIC: Found record with unknown magic.";
 //    _err_map[JERR_RMGR_RIDMISMATCH] = "JERR_RMGR_RIDMISMATCH: RID mismatch between current record and dtok RID";

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h Mon Oct 21 21:26:10 2013
@@ -69,8 +69,6 @@ namespace qls_jrnl
         static const uint32_t JERR_JCNTL_AIOCMPLWAIT;  ///< Timeout waiting for AIOs to complete
         static const uint32_t JERR_JCNTL_UNKNOWNMAGIC; ///< Found record with unknown magic
         static const uint32_t JERR_JCNTL_NOTRECOVERED; ///< Req' recover() to be called first
-        static const uint32_t JERR_JCNTL_OPENRD;       ///< Unable to open file for read
-        static const uint32_t JERR_JCNTL_READ;         ///< Read error: no or insufficient data to read
         static const uint32_t JERR_JCNTL_ENQSTATE;     ///< Read error: Record not in ENQ state
         static const uint32_t JERR_JCNTL_INVALIDENQHDR;///< Invalid ENQ header
 
@@ -108,6 +106,11 @@ namespace qls_jrnl
         static const uint32_t JERR_WMGR_DEQRIDNOTENQ;  ///< Deq. rid not enqueued
         static const uint32_t JERR_WMGR_BADFH;         ///< Bad file handle
 
+        // class RecoveryManager
+        static const uint32_t JERR_RCVM_OPENRD;       ///< Unable to open file for read
+        static const uint32_t JERR_RCVM_READ;         ///< Read error: no or insufficient data to read
+        static const uint32_t JERR_RCVM_WRITE;          ///< Write error
+
 //        // class rmgr
 //        static const uint32_t JERR_RMGR_UNKNOWNMAGIC;  ///< Found record with unknown magic
 //        static const uint32_t JERR_RMGR_RIDMISMATCH;   ///< RID mismatch between rec and dtok

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h Mon Oct 21 21:26:10 2013
@@ -148,9 +148,9 @@ namespace qls_jrnl
         virtual std::size_t rec_size() const = 0;
         inline virtual uint32_t rec_size_dblks() const { return size_dblks(rec_size()); }
         static inline uint32_t size_dblks(const std::size_t size)
-                { return size_blks(size, JRNL_DBLK_SIZE_BYTES); }
+                { return size_blks(size, QLS_DBLK_SIZE_BYTES); }
         static inline uint32_t size_sblks(const std::size_t size)
-                { return size_blks(size, JRNL_SBLK_SIZE_BYTES); }
+                { return size_blks(size, QLS_SBLK_SIZE_BYTES); }
         static inline uint32_t size_blks(const std::size_t size, const std::size_t blksize)
                 { return (size + blksize - 1)/blksize; }
         virtual uint64_t rid() const = 0;

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp Mon Oct 21 21:26:10 2013
@@ -41,12 +41,11 @@ pmgr::page_cb::page_cb(uint16_t index):
         _wdblks(0),
         _rdblks(0),
         _pdtokl(0),
-//        _wfh(0),
-//        _rfh(0),
         _jfp(0),
         _pbuff(0)
 {}
 
+// TODO: almost identical to pmgr::page_state_str() below - resolve
 const char*
 pmgr::page_cb::state_str() const
 {
@@ -58,14 +57,12 @@ pmgr::page_cb::state_str() const
             return "IN_USE";
         case AIO_PENDING:
             return "AIO_PENDING";
-        case AIO_COMPLETE:
-            return "AIO_COMPLETE";
     }
     return "<unknown>";
 }
 
 // static
-const uint32_t pmgr::_sblkSizeBytes = JRNL_SBLK_SIZE_BYTES;
+const uint32_t pmgr::_sblkSizeBytes = QLS_SBLK_SIZE_BYTES;
 
 pmgr::pmgr(jcntl* jc, enq_map& emap, txn_map& tmap):
         _cache_pgsize_sblks(0),
@@ -109,11 +106,11 @@ pmgr::initialize(aio_callback* const cbp
 
     // 1. Allocate page memory (as a single block)
     std::size_t cache_pgsize = _cache_num_pages * _cache_pgsize_sblks * _sblkSizeBytes;
-    if (::posix_memalign(&_page_base_ptr, QLS_AIO_ALIGN_BOUNDARY, cache_pgsize))
+    if (::posix_memalign(&_page_base_ptr, QLS_AIO_ALIGN_BOUNDARY_BYTES, cache_pgsize))
     {
         clean();
         std::ostringstream oss;
-        oss << "posix_memalign(): alignment=" << QLS_AIO_ALIGN_BOUNDARY << " size=" << cache_pgsize;
+        oss << "posix_memalign(): alignment=" << QLS_AIO_ALIGN_BOUNDARY_BYTES << " size=" << cache_pgsize;
         oss << FORMAT_SYSERR(errno);
         throw jexception(jerrno::JERR__MALLOC, oss.str(), "pmgr", "initialize");
     }
@@ -186,6 +183,7 @@ pmgr::clean()
     _aio_event_arr = 0;
 }
 
+// TODO: almost identical to pmgr::page_cb::state_str() above - resolve
 const char*
 pmgr::page_state_str(page_state ps)
 {
@@ -197,8 +195,6 @@ pmgr::page_state_str(page_state ps)
             return "IN_USE";
         case AIO_PENDING:
             return "AIO_PENDING";
-        case AIO_COMPLETE:
-            return "AIO_COMPLETE";
     }
     return "<page_state unknown>";
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org