You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/10/07 20:39:25 UTC

svn commit: r1530024 [4/4] - in /qpid/branches/linearstore/qpid/cpp/src: ./ qpid/linearstore/ qpid/linearstore/jrnl/ qpid/linearstore/jrnl/utils/ tests/linearstore/

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h Mon Oct  7 18:39:24 2013
@@ -89,7 +89,7 @@ typedef struct file_hdr_t {
 
 void file_hdr_create(file_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t fhdr_size_sblks,
                      const uint16_t efp_partition, const uint64_t file_size);
-int file_hdr_init(file_hdr_t* dest, const uint16_t uflag, const uint64_t rid, const uint64_t fro,
+int file_hdr_init(void* dest, const uint64_t dest_len, const uint16_t uflag, const uint64_t rid, const uint64_t fro,
                   const uint64_t file_number, const uint16_t queue_name_len, const char* queue_name);
 void file_hdr_reset(file_hdr_t* target);
 int is_file_hdr_reset(file_hdr_t* target);

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp Mon Oct  7 18:39:24 2013
@@ -28,26 +28,23 @@
 #include "qpid/linearstore/jrnl/utils/file_hdr.h"
 #include "qpid/linearstore/jrnl/jcntl.h"
 #include "qpid/linearstore/jrnl/jerrno.h"
+#include "qpid/linearstore/jrnl/JournalFile.h"
 #include <sstream>
 #include <stdint.h>
 
+//#include <iostream> // DEBUG
+
 namespace qpid
 {
 namespace qls_jrnl
 {
 
-wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap/*, wrfc& wrfc*/):
+wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, LinearFileController& lfc):
         pmgr(jc, emap, tmap),
-//        _wrfc(wrfc),
+        _lfc(lfc),
         _max_dtokpp(0),
         _max_io_wait_us(0),
-        _fhdr_base_ptr(0),
-        _fhdr_ptr_arr(0),
-        _fhdr_aio_cb_arr(0),
         _cached_offset_dblks(0),
-//        _jfsize_dblks(0),
-//        _jfsize_pgs(0),
-//        _num_jfiles(0),
         _enq_busy(false),
         _deq_busy(false),
         _abort_busy(false),
@@ -55,19 +52,12 @@ wmgr::wmgr(jcntl* jc, enq_map& emap, txn
         _txn_pending_set()
 {}
 
-wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap/*, wrfc& wrfc*/,
-        const uint32_t max_dtokpp, const uint32_t max_iowait_us):
+wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, LinearFileController& lfc, const uint32_t max_dtokpp, const uint32_t max_iowait_us):
         pmgr(jc, emap, tmap /* , dtoklp */),
-//        _wrfc(wrfc),
+        _lfc(lfc),
         _max_dtokpp(max_dtokpp),
         _max_io_wait_us(max_iowait_us),
-        _fhdr_base_ptr(0),
-        _fhdr_ptr_arr(0),
-        _fhdr_aio_cb_arr(0),
         _cached_offset_dblks(0),
-//        _jfsize_dblks(0),
-//        _jfsize_pgs(0),
-//        _num_jfiles(0),
         _enq_busy(false),
         _deq_busy(false),
         _abort_busy(false),
@@ -94,14 +84,10 @@ wmgr::initialize(aio_callback* const cbp
 
     initialize(cbp, wcache_pgsize_sblks, wcache_num_pages);
 
-//    _jfsize_dblks = _jc->jfsize_sblks() * JRNL_SBLK_SIZE_DBLKS;
-//    _jfsize_pgs = _jc->jfsize_sblks() / _cache_pgsize_sblks;
-//    assert(_jc->jfsize_sblks() % JRNL_RMGR_PAGE_SIZE == 0);
-
     if (eo)
     {
         const uint32_t wr_pg_size_dblks = _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS;
-        uint32_t data_dblks = (eo / JRNL_DBLK_SIZE) - 4; // 4 dblks for file hdr
+        uint32_t data_dblks = (eo / JRNL_DBLK_SIZE_BYTES) - 4; // 4 dblks for file hdr
         _pg_cntr = data_dblks / wr_pg_size_dblks;
         _pg_offset_dblks = data_dblks - (_pg_cntr * wr_pg_size_dblks);
     }
@@ -138,7 +124,7 @@ wmgr::enqueue(const void* const data_buf
         }
     }
 
-    uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : /*_wrfc.get_incr_rid()*/0; // TODO: replace for linearstore: _wrfc
+    uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : _lfc.getNextRecordId();/*_wrfc.get_incr_rid()*/
     _enq_rec.reset(rid, data_buff, tot_data_len, xid_ptr, xid_len/*, _wrfc.owi()*/, transient,
             external);
     if (!cont)
@@ -155,15 +141,15 @@ wmgr::enqueue(const void* const data_buf
     while (!done)
     {
         assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS);
-        void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE);
+        void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE_BYTES);
         uint32_t data_offs_dblks = dtokp->dblocks_written();
         uint32_t ret = _enq_rec.encode(wptr, data_offs_dblks,
                 (_cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) - _pg_offset_dblks);
 
         // Remember fid which contains the record header in case record is split over several files
-        // TODO: replace for linearstore: _wrfc
-//        if (data_offs_dblks == 0)
-//            dtokp->set_fid(_wrfc.index());
+        if (data_offs_dblks == 0) {
+            dtokp->set_fid(_lfc.getCurrentFileSeqNum());
+        }
         _pg_offset_dblks += ret;
         _cached_offset_dblks += ret;
         dtokp->incr_dblocks_written(ret);
@@ -180,7 +166,7 @@ wmgr::enqueue(const void* const data_buf
             // long multi-page messages have their token on the page containing the END of the
             // message. AIO callbacks will then only process this token when entire message is
             // enqueued.
-            //_wrfc.incr_enqcnt(dtokp->fid()); // TODO: replace for linearstore: _wrfc
+            _lfc.incrEnqueuedRecordCount();
 
             if (xid_len) // If part of transaction, add to transaction map
             {
@@ -189,7 +175,7 @@ wmgr::enqueue(const void* const data_buf
             }
             else
             {
-                if (_emap.insert_pfid(rid, dtokp->fid()) < enq_map::EMAP_OK) // fail
+                if (_emap.insert_pfid(rid, dtokp->fid(), 0) < enq_map::EMAP_OK) // fail
                 {
                     // The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID.
                     std::ostringstream oss;
@@ -260,15 +246,15 @@ wmgr::dequeue(data_tok* dtokp, const voi
     while (!done)
     {
         assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS);
-        void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE);
+        void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE_BYTES);
         uint32_t data_offs_dblks = dtokp->dblocks_written();
         uint32_t ret = _deq_rec.encode(wptr, data_offs_dblks,
                 (_cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) - _pg_offset_dblks);
 
         // Remember fid which contains the record header in case record is split over several files
-        // TODO: replace for linearstore: _wrfc
-//        if (data_offs_dblks == 0)
-//            dtokp->set_fid(_wrfc.index());
+        if (data_offs_dblks == 0) {
+            dtokp->set_fid(_lfc.getCurrentFileSeqNum());
+        }
         _pg_offset_dblks += ret;
         _cached_offset_dblks += ret;
         dtokp->incr_dblocks_written(ret);
@@ -290,23 +276,24 @@ wmgr::dequeue(data_tok* dtokp, const voi
             }
             else
             {
-                int16_t fid = _emap.get_remove_pfid(dtokp->dequeue_rid());
-                if (fid < enq_map::EMAP_OK) // fail
+                int16_t fid;
+                short eres = _emap.get_remove_pfid(dtokp->dequeue_rid(), fid);
+                if (eres < enq_map::EMAP_OK) // fail
                 {
-                    if (fid == enq_map::EMAP_RID_NOT_FOUND)
+                    if (eres == enq_map::EMAP_RID_NOT_FOUND)
                     {
                         std::ostringstream oss;
                         oss << std::hex << "rid=0x" << rid;
                         throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "dequeue");
                     }
-                    if (fid == enq_map::EMAP_LOCKED)
+                    if (eres == enq_map::EMAP_LOCKED)
                     {
                         std::ostringstream oss;
                         oss << std::hex << "rid=0x" << rid;
                         throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "dequeue");
                     }
                 }
-//                _wrfc.decr_enqcnt(fid); // TODO: replace for linearstore: _wrfc
+                _lfc.decrEnqueuedRecordCount();
             }
 
             done = true;
@@ -348,7 +335,7 @@ wmgr::abort(data_tok* dtokp, const void*
         }
     }
 
-    uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() :/* _wrfc.get_incr_rid()*/0; // TODO: replace for linearstore: _wrfc
+    uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : _lfc.getNextRecordId();
     _txn_rec.reset(QLS_TXA_MAGIC, rid, xid_ptr, xid_len/*, _wrfc.owi()*/);
     if (!cont)
     {
@@ -362,15 +349,14 @@ wmgr::abort(data_tok* dtokp, const void*
     while (!done)
     {
         assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS);
-        void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE);
+        void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE_BYTES);
         uint32_t data_offs_dblks = dtokp->dblocks_written();
         uint32_t ret = _txn_rec.encode(wptr, data_offs_dblks,
                 (_cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) - _pg_offset_dblks);
 
         // Remember fid which contains the record header in case record is split over several files
-        // TODO: replace for linearstore: _wrfc
-//        if (data_offs_dblks == 0)
-//            dtokp->set_fid(_wrfc.index());
+        if (data_offs_dblks == 0)
+            dtokp->set_fid(_lfc.getCurrentFileSeqNum());
         _pg_offset_dblks += ret;
         _cached_offset_dblks += ret;
         dtokp->incr_dblocks_written(ret);
@@ -389,9 +375,8 @@ wmgr::abort(data_tok* dtokp, const void*
             {
 				if (!itr->_enq_flag)
 				    _emap.unlock(itr->_drid); // ignore rid not found error
-				 // TODO: replace for linearstore: _wrfc
-//                if (itr->_enq_flag)
-//                    _wrfc.decr_enqcnt(itr->_pfid);
+                if (itr->_enq_flag)
+                    _lfc.decrEnqueuedRecordCount(itr->_pfid);
             }
             std::pair<std::set<std::string>::iterator, bool> res = _txn_pending_set.insert(xid);
             if (!res.second)
@@ -440,7 +425,7 @@ wmgr::commit(data_tok* dtokp, const void
         }
     }
 
-    uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : /*_wrfc.get_incr_rid()*/0;  // TODO: replace for linearstore: _wrfc
+    uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : _lfc.getNextRecordId();
     _txn_rec.reset(QLS_TXC_MAGIC, rid, xid_ptr, xid_len/*, _wrfc.owi()*/);
     if (!cont)
     {
@@ -454,15 +439,14 @@ wmgr::commit(data_tok* dtokp, const void
     while (!done)
     {
         assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS);
-        void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE);
+        void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE_BYTES);
         uint32_t data_offs_dblks = dtokp->dblocks_written();
         uint32_t ret = _txn_rec.encode(wptr, data_offs_dblks,
                 (_cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) - _pg_offset_dblks);
 
         // Remember fid which contains the record header in case record is split over several files
-        // TODO: replace for linearstore: _wrfc
-//        if (data_offs_dblks == 0)
-//            dtokp->set_fid(_wrfc.index());
+        if (data_offs_dblks == 0)
+            dtokp->set_fid(_lfc.getCurrentFileSeqNum());
         _pg_offset_dblks += ret;
         _cached_offset_dblks += ret;
         dtokp->incr_dblocks_written(ret);
@@ -481,7 +465,7 @@ wmgr::commit(data_tok* dtokp, const void
             {
                 if (itr->_enq_flag) // txn enqueue
                 {
-                    if (_emap.insert_pfid(itr->_rid, itr->_pfid) < enq_map::EMAP_OK) // fail
+                    if (_emap.insert_pfid(itr->_rid, itr->_pfid, 0) < enq_map::EMAP_OK) // fail
                     {
                         // The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID.
                         std::ostringstream oss;
@@ -491,23 +475,24 @@ wmgr::commit(data_tok* dtokp, const void
                 }
                 else // txn dequeue
                 {
-                    int16_t fid = _emap.get_remove_pfid(itr->_drid, true);
-                    if (fid < enq_map::EMAP_OK) // fail
+                    int16_t fid;
+                    short eres = _emap.get_remove_pfid(itr->_drid, fid, true);
+                    if (eres < enq_map::EMAP_OK) // fail
                     {
-                        if (fid == enq_map::EMAP_RID_NOT_FOUND)
+                        if (eres == enq_map::EMAP_RID_NOT_FOUND)
                         {
                             std::ostringstream oss;
                             oss << std::hex << "rid=0x" << rid;
                             throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "dequeue");
                         }
-                        if (fid == enq_map::EMAP_LOCKED)
+                        if (eres == enq_map::EMAP_LOCKED)
                         {
                             std::ostringstream oss;
                             oss << std::hex << "rid=0x" << rid;
                             throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "dequeue");
                         }
                     }
-//                    _wrfc.decr_enqcnt(fid); // TODO: replace for linearstore: _wrfc
+                    _lfc.decrEnqueuedRecordCount(fid);
                 }
             }
             std::pair<std::set<std::string>::iterator, bool> res = _txn_pending_set.insert(xid);
@@ -532,30 +517,26 @@ wmgr::commit(data_tok* dtokp, const void
 }
 
 void
-wmgr::file_header_check(const uint64_t /*rid*/, const bool /*cont*/, const uint32_t /*rec_dblks_rem*/)
+wmgr::file_header_check(const uint64_t rid, const bool cont, const uint32_t rec_dblks_rem)
 {
-    // Has the file header been written (i.e. write pointers still at 0)?
-    // TODO: replace for linearstore: _wrfc
-/*
-    if (_wrfc.is_void()) // TODO: replace for linearstore: _wrfc
+    if (_lfc.isEmpty()) // File never written (i.e. no header or data)
     {
-        bool file_fit = rec_dblks_rem <= _jfsize_dblks;
-        bool file_full = rec_dblks_rem == _jfsize_dblks;
         std::size_t fro = 0;
-        if (cont)
-        {
-            if (file_fit && !file_full)
-                fro = (rec_dblks_rem + JRNL_SBLK_SIZE_DBLKS) * JRNL_DBLK_SIZE;
+        if (cont) {
+            bool file_fit = rec_dblks_rem <= _lfc.dataSize_sblks() * JRNL_SBLK_SIZE_DBLKS; // Will fit within this journal file
+            bool file_full = rec_dblks_rem == _lfc.dataSize_sblks() * JRNL_SBLK_SIZE_DBLKS; // Will exactly fill this journal file
+            if (file_fit && !file_full) {
+                fro = (rec_dblks_rem + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_DBLKS)) * JRNL_DBLK_SIZE_BYTES;
+            }
+        } else {
+            fro = QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_BYTES;
         }
-        else
-            fro = JRNL_SBLK_SIZE;
-        write_fhdr(rid, _wrfc.index(), _wrfc.index(), fro); // TODO: replace for linearstore: _wrfc
+        _lfc.asyncFileHeaderWrite(_ioctx, 0, rid, fro);
     }
-*/
 }
 
 void
-wmgr::flush_check(iores& res, bool& /*cont*/, bool& done)
+wmgr::flush_check(iores& res, bool& cont, bool& done)
 {
     // Is page is full, flush
     if (_pg_offset_dblks >= _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS)
@@ -569,22 +550,15 @@ wmgr::flush_check(iores& res, bool& /*co
             done = true;
         }
 
-/*
         // If file is full, rotate to next file
-        if (_pg_cntr >= _jfsize_pgs)
+        uint32_t fileSize_pgs = _lfc.fileSize_sblks() / _cache_pgsize_sblks;
+        if (_pg_cntr >= fileSize_pgs)
         {
-            iores rfres = rotate_file();
-            if (rfres != RHM_IORES_SUCCESS)
-                res = rfres;
-            if (!done)
-            {
-                if (rfres == RHM_IORES_SUCCESS)
-                    cont = true;
-                else
-                    done = true;
+            get_next_file();
+            if (!done) {
+                cont = true;
             }
         }
-*/
     }
 }
 
@@ -592,14 +566,10 @@ iores
 wmgr::flush()
 {
     iores res = write_flush();
-/*
-    if (_pg_cntr >= _jfsize_pgs)
-    {
-        iores rfres = rotate_file();
-        if (rfres != RHM_IORES_SUCCESS)
-            res = rfres;
+    uint32_t fileSize_pgs = _lfc.fileSize_sblks() / _cache_pgsize_sblks;
+    if (res == RHM_IORES_SUCCESS && _pg_cntr >= fileSize_pgs) {
+        get_next_file();
     }
-*/
     return res;
 }
 
@@ -607,7 +577,6 @@ iores
 wmgr::write_flush()
 {
     iores res = RHM_IORES_SUCCESS;
-/*
     // Don't bother flushing an empty page or one that is still in state AIO_PENDING
     if (_cached_offset_dblks)
     {
@@ -629,18 +598,9 @@ wmgr::write_flush()
             // if necessary.
             dblk_roundup();
 
-            std::size_t pg_offs = (_pg_offset_dblks - _cached_offset_dblks) * JRNL_DBLK_SIZE;
+            std::size_t pg_offs = (_pg_offset_dblks - _cached_offset_dblks) * JRNL_DBLK_SIZE_BYTES;
             aio_cb* aiocbp = &_aio_cb_arr[_pg_index];
-            aio::prep_pwrite_2(aiocbp, _wrfc.fh(),
-                (char*)_page_ptr_arr[_pg_index] + pg_offs, _cached_offset_dblks * JRNL_DBLK_SIZE,
-                _wrfc.subm_offs());
-            page_cb* pcbp = (page_cb*)(aiocbp->data); // This page control block (pcb)
-            pcbp->_wdblks = _cached_offset_dblks;
-            pcbp->_wfh = _wrfc.file_controller();
-            if (aio::submit(_ioctx, 1, &aiocbp) < 0)
-                throw jexception(jerrno::JERR__AIO, "wmgr", "write_flush");
-            _wrfc.add_subm_cnt_dblks(_cached_offset_dblks);
-            _wrfc.incr_aio_cnt();
+            _lfc.asyncPageWrite(_ioctx, aiocbp, (char*)_page_ptr_arr[_pg_index] + pg_offs, _cached_offset_dblks);
             _aio_evt_rem++;
             _cached_offset_dblks = 0;
             _jc->instr_incr_outstanding_aio_cnt();
@@ -653,21 +613,14 @@ wmgr::write_flush()
     get_events(UNUSED, 0);
     if (_page_cb_arr[_pg_index]._state == UNUSED)
         _page_cb_arr[_pg_index]._state = IN_USE;
-*/
     return res;
 }
 
-iores
-wmgr::rotate_file()
+void
+wmgr::get_next_file()
 {
-    // TODO: replace for linearstore: _wrfc
-/*
     _pg_cntr = 0;
-    iores res = _wrfc.rotate();
-    _jc->chk_wr_frot();
-    return res;
-*/
-    return RHM_IORES_SUCCESS;
+    _lfc.pullEmptyFileFromEfp();
 }
 
 int32_t
@@ -702,17 +655,15 @@ wmgr::get_events(page_state state, times
         aio_cb* aiocbp = _aio_event_arr[i].obj; // This I/O control block (iocb)
         page_cb* pcbp = (page_cb*)(aiocbp->data); // This page control block (pcb)
         long aioret = (long)_aio_event_arr[i].res;
-        if (aioret < 0)
-        {
+        if (aioret < 0) {
             std::ostringstream oss;
             oss << "AIO write operation failed: " << std::strerror(-aioret) << " (" << aioret << ") [";
-            if (pcbp)
+            if (pcbp) {
                 oss << "pg=" << pcbp->_index;
-            else
-            {
-                // TODO: replace for linearstore: fhp->_pfid
-//                file_hdr_t* fhp = (file_hdr_t*)aiocbp->u.c.buf;
-//                oss << "fid=" << fhp->_pfid;
+            } else {
+                file_hdr_t* fhp = (file_hdr_t*)aiocbp->u.c.buf;
+                oss << "fnum=" << fhp->_file_number;
+                oss << " qname=" << std::string((char*)fhp + sizeof(file_hdr_t), fhp->_queue_name_len);
             }
             oss << " size=" << aiocbp->u.c.nbytes;
             oss << " offset=" << aiocbp->u.c.offset << " fh=" << aiocbp->aio_fildes << "]";
@@ -790,16 +741,15 @@ wmgr::get_events(page_state state, times
                         oss << "dtok_id=" << dtokp->id() << " dtok_state=" << dtokp->wstate_str();
                         throw jexception(jerrno::JERR_WMGR_BADDTOKSTATE, oss.str(), "wmgr",
                                 "get_events");
-                    } // switch
-                } // if
-            } // for
+                    }
+                }
+            }
 
             // Increment the completed write offset
             // NOTE: We cannot use _wrfc here, as it may have rotated since submitting count.
             // Use stored pointer to fcntl in the pcb instead.
-            // TODO: replace for linearstore:  pcbp->_wfh
-//            pcbp->_wfh->add_wr_cmpl_cnt_dblks(pcbp->_wdblks);
-//            pcbp->_wfh->decr_aio_cnt();
+            pcbp->_jfp->addCompletedDblkCount(pcbp->_wdblks);
+            pcbp->_jfp->decrOutstandingAioOperationCount();
             _jc->instr_decr_outstanding_aio_cnt();
 
             // Clean up this pcb's data_tok list
@@ -812,16 +762,10 @@ wmgr::get_events(page_state state, times
         }
         else // File header writes have no pcb
         {
-            // get lfid from original file header record, update info for that lfid
-            // TODO: replace for linearstore: lfid
-/*
-            file_hdr_t* fhp = (file_hdr*)aiocbp->u.c.buf;
-            uint32_t lfid = fhp->_lfid;
-            fcntl* fcntlp = _jc->get_fcntlp(lfid);
-            fcntlp->add_wr_cmpl_cnt_dblks(JRNL_SBLK_SIZE_DBLKS);
-            fcntlp->decr_aio_cnt();
-            fcntlp->set_wr_fhdr_aio_outstanding(false);
-*/
+            file_hdr_t* fhp = (file_hdr_t*)aiocbp->u.c.buf;
+            _lfc.addWriteCompletedDblkCount(fhp->_file_number, QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_DBLKS);
+            _lfc.decrOutstandingAioOperationCount(fhp->_file_number);
+            //fcntlp->set_wr_fhdr_aio_outstanding(false); // TODO: Do we need this?
         }
     }
 
@@ -840,35 +784,15 @@ wmgr::is_txn_synced(const std::string& x
 }
 
 void
-wmgr::initialize(aio_callback* const /*cbp*/, const uint32_t /*wcache_pgsize_sblks*/, const uint16_t /*wcache_num_pages*/)
+wmgr::initialize(aio_callback* const cbp, const uint32_t wcache_pgsize_sblks, const uint16_t wcache_num_pages)
 {
-/*
+
     pmgr::initialize(cbp, wcache_pgsize_sblks, wcache_num_pages);
     wmgr::clean();
-    _num_jfiles = _jc->num_jfiles(); // TODO: replace for linearstore: _jc->num_jfiles()
-    if (::posix_memalign(&_fhdr_base_ptr, _sblksize, _sblksize * _num_jfiles))
-    {
-        wmgr::clean();
-        std::ostringstream oss;
-        oss << "posix_memalign(): blksize=" << _sblksize << " size=" << _sblksize;
-        oss << FORMAT_SYSERR(errno);
-        throw jexception(jerrno::JERR__MALLOC, oss.str(), "wmgr", "initialize");
-    }
-    _fhdr_ptr_arr = (void**)std::malloc(_num_jfiles * sizeof(void*));
-    MALLOC_CHK(_fhdr_ptr_arr, "_fhdr_ptr_arr", "wmgr", "initialize");
-    _fhdr_aio_cb_arr = (aio_cb**)std::malloc(sizeof(aio_cb*) * _num_jfiles);
-    MALLOC_CHK(_fhdr_aio_cb_arr, "_fhdr_aio_cb_arr", "wmgr", "initialize");
-    std::memset(_fhdr_aio_cb_arr, 0, sizeof(aio_cb*) * _num_jfiles);
-    for (uint16_t i=0; i<_num_jfiles; i++)
-    {
-        _fhdr_ptr_arr[i] = (void*)((char*)_fhdr_base_ptr + _sblksize * i);
-        _fhdr_aio_cb_arr[i] = new aio_cb;
-    }
     _page_cb_arr[0]._state = IN_USE;
     _ddtokl.clear();
     _cached_offset_dblks = 0;
     _enq_busy = false;
-*/
 }
 
 iores
@@ -877,7 +801,7 @@ wmgr::pre_write_check(const _op_type op,
         ) const
 {
     // Check status of current file
-    // TODO: replace for linearstore: _wrfc
+    // TODO: Replace for LFC
 /*
     if (!_wrfc.is_wr_reset())
     {
@@ -907,13 +831,6 @@ wmgr::pre_write_check(const _op_type op,
     {
         case WMGR_ENQUEUE:
             {
-                // Check for enqueue reaching cutoff threshold
-                // TODO: replace for linearstore: _wrfc
-/*
-                uint32_t size_dblks = jrec::size_dblks(enq_rec::rec_size(xidsize, dsize, external));
-                if (!_enq_busy && _wrfc.enq_threshold(_cached_offset_dblks + size_dblks))
-                    return RHM_IORES_ENQCAPTHRESH;
-*/
                 if (!dtokp->is_writable())
                 {
                     std::ostringstream oss;
@@ -948,25 +865,22 @@ wmgr::dequeue_check(const std::string& x
 {
     // First check emap
     bool found = false;
-    int16_t fid = _emap.get_pfid(drid);
-    if (fid < enq_map::EMAP_OK) // fail
-    {
-        if (fid == enq_map::EMAP_RID_NOT_FOUND)
-        {
-            if (xid.size())
+    int16_t fid;
+    short eres = _emap.get_pfid(drid, fid);
+    if (eres < enq_map::EMAP_OK) { // fail
+        if (eres == enq_map::EMAP_RID_NOT_FOUND) {
+            if (xid.size()) {
                 found = _tmap.data_exists(xid, drid);
-        }
-        else if (fid == enq_map::EMAP_LOCKED)
-        {
+            }
+        } else if (eres == enq_map::EMAP_LOCKED) {
             std::ostringstream oss;
             oss << std::hex << "drid=0x" << drid;
             throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "dequeue_check");
         }
-    }
-    else
+    } else {
         found = true;
-    if (!found)
-    {
+    }
+    if (!found) {
         std::ostringstream oss;
         oss << "jrnl=" << _jc->id() << " drid=0x" << std::hex << drid;
         throw jexception(jerrno::JERR_WMGR_DEQRIDNOTENQ, oss.str(), "wmgr", "dequeue_check");
@@ -980,10 +894,10 @@ wmgr::dblk_roundup()
     uint32_t wdblks = jrec::size_blks(_cached_offset_dblks, JRNL_SBLK_SIZE_DBLKS) * JRNL_SBLK_SIZE_DBLKS;
     while (_cached_offset_dblks < wdblks)
     {
-        void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE);
+        void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE_BYTES);
         std::memcpy(wptr, (const void*)&xmagic, sizeof(xmagic));
 #ifdef RHM_CLEAN
-        std::memset((char*)wptr + sizeof(xmagic), RHM_CLEAN_CHAR, JRNL_DBLK_SIZE - sizeof(xmagic));
+        std::memset((char*)wptr + sizeof(xmagic), RHM_CLEAN_CHAR, JRNL_DBLK_SIZE_BYTES - sizeof(xmagic));
 #endif
         _pg_offset_dblks++;
         _cached_offset_dblks++;
@@ -991,28 +905,6 @@ wmgr::dblk_roundup()
 }
 
 void
-wmgr::write_fhdr(uint64_t rid, uint16_t fid, uint16_t /*lid*/, std::size_t fro)
-{
-    file_hdr_t fhdr/*(QLS_FILE_MAGIC, QLS_JRNL_VERSION, rid, fid, lid, fro, _wrfc.owi(), true)*/;
-    /*int err =*/ ::file_hdr_init(&fhdr, 0, rid, fro, 0, _jc->id().length(), _jc->id().c_str());
-    std::memcpy(_fhdr_ptr_arr[fid], &fhdr, sizeof(fhdr));
-#ifdef RHM_CLEAN
-    std::memset((char*)_fhdr_ptr_arr[fid] + sizeof(fhdr), RHM_CLEAN_CHAR, _sblksize - sizeof(fhdr));
-#endif
-    aio_cb* aiocbp = _fhdr_aio_cb_arr[fid];
-//    aio::prep_pwrite(aiocbp, _wrfc.fh(), _fhdr_ptr_arr[fid], _sblksize, 0); // TODO: replace for linearstore: _wrfc
-    if (aio::submit(_ioctx, 1, &aiocbp) < 0)
-        throw jexception(jerrno::JERR__AIO, "wmgr", "write_fhdr");
-    _aio_evt_rem++;
-    // TODO: replace for linearstore: _wrfc
-/*
-    _wrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE_DBLKS);
-    _wrfc.incr_aio_cnt();
-    _wrfc.file_controller()->set_wr_fhdr_aio_outstanding(true);
-*/
-}
-
-void
 wmgr::rotate_page()
 {
     _page_cb_arr[_pg_index]._state = AIO_PENDING;
@@ -1026,21 +918,8 @@ wmgr::rotate_page()
 }
 
 void
-wmgr::clean()
-{
-    std::free(_fhdr_base_ptr);
-    _fhdr_base_ptr = 0;
-
-    std::free(_fhdr_ptr_arr);
-    _fhdr_ptr_arr = 0;
-
-    if (_fhdr_aio_cb_arr)
-    {
-//        for (uint32_t i=0; i<_num_jfiles; i++)
-//            delete _fhdr_aio_cb_arr[i];
-        std::free(_fhdr_aio_cb_arr);
-        _fhdr_aio_cb_arr = 0;
-    }
+wmgr::clean() {
+    // Clean up allocated memory here
 }
 
 const std::string
@@ -1063,7 +942,7 @@ wmgr::status_str() const
             default:            oss << _page_cb_arr[i]._state;
         }
     }
-    oss << "] " /*<< _wrfc.status_str()*/; // TODO: replace for linearstore: _wrfc
+    oss << "] " << _lfc.status(0);
     return oss.str();
 }
 

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h Mon Oct  7 18:39:24 2013
@@ -30,15 +30,18 @@ class wmgr;
 }}
 
 #include <cstring>
+#include "qpid/linearstore/jrnl/EmptyFilePoolTypes.h"
 #include "qpid/linearstore/jrnl/enums.h"
 #include "qpid/linearstore/jrnl/pmgr.h"
-//#include "qpid/linearstore/jrnl/wrfc.h"
 #include <set>
 
+class file_hdr_t;
+
 namespace qpid
 {
 namespace qls_jrnl
 {
+    class LinearFileController;
 
     /**
     * \brief Class for managing a write page cache of arbitrary size and number of pages.
@@ -59,17 +62,11 @@ namespace qls_jrnl
     class wmgr : public pmgr
     {
     private:
-//        wrfc& _wrfc;                    ///< Ref to write rotating file controller
+        LinearFileController& _lfc;     ///< Linear File Controller ref
         uint32_t _max_dtokpp;           ///< Max data writes per page
         uint32_t _max_io_wait_us;       ///< Max wait in microseconds till submit
-        void* _fhdr_base_ptr;           ///< Base pointer to file header memory
-        void** _fhdr_ptr_arr;           ///< Array of pointers to file headers memory
-        aio_cb** _fhdr_aio_cb_arr;      ///< Array of iocb pointers for file header writes
         uint32_t _cached_offset_dblks;  ///< Amount of unwritten data in page (dblocks)
         std::deque<data_tok*> _ddtokl;  ///< Deferred dequeue data_tok list
-//        uint32_t _jfsize_dblks;         ///< Journal file size in dblks (NOT sblks!)
-//        uint32_t _jfsize_pgs;           ///< Journal file size in cache pages
-//        uint16_t _num_jfiles;           ///< Number of files used in iocb mallocs
 
         // TODO: Convert _enq_busy etc into a proper threadsafe lock
         // TODO: Convert to enum? Are these encodes mutually exclusive?
@@ -87,9 +84,8 @@ namespace qls_jrnl
         std::set<std::string> _txn_pending_set; ///< Set containing xids of pending commits/aborts
 
     public:
-        wmgr(jcntl* jc, enq_map& emap, txn_map& tmap/*, wrfc& wrfc*/);
-        wmgr(jcntl* jc, enq_map& emap, txn_map& tmap/*, wrfc& wrfc*/, const uint32_t max_dtokpp,
-                const uint32_t max_iowait_us);
+        wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, LinearFileController& lfc);
+        wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, LinearFileController& lfc, const uint32_t max_dtokpp, const uint32_t max_iowait_us);
         virtual ~wmgr();
 
         void initialize(aio_callback* const cbp, const uint32_t wcache_pgsize_sblks,
@@ -106,7 +102,6 @@ namespace qls_jrnl
         int32_t get_events(page_state state, timespec* const timeout, bool flush = false);
         bool is_txn_synced(const std::string& xid);
         inline bool curr_pg_blocked() const { return _page_cb_arr[_pg_index]._state != UNUSED; }
-//        inline bool curr_file_blocked() const; { return _wrfc.aio_cnt() > 0; }
         inline uint32_t unflushed_dblks() { return _cached_offset_dblks; }
 
         // Debug aid
@@ -122,9 +117,8 @@ namespace qls_jrnl
         void file_header_check(const uint64_t rid, const bool cont, const uint32_t rec_dblks_rem);
         void flush_check(iores& res, bool& cont, bool& done);
         iores write_flush();
-        iores rotate_file();
+        void get_next_file();
         void dblk_roundup();
-        void write_fhdr(uint64_t rid, uint16_t fid, uint16_t lid, std::size_t fro);
         void rotate_page();
         void clean();
     };

Added: qpid/branches/linearstore/qpid/cpp/src/tests/linearstore/linearstoredirsetup.sh
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/linearstore/linearstoredirsetup.sh?rev=1530024&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/linearstore/linearstoredirsetup.sh (added)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/linearstore/linearstoredirsetup.sh Mon Oct  7 18:39:24 2013
@@ -0,0 +1,24 @@
+#!/bin/bash
+
+STORE_DIR=/tmp
+LINEARSTOREDIR=~/RedHat/linearstore
+
+rm -rf $STORE_DIR/qls
+rm -rf $STORE_DIR/p002
+rm $STORE_DIR/p004
+
+mkdir $STORE_DIR/qls
+mkdir $STORE_DIR/p002
+touch $STORE_DIR/p004
+mkdir $STORE_DIR/qls/p001
+touch $STORE_DIR/qls/p003
+ln -s $STORE_DIR/p002 $STORE_DIR/qls/p002
+ln -s $STORE_DIR/p004 $STORE_DIR/qls/p004
+
+${LINEARSTOREDIR}/tools/src/py/linearstore/efptool.py $STORE_DIR/qls/ -a -p 1 -s 2048 -n 25
+${LINEARSTOREDIR}/tools/src/py/linearstore/efptool.py $STORE_DIR/qls/ -a -p 1 -s 512 -n 25
+${LINEARSTOREDIR}/tools/src/py/linearstore/efptool.py $STORE_DIR/qls/ -a -p 2 -s 2048 -n 25
+
+${LINEARSTOREDIR}/tools/src/py/linearstore/efptool.py $STORE_DIR/qls/ -l
+tree -la $STORE_DIR/qls
+

Propchange: qpid/branches/linearstore/qpid/cpp/src/tests/linearstore/linearstoredirsetup.sh
------------------------------------------------------------------------------
    svn:executable = *



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org