You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ch...@apache.org on 2012/12/19 21:34:58 UTC

svn commit: r1424091 [8/9] - in /qpid/trunk/qpid/cpp/src: ./ qpid/legacystore/ qpid/legacystore/jrnl/

Added: qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/rmgr.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/rmgr.cpp?rev=1424091&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/rmgr.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/rmgr.cpp Wed Dec 19 20:34:56 2012
@@ -0,0 +1,701 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file rmgr.cpp
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * File containing code for class mrg::journal::rmgr (read manager). See
+ * comments in file rmgr.hpp for details.
+ *
+ * \author Kim van der Riet
+ *
+ * Copyright (c) 2007, 2008, 2009, 2010 Red Hat, Inc.
+ *
+ */
+
+#include "jrnl/rmgr.hpp"
+
+#include <cassert>
+#include <cerrno>
+#include <cstdlib>
+#include "jrnl/jcntl.hpp"
+#include "jrnl/jerrno.hpp"
+#include <sstream>
+
+namespace mrg
+{
+namespace journal
+{
+
+/**
+ * \brief Construct a read manager.
+ *
+ * Forwards jc, emap and tmap to the pmgr base class and keeps a reference to the read rotating
+ * file controller. The file header buffer and iocb pointers start out null; they are allocated
+ * in initialize().
+ */
+rmgr::rmgr(jcntl* jc, enq_map& emap, txn_map& tmap, rrfc& rrfc):
+        pmgr(jc, emap, tmap),
+        _rrfc(rrfc),
+        _hdr(),
+        _fhdr_buffer(0),
+        _fhdr_aio_cb_ptr(0),
+        _fhdr_rd_outstanding(false)
+{}
+
+/**
+ * \brief Destructor; releases the file header buffer and iocb via rmgr::clean().
+ */
+rmgr::~rmgr()
+{
+    rmgr::clean();
+}
+
+/**
+ * \brief Set up the read page cache and the resources used for file header reads.
+ *
+ * Initializes the pmgr base with JRNL_RMGR_PAGE_SIZE and JRNL_RMGR_PAGES, releases any file
+ * header buffer and iocb left over from an earlier call, then allocates a buffer of _sblksize
+ * bytes aligned to _sblksize and a zero-filled aio_cb for file header reads.
+ *
+ * \param cbp AIO callback object; passed through to pmgr::initialize().
+ * \throws jexception with jerrno::JERR__MALLOC if the aligned buffer cannot be allocated.
+ */
+void
+rmgr::initialize(aio_callback* const cbp)
+{
+    pmgr::initialize(cbp, JRNL_RMGR_PAGE_SIZE, JRNL_RMGR_PAGES);
+    clean();
+    // Allocate memory for reading file header.
+    // posix_memalign() reports failure through its return value and does not set errno, so the
+    // returned code (not errno) is what must be reported.
+    const int alloc_err = ::posix_memalign(&_fhdr_buffer, _sblksize, _sblksize);
+    if (alloc_err)
+    {
+        std::ostringstream oss;
+        oss << "posix_memalign(): blksize=" << _sblksize << " size=" << _sblksize;
+        oss << FORMAT_SYSERR(alloc_err);
+        throw jexception(jerrno::JERR__MALLOC, oss.str(), "rmgr", "initialize");
+    }
+    _fhdr_aio_cb_ptr = new aio_cb;
+    // Zero the entire control block; sizeof(aio_cb*) would only clear pointer-size bytes of it
+    std::memset(_fhdr_aio_cb_ptr, 0, sizeof(aio_cb));
+}
+
+/**
+ * \brief Release the file header iocb and buffer and null both pointers.
+ *
+ * Safe to call when nothing has been allocated.
+ */
+void
+rmgr::clean()
+{
+    // delete and free() are both no-ops on a null pointer, so no guards are required
+    delete _fhdr_aio_cb_ptr;
+    _fhdr_aio_cb_ptr = 0;
+
+    std::free(_fhdr_buffer);
+    _fhdr_buffer = 0;
+}
+
+/**
+ * \brief Read the next enqueued record from the read page cache.
+ *
+ * If dtokp is in state SKIP_PART or READ_PART, the interrupted skip or read is resumed first.
+ * Otherwise record headers are examined one at a time starting at the current page position:
+ * dequeue, transaction and filler records, and enqueue records that are no longer enqueued, are
+ * consumed until an enqueued record is found, which is then decoded via read_enq().
+ *
+ * \param datapp     Receives the pointer to the record data.
+ * \param dsize      Receives the record data size.
+ * \param xidpp      Receives the pointer to the record XID.
+ * \param xidsize    Receives the record XID size.
+ * \param transient  Receives the transient flag of the record read.
+ * \param external   Receives the external flag of the record read.
+ * \param dtokp      Data token tracking the read; its rid is set from the record if it is 0.
+ * \param ignore_pending_txns If true, records are not blocked with RHM_IORES_TXPENDING.
+ * \return The result of read_enq() when a record is decoded; RHM_IORES_PAGE_AIOWAIT when the
+ *     current page is not yet AIO_COMPLETE; RHM_IORES_TXPENDING when the record is held by a
+ *     pending transaction; RHM_IORES_EMPTY when there is nothing to read or an unknown magic is
+ *     found; or any non-success result of pre_read_check() or skip().
+ * \throws jexception with jerrno::JERR_RMGR_RIDMISMATCH if dtokp already carries a rid that
+ *     differs from that of the record found.
+ */
+iores
+rmgr::read(void** const datapp, std::size_t& dsize, void** const xidpp, std::size_t& xidsize,
+        bool& transient, bool& external, data_tok* dtokp,  bool ignore_pending_txns)
+{
+    iores res = pre_read_check(dtokp);
+    if (res != RHM_IORES_SUCCESS)
+    {
+        set_params_null(datapp, dsize, xidpp, xidsize);
+        return res;
+    }
+
+    // Resume a skip that was previously interrupted while waiting for a page
+    if (dtokp->rstate() == data_tok::SKIP_PART)
+    {
+        if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
+        {
+            aio_cycle();   // check if rd AIOs returned; initiate new reads if possible
+            return RHM_IORES_PAGE_AIOWAIT;
+        }
+        const iores res = skip(dtokp);
+        if (res != RHM_IORES_SUCCESS)
+        {
+            set_params_null(datapp, dsize, xidpp, xidsize);
+            return res;
+        }
+    }
+    // Resume a record read that was previously interrupted part-way through
+    if (dtokp->rstate() == data_tok::READ_PART)
+    {
+        assert(dtokp->rid() == _hdr._rid);
+        void* rptr = (void*)((char*)_page_ptr_arr[_pg_index] + (_pg_offset_dblks * JRNL_DBLK_SIZE));
+        const iores res = read_enq(_hdr, rptr, dtokp);
+        dsize = _enq_rec.get_data(datapp);
+        xidsize = _enq_rec.get_xid(xidpp);
+        transient = _enq_rec.is_transient();
+        external = _enq_rec.is_external();
+        return res;
+    }
+
+    // Starting on a new record: clear the out-parameters and the cached record header
+    set_params_null(datapp, dsize, xidpp, xidsize);
+    _hdr.reset();
+    // Read header, determine next record type
+    while (true)
+    {
+        // Nothing left in the current page and nothing further pending for this file
+        if(dblks_rem() == 0 && _rrfc.is_compl() && !_rrfc.is_wr_aio_outstanding())
+        {
+            aio_cycle();   // check if rd AIOs returned; initiate new reads if possible
+            if(dblks_rem() == 0 && _rrfc.is_compl() && !_rrfc.is_wr_aio_outstanding())
+            {
+                // Still nothing: flush unwritten data if there is any, else report empty once
+                // no AIO events remain outstanding
+                if (_jc->unflushed_dblks() > 0)
+                    _jc->flush();
+                else if (!_aio_evt_rem)
+                    return RHM_IORES_EMPTY;
+            }
+        }
+        if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
+        {
+            aio_cycle();
+            return RHM_IORES_PAGE_AIOWAIT;
+        }
+        // Copy the record header found at the current offset within the current page
+        void* rptr = (void*)((char*)_page_ptr_arr[_pg_index] + (_pg_offset_dblks * JRNL_DBLK_SIZE));
+        std::memcpy(&_hdr, rptr, sizeof(rec_hdr));
+        switch (_hdr._magic)
+        {
+            case RHM_JDAT_ENQ_MAGIC:
+            {
+                _enq_rec.reset(); // sets enqueue rec size
+                // Check if RID of this rec is still enqueued, if so read it, else skip
+                bool is_enq = false;
+                int16_t fid = _emap.get_pfid(_hdr._rid);
+                if (fid < enq_map::EMAP_OK)
+                {
+                    bool enforce_txns = !_jc->is_read_only() && !ignore_pending_txns;
+                    // Block read for transactionally locked record (only when not recovering)
+                    if (fid == enq_map::EMAP_LOCKED && enforce_txns)
+                        return RHM_IORES_TXPENDING;
+
+                    // (Recover mode only) Ok, not in emap - now search tmap, if present then read
+                    is_enq = _tmap.is_enq(_hdr._rid);
+                    if (enforce_txns && is_enq)
+                        return RHM_IORES_TXPENDING;
+                }
+                else
+                    is_enq = true;
+
+                if (is_enq) // ok, this record is enqueued, check it, then read it...
+                {
+                    // A token that already carries a rid must match the record; otherwise the
+                    // token adopts the rid of the record
+                    if (dtokp->rid())
+                    {
+                        if (_hdr._rid != dtokp->rid())
+                        {
+                            std::ostringstream oss;
+                            oss << std::hex << "rid=0x" << _hdr._rid << "; dtok_rid=0x" << dtokp->rid()
+                                << "; dtok_id=0x" << dtokp->id();
+                            throw jexception(jerrno::JERR_RMGR_RIDMISMATCH, oss.str(), "rmgr", "read");
+                        }
+                    }
+                    else
+                        dtokp->set_rid(_hdr._rid);
+
+// TODO: Add member _fid to pmgr::page_cb which indicates the fid from which this page was
+// populated. When this value is set in wmgr::flush() somewhere, then uncomment the following
+// check:
+//                     if (fid != _page_cb_arr[_pg_index]._fid)
+//                     {
+//                         std::ostringstream oss;
+//                         oss << std::hex << std::setfill('0');
+//                         oss << "rid=0x" << std::setw(16) << _hdr._rid;
+//                         oss << "; emap_fid=0x" << std::setw(4) << fid;
+//                         oss << "; current_fid=" << _rrfc.fid();
+//                         throw jexception(jerrno::JERR_RMGR_FIDMISMATCH, oss.str(), "rmgr",
+//                              "read");
+//                     }
+
+                    const iores res = read_enq(_hdr, rptr, dtokp);
+                    dsize = _enq_rec.get_data(datapp);
+                    xidsize = _enq_rec.get_xid(xidpp);
+                    transient = _enq_rec.is_transient();
+                    external = _enq_rec.is_external();
+                    return res;
+                }
+                else // skip this record, it is already dequeued
+                    consume_xid_rec(_hdr, rptr, dtokp);
+                break;
+            }
+            case RHM_JDAT_DEQ_MAGIC:
+                consume_xid_rec(_hdr, rptr, dtokp);
+                break;
+            case RHM_JDAT_TXA_MAGIC:
+                consume_xid_rec(_hdr, rptr, dtokp);
+                break;
+            case RHM_JDAT_TXC_MAGIC:
+                consume_xid_rec(_hdr, rptr, dtokp);
+                break;
+            case RHM_JDAT_EMPTY_MAGIC:
+                consume_filler();
+                break;
+            default:
+                // Unrecognized magic: reported to the caller as nothing to read
+                return RHM_IORES_EMPTY;
+        }
+    }
+}
+
+/**
+ * \brief Collect completed read AIO events and update page and file-header read state.
+ *
+ * For a completed page read, the number of dblks read is recorded in the page control block and
+ * in the file controller stored in it, and the page is placed into the supplied state. For a
+ * completed file header read, the header is copied into _fhdr, the page counter, page index and
+ * page offset are positioned from its first record offset, and _rrfc is marked valid.
+ *
+ * \param state   State given to each page whose read has completed.
+ * \param timeout Timeout passed to aio::getevents(); may be null.
+ * \param flush   If true, _aio_evt_rem is passed to aio::getevents() as the first count argument
+ *     instead of 1 (presumably the minimum number of events to wait for - confirm against aio).
+ * \return Number of events returned; 0 if none were outstanding or the call was interrupted by a
+ *     signal; jerrno::AIO_TIMEOUT if a timeout was supplied and no events were returned.
+ * \throws jexception with jerrno::JERR__AIO if aio::getevents() or a read operation failed;
+ *     jerrno::JERR__UNDERFLOW if more events are returned than were outstanding.
+ */
+int32_t
+rmgr::get_events(page_state state, timespec* const timeout, bool flush)
+{
+    if (_aio_evt_rem == 0) // no events to get
+        return 0;
+
+    int32_t ret;
+    if ((ret = aio::getevents(_ioctx, flush ? _aio_evt_rem : 1, _aio_evt_rem/*_cache_num_pages + _jc->num_jfiles()*/, _aio_event_arr, timeout)) < 0)
+    {
+        if (ret == -EINTR) // Interrupted by signal
+            return 0;
+        std::ostringstream oss;
+        oss << "io_getevents() failed: " << std::strerror(-ret) << " (" << ret << ")";
+        throw jexception(jerrno::JERR__AIO, oss.str(), "rmgr", "get_events");
+    }
+    if (ret == 0 && timeout)
+        return jerrno::AIO_TIMEOUT;
+
+    std::vector<u_int16_t> pil; // indices of the pages whose reads completed in this call
+    pil.reserve(ret);
+    for (int i=0; i<ret; i++) // Index of returned AIOs
+    {
+        if (_aio_evt_rem == 0)
+        {
+            std::ostringstream oss;
+            oss << "_aio_evt_rem; evt " << (i + 1) << " of " << ret;
+            throw jexception(jerrno::JERR__UNDERFLOW, oss.str(), "rmgr", "get_events");
+        }
+        _aio_evt_rem--;
+        aio_cb* aiocbp = _aio_event_arr[i].obj; // This I/O control block (iocb)
+        page_cb* pcbp = (page_cb*)(aiocbp->data); // This page control block (pcb)
+        long aioret = (long)_aio_event_arr[i].res;
+        if (aioret < 0)
+        {
+            std::ostringstream oss;
+            oss << "AIO read operation failed: " << std::strerror(-aioret) << " (" << aioret << ")";
+            // File header reads have no pcb, so the page index is only available for page reads
+            if (pcbp)
+                oss << " [pg=" << pcbp->_index;
+            else
+                oss << " [pg=fhdr";
+            oss << " buf=" << aiocbp->u.c.buf;
+            oss << " rsize=0x" << std::hex << aiocbp->u.c.nbytes;
+            oss << " offset=0x" << aiocbp->u.c.offset << std::dec;
+            oss << " fh=" << aiocbp->aio_fildes << "]";
+            throw jexception(jerrno::JERR__AIO, oss.str(), "rmgr", "get_events");
+        }
+
+        if (pcbp) // Page reads have pcb
+        {
+            if (pcbp->_rfh->rd_subm_cnt_dblks() >= JRNL_SBLK_SIZE) // Detects if write reset of this fcntl obj has occurred.
+            {
+                // Increment the completed read offset
+                // NOTE: We cannot use _rrfc here, as it may have rotated since submitting count.
+                // Use stored pointer to fcntl in the pcb instead.
+                pcbp->_rdblks = aiocbp->u.c.nbytes / JRNL_DBLK_SIZE;
+                pcbp->_rfh->add_rd_cmpl_cnt_dblks(pcbp->_rdblks);
+                pcbp->_state = state;
+                // reserve() does not change size(), so the index must be appended; assigning
+                // through operator[] would write outside the vector
+                pil.push_back(pcbp->_index);
+            }
+        }
+        else // File header reads have no pcb
+        {
+            std::memcpy(&_fhdr, _fhdr_buffer, sizeof(file_hdr));
+            _rrfc.add_cmpl_cnt_dblks(JRNL_SBLK_SIZE);
+
+            u_int32_t fro_dblks = (_fhdr._fro / JRNL_DBLK_SIZE) - JRNL_SBLK_SIZE;
+            // Check fro_dblks does not exceed the write pointers which can happen in some corrupted journal recoveries
+            if (fro_dblks > _jc->wr_subm_cnt_dblks(_fhdr._pfid) - JRNL_SBLK_SIZE)
+                fro_dblks = _jc->wr_subm_cnt_dblks(_fhdr._pfid) - JRNL_SBLK_SIZE;
+            // Position the page counter, page index and in-page offset at the first record
+            _pg_cntr = fro_dblks / (JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE);
+            u_int32_t tot_pg_offs_dblks = _pg_cntr * JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE;
+            _pg_index = _pg_cntr % JRNL_RMGR_PAGES;
+            _pg_offset_dblks = fro_dblks - tot_pg_offs_dblks;
+            _rrfc.add_subm_cnt_dblks(tot_pg_offs_dblks);
+            _rrfc.add_cmpl_cnt_dblks(tot_pg_offs_dblks);
+
+            _fhdr_rd_outstanding = false;
+            _rrfc.set_valid();
+        }
+    }
+
+    // Perform AIO return callback
+    if (_cbp && ret)
+        _cbp->rd_aio_cb(pil);
+    return ret;
+}
+
+/**
+ * \brief Notification that recovery has completed; currently does nothing.
+ */
+void
+rmgr::recover_complete()
+{}
+
+/**
+ * \brief Mark the read rotating file controller invalid if it is currently valid.
+ */
+void
+rmgr::invalidate()
+{
+    if (!_rrfc.is_valid())
+        return; // already invalid, nothing to do
+    _rrfc.set_invalid();
+}
+
+/**
+ * \brief Wait for all outstanding read AIOs, then reset the read state.
+ *
+ * After the outstanding events have been collected, every page is marked UNUSED, the file index
+ * of the read file controller is unset, and the page index and page offset are reset to 0.
+ *
+ * \param timeout Timeout applied to each wait for AIO events.
+ * \throws jexception with jerrno::JERR__TIMEOUT if a wait times out with nothing returned.
+ */
+void
+rmgr::flush(timespec* timeout)
+{
+    // Wait for any outstanding AIO read operations to complete before synchronizing
+    while (_aio_evt_rem)
+    {
+        if (get_events(AIO_COMPLETE, timeout) == jerrno::AIO_TIMEOUT) // timed out, nothing returned
+        {
+            // Report this function as the throwing function, as every other throw in the class does
+            throw jexception(jerrno::JERR__TIMEOUT,
+                            "Timed out waiting for outstanding read aio to return", "rmgr", "flush");
+        }
+    }
+
+    // Reset all read states and pointers
+    for (int i=0; i<_cache_num_pages; i++)
+        _page_cb_arr[i]._state = UNUSED;
+    _rrfc.unset_findex();
+    _pg_index = 0;
+    _pg_offset_dblks = 0;
+}
+
+/**
+ * \brief Collect AIO events until the read file controller becomes valid or a wait times out.
+ *
+ * \param timeout          Timeout applied to each wait for AIO events.
+ * \param throw_on_timeout If true, a timeout raises an exception instead of ending the wait.
+ * \return The validity of the read file controller when the wait ends.
+ * \throws jexception with jerrno::JERR__TIMEOUT on timeout when throw_on_timeout is true.
+ */
+bool
+rmgr::wait_for_validity(timespec* timeout, const bool throw_on_timeout)
+{
+    while (!_rrfc.is_valid())
+    {
+        if (get_events(AIO_COMPLETE, timeout) != jerrno::AIO_TIMEOUT)
+            continue; // keep waiting until valid
+
+        // Timed out: either raise, or give up waiting and report the current validity
+        if (throw_on_timeout)
+            throw jexception(jerrno::JERR__TIMEOUT, "Timed out waiting for read validity", "rmgr", "wait_for_validity");
+        break;
+    }
+    return _rrfc.is_valid();
+}
+
+/**
+ * \brief Checks performed before every read.
+ *
+ * \param dtokp Data token for the read; may be null, in which case its state is not checked.
+ * \return RHM_IORES_RCINVALID if the read file controller is not valid; RHM_IORES_PAGE_AIOWAIT
+ *     while a file header read is outstanding; RHM_IORES_EMPTY if there is nothing to read and
+ *     nothing pending; otherwise RHM_IORES_SUCCESS.
+ * \throws jexception with jerrno::JERR_RMGR_ENQSTATE if the token is not readable.
+ */
+iores
+rmgr::pre_read_check(data_tok* dtokp)
+{
+    // Pick up any read AIOs that have already returned
+    if (_aio_evt_rem)
+        get_events(AIO_COMPLETE, 0);
+
+    if (!_rrfc.is_valid())
+        return RHM_IORES_RCINVALID;
+
+    // The fro from the file header is needed to read, so block until its read has completed
+    if (_fhdr_rd_outstanding)
+        return RHM_IORES_PAGE_AIOWAIT;
+
+    if (dblks_rem() == 0 && _rrfc.is_compl() && !_rrfc.is_wr_aio_outstanding())
+    {
+        // Give outstanding AIOs a chance to return, then look again
+        aio_cycle();
+        if (dblks_rem() == 0 && _rrfc.is_compl() && !_rrfc.is_wr_aio_outstanding())
+        {
+            if (_jc->unflushed_dblks() > 0)
+                _jc->flush();
+            else if (!_aio_evt_rem)
+                return RHM_IORES_EMPTY;
+        }
+    }
+
+    // A token, when supplied, must be in a readable (enqueued) write state
+    if (dtokp && !dtokp->is_readable())
+    {
+        std::ostringstream msg;
+        msg << std::hex << std::setfill('0');
+        msg << "dtok_id=0x" << std::setw(8) << dtokp->id();
+        msg << "; dtok_rid=0x" << std::setw(16) << dtokp->rid();
+        msg << "; dtok_wstate=" << dtokp->wstate_str();
+        throw jexception(jerrno::JERR_RMGR_ENQSTATE, msg.str(), "rmgr", "pre_read_check");
+    }
+
+    return RHM_IORES_SUCCESS;
+}
+
+/**
+ * \brief Decode an enqueue record, continuing across pages as required.
+ *
+ * \param h     Record header of the record being decoded.
+ * \param rptr  Pointer to the position within the current page from which to decode.
+ * \param dtokp Data token; its dblocks-read count, read state and dsize are updated.
+ * \return RHM_IORES_SUCCESS when the whole record has been decoded (token state READ);
+ *     RHM_IORES_PAGE_AIOWAIT if a page needed for the record is not yet AIO_COMPLETE (token
+ *     state READ_PART when this occurs after the first page).
+ */
+iores
+rmgr::read_enq(rec_hdr& h, void* rptr, data_tok* dtokp)
+{
+    if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
+    {
+        aio_cycle();   // check if any AIOs have returned
+        return RHM_IORES_PAGE_AIOWAIT;
+    }
+
+    // Read data from this page, first block will have header and data size.
+    u_int32_t dblks_rd = _enq_rec.decode(h, rptr, dtokp->dblocks_read(), dblks_rem());
+    dtokp->incr_dblocks_read(dblks_rd);
+
+    _pg_offset_dblks += dblks_rd;
+
+    // If data still incomplete, move to next page and decode again
+    while (dtokp->dblocks_read() < _enq_rec.rec_size_dblks())
+    {
+        rotate_page();
+        if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
+        {
+            // Record the partial read in the token so that the read can be resumed later
+            dtokp->set_rstate(data_tok::READ_PART);
+            dtokp->set_dsize(_enq_rec.data_size());
+            return RHM_IORES_PAGE_AIOWAIT;
+        }
+
+        // Continue decoding from the start of the new page
+        rptr = (void*)((char*)_page_ptr_arr[_pg_index]);
+        dblks_rd = _enq_rec.decode(h, rptr, dtokp->dblocks_read(), dblks_rem());
+        dtokp->incr_dblocks_read(dblks_rd);
+        _pg_offset_dblks += dblks_rd;
+    }
+
+    // If we have finished with this page, rotate it
+    if (dblks_rem() == 0)
+        rotate_page();
+
+    // Set the record size in dtokp
+    dtokp->set_rstate(data_tok::READ);
+    dtokp->set_dsize(_enq_rec.data_size());
+    return RHM_IORES_SUCCESS;
+}
+
+/**
+ * \brief Skip over a record that is not to be read (enqueue, dequeue or transaction record).
+ *
+ * The size of the record is computed from its type-specific header and stored in dtokp, the
+ * token's dblocks-read count is reset to 0, and skip() is then called to advance past the record.
+ *
+ * \param h     Record header of the record to be consumed; its magic selects the record type.
+ * \param rptr  Pointer to the start of the record within the current page.
+ * \param dtokp Data token used to track the skip.
+ * \throws jexception with jerrno::JERR_RMGR_BADRECTYPE if the magic is not one of the enqueue,
+ *     dequeue or transaction magics.
+ */
+void
+rmgr::consume_xid_rec(rec_hdr& h, void* rptr, data_tok* dtokp)
+{
+    if (h._magic == RHM_JDAT_ENQ_MAGIC)
+    {
+        enq_hdr ehdr;
+        std::memcpy(&ehdr, rptr, sizeof(enq_hdr));
+        // The data size is not counted for external records
+        if (ehdr.is_external())
+            dtokp->set_dsize(ehdr._xidsize + sizeof(enq_hdr) + sizeof(rec_tail));
+        else
+            dtokp->set_dsize(ehdr._xidsize + ehdr._dsize + sizeof(enq_hdr) + sizeof(rec_tail));
+    }
+    else if (h._magic == RHM_JDAT_DEQ_MAGIC)
+    {
+        deq_hdr dhdr;
+        std::memcpy(&dhdr, rptr, sizeof(deq_hdr));
+        // A record tail is only counted when the dequeue record carries an XID
+        if (dhdr._xidsize)
+            dtokp->set_dsize(dhdr._xidsize + sizeof(deq_hdr) + sizeof(rec_tail));
+        else
+            dtokp->set_dsize(sizeof(deq_hdr));
+    }
+    else if (h._magic == RHM_JDAT_TXA_MAGIC || h._magic == RHM_JDAT_TXC_MAGIC)
+    {
+        txn_hdr thdr;
+        std::memcpy(&thdr, rptr, sizeof(txn_hdr));
+        dtokp->set_dsize(thdr._xidsize + sizeof(txn_hdr) + sizeof(rec_tail));
+    }
+    else
+    {
+        // The magic field has no terminating NUL; copy it into a terminated buffer so that
+        // streaming it as text cannot read beyond the field
+        char magic_str[sizeof(h._magic) + 1];
+        std::memcpy(magic_str, &h._magic, sizeof(h._magic));
+        magic_str[sizeof(h._magic)] = '\0';
+        std::ostringstream oss;
+        oss << "Record type found = \"" << magic_str << "\"";
+        throw jexception(jerrno::JERR_RMGR_BADRECTYPE, oss.str(), "rmgr", "consume_xid_rec");
+    }
+    dtokp->set_dblocks_read(0);
+    skip(dtokp);
+}
+
+/**
+ * \brief Step over a filler record and rotate the page if it is now exhausted.
+ */
+void
+rmgr::consume_filler()
+{
+    // By definition a filler (Magic "RHMx") occupies exactly one dblk
+    _pg_offset_dblks++;
+
+    const bool page_exhausted = (dblks_rem() == 0);
+    if (page_exhausted)
+        rotate_page();
+}
+
+/**
+ * \brief Advance the read position past the record whose size is held in dtokp.
+ *
+ * \param dtokp Data token holding the record size (dsize) and the count of dblks already skipped.
+ * \return RHM_IORES_SUCCESS once the whole record has been skipped (the token is reset to
+ *     UNREAD with zero dsize and dblocks read); RHM_IORES_PAGE_AIOWAIT if the next page is not
+ *     yet AIO_COMPLETE (the token is left in state SKIP_PART).
+ */
+iores
+rmgr::skip(data_tok* dtokp)
+{
+    const u_int32_t rec_dblks = jrec::size_dblks(dtokp->dsize());
+    u_int32_t skipped_dblks = dtokp->dblocks_read();
+    for (;;)
+    {
+        // Skip whichever is smaller: what is left of the record, or what is left of this page
+        const u_int32_t rec_rem = rec_dblks - skipped_dblks;
+        const u_int32_t page_rem = dblks_rem();
+        const u_int32_t step = rec_rem > page_rem ? page_rem : rec_rem;
+        if (step)
+        {
+            dtokp->incr_dblocks_read(step);
+            _pg_offset_dblks += step;
+            skipped_dblks += step;
+        }
+
+        if (skipped_dblks >= rec_dblks)
+        {
+            // Whole record skipped: return the token to the unread state
+            dtokp->set_rstate(data_tok::UNREAD);
+            dtokp->set_dsize(0);
+            dtokp->set_dblocks_read(0);
+
+            // Rotate the page if nothing remains on it
+            if (dblks_rem() == 0)
+                rotate_page();
+            return RHM_IORES_SUCCESS;
+        }
+
+        // Record continues on the next page; stop here if that page is not available yet
+        if (dblks_rem() == 0)
+            rotate_page();
+        if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
+        {
+            dtokp->set_rstate(data_tok::SKIP_PART);
+            return RHM_IORES_PAGE_AIOWAIT;
+        }
+    }
+}
+
+/**
+ * \brief Keep the read pipeline moving.
+ *
+ * Does nothing while a file header read is outstanding. If the read file controller is not
+ * valid, the read state is flushed and a file header read is started on the earliest file.
+ * Otherwise the page states are surveyed, reads are submitted for unused pages (or for all pages
+ * if every page is AIO_COMPLETE), and returned events are collected if any read is pending.
+ *
+ * \return RHM_IORES_EMPTY if, after a flush, the selected file is void with no write AIO
+ *     outstanding; otherwise RHM_IORES_SUCCESS or the result of init_aio_reads().
+ */
+iores
+rmgr::aio_cycle()
+{
+    // Perform validity checks
+    if (_fhdr_rd_outstanding) // read of file header still outstanding in aio
+        return RHM_IORES_SUCCESS;
+    if (!_rrfc.is_valid())
+    {
+        // Flush and reset all read states and pointers
+        flush(&jcntl::_aio_cmpl_timeout);
+
+        _jc->get_earliest_fid(); // determine initial file to read; calls _rrfc.set_findex() to set value
+        // If this file has not yet been written to, return RHM_IORES_EMPTY
+        if (_rrfc.is_void() && !_rrfc.is_wr_aio_outstanding())
+            return RHM_IORES_EMPTY;
+        init_file_header_read(); // send off AIO read request for file header
+        return RHM_IORES_SUCCESS;
+    }
+
+    // Survey of the page states, starting from the current page
+    int16_t first_uninit = -1;  // index of the first UNUSED page found; -1 if none
+    u_int16_t num_uninit = 0;   // number of UNUSED pages
+    u_int16_t num_compl = 0;    // number of AIO_COMPLETE pages
+    bool outstanding = false;   // true if any page is AIO_PENDING
+    // Index must start with current buffer and cycle around so that first
+    // uninitialized buffer is initialized first
+    for (u_int16_t i=_pg_index; i<_pg_index+_cache_num_pages; i++)
+    {
+        int16_t ci = i % _cache_num_pages;
+        switch (_page_cb_arr[ci]._state)
+        {
+            case UNUSED:
+                if (first_uninit < 0)
+                    first_uninit = ci;
+                num_uninit++;
+                break;
+            case IN_USE:
+                break;
+            case AIO_PENDING:
+                outstanding = true;
+                break;
+            case AIO_COMPLETE:
+                num_compl++;
+                break;
+            default:;
+        }
+    }
+    iores res = RHM_IORES_SUCCESS;
+    if (num_uninit)
+        res = init_aio_reads(first_uninit, num_uninit);
+    else if (num_compl == _cache_num_pages) // This condition exists after invalidation
+        res = init_aio_reads(0, _cache_num_pages);
+    // Collect whatever has already returned; no timeout is supplied
+    if (outstanding)
+        get_events(AIO_COMPLETE, 0);
+    return res;
+}
+
+/**
+ * \brief Submit AIO reads to fill up to num_uninit pages, starting at page first_uninit.
+ *
+ * One read is submitted per page. Submission stops early if the current file is void or has no
+ * whole sblk left to read. The file controller is rotated whenever it reports that a file rotate
+ * is due.
+ *
+ * \param first_uninit Index of the first page to fill; later pages wrap modulo the page count.
+ * \param num_uninit   Maximum number of pages to fill.
+ * \return RHM_IORES_SUCCESS.
+ * \throws jexception with jerrno::JERR__AIO if an AIO submit fails.
+ */
+iores
+rmgr::init_aio_reads(const int16_t first_uninit, const u_int16_t num_uninit)
+{
+    for (int16_t i=0; i<num_uninit; i++)
+    {
+        if (_rrfc.is_void()) // Nothing to do; this file not yet written to
+            break;
+
+        // At the start of a file, count one sblk as both submitted and completed
+        // (presumably the file header block - confirm)
+        if (_rrfc.subm_offs() == 0)
+        {
+            _rrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE);
+            _rrfc.add_cmpl_cnt_dblks(JRNL_SBLK_SIZE);
+        }
+
+        // TODO: Future perf improvement: Do a single AIO read for all available file
+        // space into all contiguous empty pages in one AIO operation.
+
+        u_int32_t file_rem_dblks = _rrfc.remaining_dblks();
+        file_rem_dblks -= file_rem_dblks % JRNL_SBLK_SIZE; // round down to closest sblk boundary
+        u_int32_t pg_size_dblks = JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE;
+        // Read at most one page, and no more than remains in the file
+        u_int32_t rd_size = file_rem_dblks > pg_size_dblks ? pg_size_dblks : file_rem_dblks;
+        if (rd_size)
+        {
+            int16_t pi = (i + first_uninit) % _cache_num_pages;
+            // TODO: For perf, combine contiguous pages into single read
+            //   1 or 2 AIOs needed depending on whether read block folds
+            aio_cb* aiocbp = &_aio_cb_arr[pi];
+            aio::prep_pread_2(aiocbp, _rrfc.fh(), _page_ptr_arr[pi], rd_size * JRNL_DBLK_SIZE, _rrfc.subm_offs());
+            if (aio::submit(_ioctx, 1, &aiocbp) < 0)
+                throw jexception(jerrno::JERR__AIO, "rmgr", "init_aio_reads");
+            _rrfc.add_subm_cnt_dblks(rd_size);
+            _aio_evt_rem++;
+            _page_cb_arr[pi]._state = AIO_PENDING;
+            // Remember the file controller this page was read from; it is used on completion
+            _page_cb_arr[pi]._rfh = _rrfc.file_controller();
+        }
+        else // If there is nothing to read for this page, neither will there be for the others...
+            break;
+        if (_rrfc.file_rotate())
+            _rrfc.rotate();
+    }
+    return RHM_IORES_SUCCESS;
+}
+
+/**
+ * \brief Release the current page and move on to the next one in the cache.
+ *
+ * The current page is marked UNUSED with no dblks read, the page index is advanced (wrapping to
+ * 0), aio_cycle() is called, and the page offset is reset to 0.
+ */
+void
+rmgr::rotate_page()
+{
+    _page_cb_arr[_pg_index]._rdblks = 0;
+    _page_cb_arr[_pg_index]._state = UNUSED;
+    // The page counter only advances when the whole page has been consumed
+    if (_pg_offset_dblks >= JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE)
+    {
+        _pg_offset_dblks = 0;
+        _pg_cntr++;
+    }
+    if (++_pg_index >= _cache_num_pages)
+        _pg_index = 0;
+    aio_cycle();
+    // NOTE(review): this unconditional reset makes the conditional reset of _pg_offset_dblks
+    // above redundant, and it overwrites any offset set while aio_cycle() ran - confirm intended
+    _pg_offset_dblks = 0;
+    // This counter is for bookkeeping only, page rotates are handled directly in init_aio_reads()
+    // FIXME: _pg_cntr should be sync'd with aio ops, not use of page as it is now...
+    // Need to move reset into if (_rrfc.file_rotate()) above.
+    if (_pg_cntr >= (_jc->jfsize_sblks() / JRNL_RMGR_PAGE_SIZE))
+        _pg_cntr = 0;
+}
+
+/**
+ * \brief Number of dblks read into the current page that lie beyond the current page offset.
+ *
+ * \return The current page's _rdblks less _pg_offset_dblks.
+ */
+u_int32_t
+rmgr::dblks_rem() const
+{
+    return _page_cb_arr[_pg_index]._rdblks - _pg_offset_dblks;
+}
+
+/**
+ * \brief Clear the output parameters of a read: both pointers become null, both sizes zero.
+ *
+ * \param datapp  Location of the data pointer to be nulled.
+ * \param dsize   Data size to be zeroed.
+ * \param xidpp   Location of the XID pointer to be nulled.
+ * \param xidsize XID size to be zeroed.
+ */
+void
+rmgr::set_params_null(void** const datapp, std::size_t& dsize, void** const xidpp, std::size_t& xidsize)
+{
+    // Pointers first...
+    *datapp = 0;
+    *xidpp = 0;
+    // ...then the sizes that go with them
+    dsize = 0;
+    xidsize = 0;
+}
+
+/**
+ * \brief Submit an AIO read of the first _sblksize bytes of the current read file into the file
+ *     header buffer.
+ *
+ * On successful submission the outstanding AIO event count is incremented, one sblk is added to
+ * the submitted count of the read file controller, and _fhdr_rd_outstanding is set.
+ *
+ * \throws jexception with jerrno::JERR__AIO if the AIO submit fails.
+ */
+void
+rmgr::init_file_header_read()
+{
+    _jc->fhdr_wr_sync(_rrfc.index()); // wait if the file header write is outstanding
+    int rfh = _rrfc.fh();
+    // Read from offset 0 of the file
+    aio::prep_pread_2(_fhdr_aio_cb_ptr, rfh, _fhdr_buffer, _sblksize, 0);
+    if (aio::submit(_ioctx, 1, &_fhdr_aio_cb_ptr) < 0)
+        throw jexception(jerrno::JERR__AIO, "rmgr", "init_file_header_read");
+    _aio_evt_rem++;
+    _rrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE);
+    _fhdr_rd_outstanding = true;
+}
+
+/* TODO (sometime in the future)
+const iores
+rmgr::get(const u_int64_t& rid, const std::size_t& dsize, const std::size_t& dsize_avail,
+        const void** const data, bool auto_discard)
+{
+    return RHM_IORES_SUCCESS;
+}
+
+const iores
+rmgr::discard(data_tok* dtokp)
+{
+    return RHM_IORES_SUCCESS;
+}
+*/
+
+} // namespace journal
+} // namespace mrg

Added: qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/rmgr.hpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/rmgr.hpp?rev=1424091&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/rmgr.hpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/rmgr.hpp Wed Dec 19 20:34:56 2012
@@ -0,0 +1,117 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file rmgr.hpp
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * File containing code for class mrg::journal::rmgr (read manager). See
+ * class documentation for details.
+ *
+ * \author Kim van der Riet
+ *
+ * Copyright (c) 2007, 2008, 2009 Red Hat, Inc.
+ *
+ */
+
+#ifndef mrg_journal_rmgr_hpp
+#define mrg_journal_rmgr_hpp
+
+// Forward declaration of rmgr, made ahead of the includes that follow
+// (presumably so that headers referring to rmgr can be included here - confirm).
+namespace mrg
+{
+namespace journal
+{
+class rmgr;
+}
+}
+
+#include <cstring>
+#include "jrnl/enums.hpp"
+#include "jrnl/file_hdr.hpp"
+#include "jrnl/pmgr.hpp"
+#include "jrnl/rec_hdr.hpp"
+#include "jrnl/rrfc.hpp"
+
+namespace mrg
+{
+namespace journal
+{
+
+    /**
+    * \brief Class for managing a read page cache of arbitrary size and number of pages.
+    *
+    * The read page cache works on the principle of filling as many pages as possible in advance of
+    * reading the data. This ensures that delays caused by AIO operations are minimized.
+    */
+    class rmgr : public pmgr
+    {
+    private:
+        rrfc& _rrfc;                ///< Ref to read rotating file controller
+        rec_hdr _hdr;               ///< Header used to determine record type
+
+        void* _fhdr_buffer;         ///< Buffer used for fhdr reads
+        aio_cb* _fhdr_aio_cb_ptr;   ///< iocb pointer for fhdr reads
+        file_hdr _fhdr;             ///< file header instance for reading file headers
+        bool _fhdr_rd_outstanding;  ///< true if a fhdr read is outstanding
+
+    public:
+        rmgr(jcntl* jc, enq_map& emap, txn_map& tmap, rrfc& rrfc);
+        virtual ~rmgr();
+
+        using pmgr::initialize;
+        /// Initialize the page cache and allocate the file header read buffer and iocb
+        void initialize(aio_callback* const cbp);
+        /// Read the next enqueued record; see rmgr.cpp for the possible return values
+        iores read(void** const datapp, std::size_t& dsize, void** const xidpp,
+                std::size_t& xidsize, bool& transient, bool& external, data_tok* dtokp,
+                bool ignore_pending_txns);
+        /// Collect completed read AIO events, placing completed pages into the given state
+        int32_t get_events(page_state state, timespec* const timeout, bool flush = false);
+        void recover_complete();
+        /// Returns success immediately if the read file controller is valid, else runs aio_cycle()
+        inline iores synchronize() { if (_rrfc.is_valid()) return RHM_IORES_SUCCESS; return aio_cycle(); }
+        /// Mark the read file controller invalid
+        void invalidate();
+        /// Wait until the read file controller is valid or a timeout occurs
+        bool wait_for_validity(timespec* const timeout, const bool throw_on_timeout = false);
+
+        /* TODO (if required)
+        const iores get(const u_int64_t& rid, const std::size_t& dsize, const std::size_t& dsize_avail,
+                const void** const data, bool auto_discard);
+        const iores discard(data_tok* dtok);
+        */
+
+    private:
+        void clean();
+        void flush(timespec* timeout);
+        iores pre_read_check(data_tok* dtokp);
+        iores read_enq(rec_hdr& h, void* rptr, data_tok* dtokp);
+        void consume_xid_rec(rec_hdr& h, void* rptr, data_tok* dtokp);
+        void consume_filler();
+        iores skip(data_tok* dtokp);
+        iores aio_cycle();
+        iores init_aio_reads(const int16_t first_uninit, const u_int16_t num_uninit);
+        void rotate_page();
+        u_int32_t dblks_rem() const;
+        void set_params_null(void** const datapp, std::size_t& dsize, void** const xidpp,
+                std::size_t& xidsize);
+        void init_file_header_read();
+    };
+
+} // namespace journal
+} // namespace mrg
+
+#endif // ifndef mrg_journal_rmgr_hpp

Added: qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/rrfc.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/rrfc.cpp?rev=1424091&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/rrfc.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/rrfc.cpp Wed Dec 19 20:34:56 2012
@@ -0,0 +1,130 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file rrfc.cpp
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * File containing code for class mrg::journal::rrfc (rotating
+ * file controller). See comments in file rrfc.hpp for details.
+ *
+ * \author Kim van der Riet
+ *
+ * Copyright (c) 2007, 2008, 2009 Red Hat, Inc.
+ *
+ * This file is part of the Qpid async store library msgstore.so.
+ *
+ */
+
+
+#include "jrnl/rrfc.hpp"
+
+#include <cerrno>
+#include <fcntl.h>
+#include <unistd.h>
+#include "jrnl/jerrno.hpp"
+#include "jrnl/jexception.hpp"
+
+namespace mrg
+{
+namespace journal
+{
+
+/**
+ * \brief Construct a read rotating file controller with no open file handle (_fh is -1) and in
+ *     the invalid state. lpmp is passed to the rfc base class.
+ */
+rrfc::rrfc(const lpmgr* lpmp): rfc(lpmp), _fh(-1), _valid(false)
+{}
+
+/**
+ * \brief Destructor; closes the read file handle if one is open.
+ */
+rrfc::~rrfc()
+{
+    close_fh();
+}
+
+/**
+ * \brief Unset the file index (invalidating this controller and closing the file handle), then
+ *     finalize the rfc base class.
+ */
+void
+rrfc::finalize()
+{
+    unset_findex();
+    rfc::finalize();
+}
+
+/**
+ * \brief Select the file with index fc_index and open it for reading.
+ *
+ * \param fc_index Index of the file controller to make current.
+ * \throws jexception with jerrno::JERR_RRFC_OPENRD (from open_fh()) if the file cannot be opened.
+ */
+void
+rrfc::set_findex(const u_int16_t fc_index)
+{
+    rfc::set_findex(fc_index);
+    // The file handle always follows the current file controller
+    open_fh(_curr_fc->fname());
+}
+
+/**
+ * \brief Mark this controller invalid, close the read file handle and unset the file index in
+ *     the rfc base class.
+ */
+void
+rrfc::unset_findex()
+{
+    set_invalid();
+    close_fh();
+    rfc::unset_findex();
+}
+
+/**
+ * \brief Move on to the next journal file, wrapping from the last file back to the first.
+ *
+ * \return RHM_IORES_SUCCESS.
+ * \throws jexception with jerrno::JERR__NINIT if there are no journal files.
+ */
+iores
+rrfc::rotate()
+{
+    if (_lpmp->num_jfiles() == 0)
+        throw jexception(jerrno::JERR__NINIT, "rrfc", "rotate");
+
+    // Step to the following index; one past the last file wraps to index 0
+    u_int16_t following = _fc_index + 1;
+    if (following == _lpmp->num_jfiles())
+        following = 0;
+
+    set_findex(following);
+    return RHM_IORES_SUCCESS;
+}
+
+/**
+ * \brief Build a status string for this controller.
+ *
+ * \return "rrfc: " followed by the rfc status string and, when active, the index and status
+ *     string of the current file controller.
+ */
+std::string
+rrfc::status_str() const
+{
+    std::ostringstream text;
+    text << "rrfc: " << rfc::status_str();
+    if (!is_active())
+        return text.str();
+
+    // Active: append the details of the current file controller
+    text << " fcntl[" << _fc_index << "]: " << _curr_fc->status_str();
+    return text.str();
+}
+
+// === protected functions ===
+
+/**
+ * \brief Close any open read file handle, then open file fn read-only with O_DIRECT.
+ *
+ * \param fn Name of the file to open.
+ * \throws jexception with jerrno::JERR_RRFC_OPENRD if the file cannot be opened.
+ */
+void
+rrfc::open_fh(const std::string& fn)
+{
+    close_fh();
+    _fh = ::open(fn.c_str(), O_RDONLY | O_DIRECT);
+    if (_fh >= 0)
+        return; // opened successfully
+
+    std::ostringstream msg;
+    msg << "file=\"" << fn << "\"" << FORMAT_SYSERR(errno);
+    throw jexception(jerrno::JERR_RRFC_OPENRD, msg.str(), "rrfc", "open_fh");
+}
+
+/**
+ * \brief Close the read file handle if one is open and reset it to -1.
+ */
+void
+rrfc::close_fh()
+{
+    if (_fh < 0)
+        return; // nothing open
+
+    ::close(_fh);
+    _fh = -1;
+}
+
+} // namespace journal
+} // namespace mrg

Added: qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/rrfc.hpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/rrfc.hpp?rev=1424091&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/rrfc.hpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/rrfc.hpp Wed Dec 19 20:34:56 2012
@@ -0,0 +1,182 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file rrfc.hpp
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * File containing code for class mrg::journal::rrfc (read rotating
+ * file controller). See class documentation for details.
+ *
+ * \author Kim van der Riet
+ *
+ * Copyright (c) 2007, 2008, 2009 Red Hat, Inc.
+ *
+ */
+
+#ifndef mrg_journal_rrfc_hpp
+#define mrg_journal_rrfc_hpp
+
+namespace mrg
+{
+namespace journal
+{
+class rrfc;
+}
+}
+
+#include "jrnl/fcntl.hpp"
+#include "jrnl/rfc.hpp"
+
+namespace mrg
+{
+namespace journal
+{
+
+    /**
+    * \class rrfc
+    * \brief Read Rotating File Controller (rrfc) - Subclassed from pure virtual class rfc. Used to control the read
+    *     pipeline in a rotating file buffer or journal. See class rfc for further details.
+    *
+    * The states that exist in this class are identical to class rfc from which it inherits, but in addition, the value
+    * of the read file handle _fh is also considered. The calls to set_findex also opens the file handle _fh to the
+    * active file for reading. Similarly, unset_findex() closes this file handle.
+    *
+    * <pre>
+    *                                                                   is_init()  is_active()
+    *                  +===+                    _lpmp.is_init() == false
+    *      +---------->|   |     Uninitialized: _curr_fc == 0               F           F
+    *      |       +-->+===+ --+                _fh == -1
+    *      |       |           |
+    *      |       |           |
+    *      |   finalize()   initialize()
+    *      |       |           |
+    *      |       |           |
+    *      |       +-- +===+<--+                _lpmp.is_init() == true
+    *  finalize()      |   |     Inactive:      _curr_fc == 0               T           F
+    *      |       +-->+===+ --+                _fh == -1
+    *      |       |           |
+    *      |       |           |
+    *      | unset_findex() set_findex()
+    *      |       |           |
+    *      |       |           |
+    *      |       +-- +===+<--+                _lpmp.is_init() == true
+    *      +---------- |   |     Active:        _curr_fc != 0               T           T
+    *                  +===+                    _fh >= 0
+    * </pre>
+    *
+    * In addition to the states above, class rrfc contains a validity flag. This is operated independently of the state
+    * machine. This flag (_valid) indicates when the read buffers are valid for reading. This is not strictly a state,
+    * but simply a flag used to keep track of the status, and is set/unset with calls to set_valid() and set_invalid()
+    * respectively.
+    */
+    class rrfc : public rfc
+    {
+    protected:
+        int _fh;                ///< Read file handle; -1 when no file is open
+        bool _valid;            ///< Flag is true when read pages contain valid data
+
+    public:
+        /**
+        * \brief Constructs the controller with no open file handle and the validity flag cleared.
+        */
+        rrfc(const lpmgr* lpmp);
+
+        /**
+        * \brief Closes the read file handle if it is still open.
+        */
+        virtual ~rrfc();
+
+        /**
+        * \brief Initialize the controller, moving from state Uninitialized to Inactive. The base class
+        *     rfc::initialize() is called first, then the validity flag is cleared.
+        */
+        inline void initialize() { rfc::initialize(); _valid = false; }
+
+        /**
+        * \brief Reset the controller to Uninitialized state, usually called when the journal is stopped. Once called,
+        *     initialize() must be called to reuse an instance.
+        */
+        void finalize();
+
+        /**
+        * \brief Selects file fc_index and opens the read file handle on it. Moves to state Active.
+        */
+        void set_findex(const u_int16_t fc_index);
+
+        /**
+        * \brief Clears the validity flag, closes the read file handle and deselects the active file. Moves to
+        *     state Inactive.
+        */
+        void unset_findex();
+
+        /**
+        * \brief Check the state: true = Active (a file is selected and its read handle is open); false otherwise.
+        */
+        inline bool is_active() const { return _curr_fc != 0 && _fh >= 0; }
+
+        /**
+        * \brief Clears the validity flag, indicating that the read buffers are no longer synchronized and cannot
+        *     be read without resynchronization.
+        */
+        inline void set_invalid() { _valid = false; }
+
+        /**
+        * \brief Sets the validity flag, indicating that the read buffers contain valid data for reading.
+        */
+        inline void set_valid() { _valid = true; }
+
+        /**
+        * \brief Checks the read buffer validity status: true = valid, can be read; false = invalid, synchronization
+        *     required.
+        */
+        inline bool is_valid() const { return _valid; }
+
+        /**
+        * \brief Rotate active file controller to next file in rotating file group.
+        * \exception jerrno::JERR__NINIT if called before calling initialize().
+        */
+        iores rotate();
+
+        /**
+        * \brief Returns the raw read file handle (-1 when no file is open).
+        */
+        inline int fh() const { return _fh; }
+
+        // The accessors below forward to the read-side members of the current file controller. None of them
+        // checks _curr_fc for null, so they must only be called while a file is selected (_curr_fc != 0).
+
+        // Submitted-read counters of the current file
+        inline u_int32_t subm_cnt_dblks() const { return _curr_fc->rd_subm_cnt_dblks(); }
+        inline std::size_t subm_offs() const { return _curr_fc->rd_subm_offs(); }
+        inline u_int32_t add_subm_cnt_dblks(u_int32_t a) { return _curr_fc->add_rd_subm_cnt_dblks(a); }
+
+        // Completed-read counters of the current file
+        inline u_int32_t cmpl_cnt_dblks() const { return _curr_fc->rd_cmpl_cnt_dblks(); }
+        inline std::size_t cmpl_offs() const { return _curr_fc->rd_cmpl_offs(); }
+        inline u_int32_t add_cmpl_cnt_dblks(u_int32_t a) { return _curr_fc->add_rd_cmpl_cnt_dblks(a); }
+
+        // Read-status queries on the current file
+        inline bool is_void() const { return _curr_fc->rd_void(); }
+        inline bool is_empty() const { return _curr_fc->rd_empty(); }
+        inline u_int32_t remaining_dblks() const { return _curr_fc->rd_remaining_dblks(); }
+        inline bool is_full() const { return _curr_fc->is_rd_full(); }
+        inline bool is_compl() const { return _curr_fc->is_rd_compl(); }
+        inline u_int32_t aio_outstanding_dblks() const { return _curr_fc->rd_aio_outstanding_dblks(); }
+        inline bool file_rotate() const { return _curr_fc->rd_file_rotate(); }
+        // True when the current file controller reports a non-zero write-side outstanding AIO dblk count
+        inline bool is_wr_aio_outstanding() const { return _curr_fc->wr_aio_outstanding_dblks() > 0; }
+
+        // Debug aid
+        std::string status_str() const;
+
+    protected:
+        /**
+        * \brief Closes any open handle, then opens file fn read-only and stores the descriptor in _fh.
+        */
+        void open_fh(const std::string& fn);
+
+        /**
+        * \brief Closes the read file handle if open and sets _fh to -1.
+        */
+        void close_fh();
+    }; // class rrfc
+
+} // namespace journal
+} // namespace mrg
+
+#endif // ifndef mrg_journal_rrfc_hpp

Added: qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/slock.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/slock.cpp?rev=1424091&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/slock.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/slock.cpp Wed Dec 19 20:34:56 2012
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file slock.cpp
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * File containing code for class mrg::journal::slock (scoped lock). See
+ * comments in file slock.hpp for details.
+ *
+ * \author Kim van der Riet
+ *
+ * Copyright (c) 2007, 2008 Red Hat Inc.
+ *
+ */
+
+#include "jrnl/slock.hpp"

Added: qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/slock.hpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/slock.hpp?rev=1424091&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/slock.hpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/slock.hpp Wed Dec 19 20:34:56 2012
@@ -0,0 +1,88 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file slock.hpp
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * Messaging journal scoped lock class mrg::journal::slock and scoped try-lock
+ * class mrg::journal::stlock.
+ *
+ * \author Kim van der Riet
+ *
+ * Copyright (c) 2007, 2008 Red Hat, Inc.
+ *
+ */
+
+#ifndef mrg_journal_slock_hpp
+#define mrg_journal_slock_hpp
+
+#include "jrnl/jexception.hpp"
+#include "jrnl/smutex.hpp"
+#include <pthread.h>
+
+namespace mrg
+{
+namespace journal
+{
+
+    // Ultra-simple scoped lock class, auto-releases mutex when it goes out-of-scope.
+    // The constructor locks the supplied smutex and the destructor unlocks it; both calls are
+    // checked through PTHREAD_CHK.
+    class slock
+    {
+    protected:
+        const smutex& _sm;      ///< Mutex locked by the constructor and unlocked by the destructor
+    public:
+        inline slock(const smutex& sm) : _sm(sm)
+        {
+            PTHREAD_CHK(::pthread_mutex_lock(_sm.get()), "::pthread_mutex_lock", "slock", "slock");
+        }
+        inline ~slock()
+        {
+            PTHREAD_CHK(::pthread_mutex_unlock(_sm.get()), "::pthread_mutex_unlock", "slock", "~slock");
+        }
+    private:
+        // Not copyable: a copy would share _sm, and its destructor would unlock the mutex a
+        // second time. Declared but intentionally never defined.
+        slock(const slock&);
+        slock& operator=(const slock&);
+    };
+
+    // Ultra-simple scoped try-lock class, auto-releases mutex when it goes out-of-scope.
+    // The constructor makes one non-blocking attempt to lock the supplied smutex; locked()
+    // reports whether the attempt succeeded. The destructor unlocks only if the lock was obtained.
+    class stlock
+    {
+    protected:
+        const smutex& _sm;      ///< Mutex this object attempted to lock
+        bool _locked;           ///< True only if the trylock in the constructor succeeded
+    public:
+        inline stlock(const smutex& sm) : _sm(sm), _locked(false)
+        {
+            int ret = ::pthread_mutex_trylock(_sm.get());
+            _locked = (ret == 0); // check if lock obtained
+            // EBUSY just means another holder has the mutex; any other failure goes to PTHREAD_CHK
+            if (!_locked && ret != EBUSY) PTHREAD_CHK(ret, "::pthread_mutex_trylock", "stlock", "stlock");
+        }
+        inline ~stlock()
+        {
+            if (_locked)
+                PTHREAD_CHK(::pthread_mutex_unlock(_sm.get()), "::pthread_mutex_unlock", "stlock", "~stlock");
+        }
+        inline bool locked() const { return _locked; }
+    private:
+        // Not copyable: a copy would inherit _locked == true, and its destructor would unlock
+        // the mutex a second time. Declared but intentionally never defined.
+        stlock(const stlock&);
+        stlock& operator=(const stlock&);
+    };
+
+} // namespace journal
+} // namespace mrg
+
+#endif // ifndef mrg_journal_slock_hpp

Added: qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/smutex.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/smutex.cpp?rev=1424091&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/smutex.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/smutex.cpp Wed Dec 19 20:34:56 2012
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file smutex.cpp
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * File containing code for class mrg::journal::smutex (scoped mutex). See
+ * comments in file smutex.hpp for details.
+ *
+ * \author Kim van der Riet
+ *
+ * Copyright (c) 2010 Red Hat Inc.
+ *
+ */
+
+#include "jrnl/smutex.hpp"

Added: qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/smutex.hpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/smutex.hpp?rev=1424091&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/smutex.hpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/smutex.hpp Wed Dec 19 20:34:56 2012
@@ -0,0 +1,67 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file smutex.hpp
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * Messaging journal scoped mutex class mrg::journal::smutex.
+ *
+ * \author Kim van der Riet
+ *
+ * Copyright (c) 2010 Red Hat, Inc.
+ *
+ */
+
+
+#ifndef mrg_journal_smutex_hpp
+#define mrg_journal_smutex_hpp
+
+#include "jrnl/jexception.hpp"
+#include <pthread.h>
+
+namespace mrg
+{
+namespace journal
+{
+
+    // Minimal wrapper that owns one posix mutex: initialized (with default attributes) in the
+    // constructor and destroyed in the destructor, each call checked through PTHREAD_CHK.
+    // NOTE(review): the class is implicitly copyable, which would duplicate the raw
+    // pthread_mutex_t - confirm that no caller copies an smutex.
+    class smutex
+    {
+    protected:
+        mutable pthread_mutex_t _m;     ///< Underlying mutex; mutable so get() can be const
+    public:
+        smutex()
+        {
+            PTHREAD_CHK(::pthread_mutex_init(&_m, 0), "::pthread_mutex_init", "smutex", "smutex");
+        }
+
+        virtual ~smutex()
+        {
+            PTHREAD_CHK(::pthread_mutex_destroy(&_m), "::pthread_mutex_destroy", "smutex", "~smutex");
+        }
+
+        // Pointer to the underlying mutex for use with the pthread_mutex_* calls
+        pthread_mutex_t* get() const
+        {
+            return &_m;
+        }
+    };
+
+} // namespace journal
+} // namespace mrg
+
+#endif // ifndef mrg_journal_smutex_hpp

Added: qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/time_ns.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/time_ns.cpp?rev=1424091&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/time_ns.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/time_ns.cpp Wed Dec 19 20:34:56 2012
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file time_ns.cpp
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * Messaging journal time struct mrg::journal::time_ns, derived from
+ * the timespec struct and provided with helper functions.
+ *
+ * \author Kim van der Riet
+ *
+ * Copyright (c) 2008 Red Hat, Inc.
+ *
+ */
+
+#include "time_ns.hpp"
+
+#include <sstream>
+
+namespace mrg
+{
+namespace journal
+{
+
+// Render the time as decimal seconds in fixed-point notation with the requested number of
+// digits after the decimal point. The value is formed as a double (tv_sec + tv_nsec/1e9)
+// before formatting.
+const std::string
+time_ns::str(int precision) const
+{
+    std::ostringstream text;
+    text << std::fixed;
+    text.precision(precision);
+    text << (tv_sec + (tv_nsec/1e9));
+    return text.str();
+}
+
+
+} // namespace journal
+} // namespace mrg

Added: qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/time_ns.hpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/time_ns.hpp?rev=1424091&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/time_ns.hpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/time_ns.hpp Wed Dec 19 20:34:56 2012
@@ -0,0 +1,108 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file time_ns.hpp
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * Messaging journal time struct mrg::journal::time_ns, derived from
+ * the timespec struct and provided with helper functions.
+ *
+ * \author Kim van der Riet
+ *
+ * Copyright (c) 2008 Red Hat, Inc.
+ *
+ */
+
+#ifndef mrg_jtt_time_ns_hpp
+#define mrg_jtt_time_ns_hpp
+
+#include <cerrno>
+#include <ctime>
+#include <string>
+
+namespace mrg
+{
+namespace journal
+{
+
+/**
+ * \brief Nanosecond-resolution time value layered on the POSIX timespec struct.
+ *
+ * The arithmetic operators keep tv_nsec within [0, 1000000000) provided the operands start out
+ * in that range. The operators taking a plain nanosecond count accept any magnitude and sign.
+ * Comparison and binary +/- operators are const-qualified so that they can be used on const
+ * objects.
+ */
+struct time_ns : public timespec
+{
+    /// Construct a zero time.
+    inline time_ns() { tv_sec = 0; tv_nsec = 0; }
+    /// Construct from whole seconds and an optional nanosecond part (stored as given).
+    inline time_ns(const std::time_t sec, const long nsec = 0) { tv_sec = sec; tv_nsec = nsec; }
+    inline time_ns(const time_ns& t) { tv_sec = t.tv_sec; tv_nsec = t.tv_nsec; }
+
+    inline void set_zero() { tv_sec = 0; tv_nsec = 0; }
+    inline bool is_zero() const { return tv_sec == 0 && tv_nsec == 0; }
+    /// Set this object to the current CLOCK_REALTIME time. Returns 0 on success, otherwise errno.
+    inline int now() { if(::clock_gettime(CLOCK_REALTIME, this)) return errno; return 0; }
+    /// Format as decimal seconds with \a precision fractional digits (defined in time_ns.cpp).
+    const std::string str(int precision = 6) const;
+
+    inline time_ns& operator=(const time_ns& rhs)
+        { tv_sec = rhs.tv_sec; tv_nsec = rhs.tv_nsec; return *this; }
+    inline time_ns& operator+=(const time_ns& rhs)
+    {
+        tv_nsec += rhs.tv_nsec;
+        if (tv_nsec >= 1000000000L) { tv_sec++; tv_nsec -= 1000000000L; }
+        tv_sec += rhs.tv_sec;
+        return *this;
+    }
+    /// Add \a ns nanoseconds; \a ns may be negative or exceed one second.
+    inline time_ns& operator+=(const long ns)
+    {
+        // Apply whole seconds and the sub-second remainder separately so that a single carry
+        // or borrow is always enough to bring tv_nsec back into range.
+        tv_sec += ns / 1000000000L;
+        tv_nsec += ns % 1000000000L;
+        if (tv_nsec >= 1000000000L) { tv_sec++; tv_nsec -= 1000000000L; }
+        else if (tv_nsec < 0) { tv_sec--; tv_nsec += 1000000000L; }
+        return *this;
+    }
+    /// Subtract \a ns nanoseconds; \a ns may be negative or exceed one second.
+    inline time_ns& operator-=(const long ns)
+    {
+        tv_sec -= ns / 1000000000L;
+        tv_nsec -= ns % 1000000000L;
+        if (tv_nsec < 0) { tv_sec--; tv_nsec += 1000000000L; }
+        else if (tv_nsec >= 1000000000L) { tv_sec++; tv_nsec -= 1000000000L; }
+        return *this;
+    }
+    inline time_ns& operator-=(const time_ns& rhs)
+    {
+        tv_nsec -= rhs.tv_nsec;
+        if (tv_nsec < 0) { tv_sec--; tv_nsec += 1000000000L; }
+        tv_sec -= rhs.tv_sec;
+        return *this;
+    }
+    inline const time_ns operator+(const time_ns& rhs) const
+        { time_ns t(*this); t += rhs; return t; }
+    inline const time_ns operator-(const time_ns& rhs) const
+        { time_ns t(*this); t -= rhs; return t; }
+    inline bool operator==(const time_ns& rhs) const
+       { return tv_sec == rhs.tv_sec && tv_nsec == rhs.tv_nsec; }
+    inline bool operator!=(const time_ns& rhs) const
+       { return tv_sec != rhs.tv_sec || tv_nsec != rhs.tv_nsec; }
+    // Ordering compares seconds first and falls back to nanoseconds when the seconds are equal
+    inline bool operator>(const time_ns& rhs) const
+       { if(tv_sec == rhs.tv_sec) return tv_nsec > rhs.tv_nsec; return tv_sec > rhs.tv_sec; }
+    inline bool operator>=(const time_ns& rhs) const
+       { if(tv_sec == rhs.tv_sec) return tv_nsec >= rhs.tv_nsec; return tv_sec >= rhs.tv_sec; }
+    inline bool operator<(const time_ns& rhs) const
+       { if(tv_sec == rhs.tv_sec) return tv_nsec < rhs.tv_nsec; return tv_sec < rhs.tv_sec; }
+    inline bool operator<=(const time_ns& rhs) const
+       { if(tv_sec == rhs.tv_sec) return tv_nsec <= rhs.tv_nsec; return tv_sec <= rhs.tv_sec; }
+};
+
+} // namespace journal
+} // namespace mrg
+
+#endif // ifndef mrg_jtt_time_ns_hpp

Added: qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/txn_hdr.hpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/txn_hdr.hpp?rev=1424091&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/txn_hdr.hpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/txn_hdr.hpp Wed Dec 19 20:34:56 2012
@@ -0,0 +1,128 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file txn_hdr.hpp
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * File containing code for class mrg::journal::txn_hdr (transaction
+ * record header), used to start a transaction (commit or abort) record.
+ *
+ * \author Kim van der Riet
+ *
+ * Copyright (c) 2007, 2008 Red Hat, Inc.
+ *
+ */
+
+#ifndef mrg_journal_txn_hdr_hpp
+#define mrg_journal_txn_hdr_hpp
+
+#include <cstddef>
+#include "jrnl/rec_hdr.hpp"
+
+namespace mrg
+{
+namespace journal
+{
+
+#pragma pack(1)
+
+    /**
+    * \brief Struct for transaction commit and abort records.
+    *
+    * Struct for DTX commit and abort records. Only the magic distinguishes between them. Since
+    * this record must be used in the context of a valid XID, the xidsize field must not be zero.
+    * Immediately following this record is the XID itself which is xidsize bytes long, followed by
+    * a rec_tail.
+    *
+    * Note that this record has its own rid distinct from the rids of the record(s) making up the
+    * transaction it is committing or aborting.
+    *
+    * Record header info in binary format (24 bytes):
+    * <pre>
+    *   0                           7
+    * +---+---+---+---+---+---+---+---+  -+
+    * |     magic     | v | e | flags |   |
+    * +---+---+---+---+---+---+---+---+   | struct hdr
+    * |              rid              |   |
+    * +---+---+---+---+---+---+---+---+  -+
+    * |            xidsize            |
+    * +---+---+---+---+---+---+---+---+
+    * v = file version (If the format or encoding of this file changes, then this
+    *     number should be incremented)
+    * e = endian flag, false (0x00) for little endian, true (0x01) for big endian
+    * </pre>
+    *
+    * Note that journal files should be transferable between 32- and 64-bit
+    * hardware of the same endianness, but not between hardware of opposite
+    * endianness without some sort of binary conversion utility. Thus buffering
+    * will be needed for types that change size between 32- and 64-bit compiles.
+    */
+    struct txn_hdr : rec_hdr
+    {
+        // Member order below defines the record layout and must not be rearranged. On 32-bit
+        // builds a 4-byte filler is placed next to _xidsize so that the pair occupies 8 bytes,
+        // with the filler on the side where the high-order half of a 64-bit value would sit:
+        // before _xidsize on big-endian, after it on little-endian.
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+        u_int32_t _filler0;     ///< Big-endian filler for 32-bit size_t
+#endif
+        std::size_t _xidsize;        ///< XID size
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+        u_int32_t _filler0;     ///< Little-endian filler for 32-bit size_t
+#endif
+
+        /**
+        * \brief Default constructor, which sets all values to 0.
+        */
+        txn_hdr(): rec_hdr(),
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+            _filler0(0),
+#endif
+            _xidsize(0)
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+            , _filler0(0)
+#endif
+        {}
+
+        /**
+        * \brief Convenience constructor which initializes values during construction.
+        *
+        * magic, version, rid and owi are passed to the rec_hdr base; xidsize is stored in _xidsize and any
+        * filler is zeroed.
+        */
+        txn_hdr(const u_int32_t magic, const u_int8_t version, const u_int64_t rid,
+                const std::size_t xidsize, const bool owi): rec_hdr(magic, version, rid, owi),
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+            _filler0(0),
+#endif
+            _xidsize(xidsize)
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+            , _filler0(0)
+#endif
+        {}
+
+        /**
+        * \brief Returns the size of the header in bytes (sizeof(txn_hdr), which includes the rec_hdr base).
+        */
+        inline static std::size_t size() { return sizeof(txn_hdr); }
+    };
+
+#pragma pack()
+
+} // namespace journal
+} // namespace mrg
+
+#endif // ifndef mrg_journal_txn_hdr_hpp

Added: qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/txn_map.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/txn_map.cpp?rev=1424091&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/txn_map.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/txn_map.cpp Wed Dec 19 20:34:56 2012
@@ -0,0 +1,259 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file txn_map.cpp
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * File containing code for class mrg::journal::txn_map (transaction map). See
+ * comments in file txn_map.hpp for details.
+ *
+ * \author Kim van der Riet
+ *
+ * Copyright (c) 2007, 2008, 2009, 2010 Red Hat Inc.
+ *
+ */
+
+#include "jrnl/txn_map.hpp"
+
+#include <iomanip>
+#include "jrnl/jerrno.hpp"
+#include "jrnl/jexception.hpp"
+#include "jrnl/slock.hpp"
+#include <sstream>
+
+namespace mrg
+{
+namespace journal
+{
+
+// return/error codes
+// Note that TMAP_OK and TMAP_NOT_SYNCED share the value 0; they are returned by different
+// functions (set_aio_compl() and is_txn_synced() respectively).
+int16_t txn_map::TMAP_RID_NOT_FOUND = -2;   // set_aio_compl(): xid is in the map but no entry has the given rid
+int16_t txn_map::TMAP_XID_NOT_FOUND = -1;   // is_txn_synced() / set_aio_compl(): xid is not in the map
+int16_t txn_map::TMAP_OK = 0;               // set_aio_compl(): entry found and marked complete
+int16_t txn_map::TMAP_NOT_SYNCED = 0;       // is_txn_synced(): at least one entry is not yet AIO-complete
+int16_t txn_map::TMAP_SYNCED = 1;           // is_txn_synced(): every entry for the xid is AIO-complete
+
+// Build one transaction-map entry from the supplied record ids, physical file id and flags.
+// The AIO-complete marker always starts out false.
+txn_data_struct::txn_data_struct(const u_int64_t rid,
+                                 const u_int64_t drid,
+                                 const u_int16_t pfid,
+                                 const bool enq_flag,
+                                 const bool commit_flag):
+        _rid(rid), _drid(drid), _pfid(pfid),
+        _enq_flag(enq_flag), _commit_flag(commit_flag),
+        _aio_compl(false)
+{}
+
+// Construct an empty transaction map with an empty per-file counter list; the counter list is
+// sized later by set_num_jfiles().
+txn_map::txn_map():
+        _map(),
+        _pfid_txn_cnt()
+{}
+
+// Nothing to release explicitly; members clean up after themselves.
+txn_map::~txn_map() {}
+
+// Size the per-file transaction counter list to num_jfiles entries; any newly added entries
+// start at 0.
+void
+txn_map::set_num_jfiles(const u_int16_t num_jfiles)
+{
+    _pfid_txn_cnt.resize(num_jfiles, 0);
+}
+
+// Return the transaction-entry count recorded for physical file id pfid.
+// The lookup uses at(), so pfid is bounds-checked against the size set by set_num_jfiles().
+// Note that no lock is taken here.
+u_int32_t
+txn_map::get_txn_pfid_cnt(const u_int16_t pfid) const
+{
+    return _pfid_txn_cnt.at(pfid);
+}
+
+// Record td against transaction xid (under the map lock) and bump the counter of the file
+// named by td._pfid. A new map entry is created when xid has not been seen before.
+// Returns false only if creating that new entry is reported as a duplicate by the map.
+bool
+txn_map::insert_txn_data(const std::string& xid, const txn_data& td)
+{
+    bool inserted_ok = true;
+    slock s(_mutex);
+    const xmap_itr pos = _map.find(xid);
+    if (pos != _map.end())
+    {
+        // xid already known: append to its existing list
+        pos->second.push_back(td);
+    }
+    else
+    {
+        // first entry for this xid: start a fresh list
+        txn_data_list fresh;
+        fresh.push_back(td);
+        inserted_ok = _map.insert(xmap_param(xid, fresh)).second;
+    }
+    ++_pfid_txn_cnt.at(td._pfid);
+    return inserted_ok;
+}
+
+// Locking wrapper around get_tdata_list_nolock(): returns, by value, the entries recorded for
+// xid (or the empty list when xid is not in the map).
+const txn_data_list
+txn_map::get_tdata_list(const std::string& xid)
+{
+    slock s(_mutex);
+    return get_tdata_list_nolock(xid);
+}
+
+// Return, by value, the entries recorded for xid, or _empty_data_list when xid is not in the
+// map. Takes no lock itself; the caller is expected to hold _mutex.
+const txn_data_list
+txn_map::get_tdata_list_nolock(const std::string& xid)
+{
+    const xmap_itr pos = _map.find(xid);
+    if (pos != _map.end())
+        return pos->second;
+    return _empty_data_list;
+}
+
+// Remove transaction xid from the map (under the map lock) and return the entries it held.
+// The per-file counter is decremented once for every entry removed. Returns
+// _empty_data_list, and changes nothing, when xid is not in the map.
+const txn_data_list
+txn_map::get_remove_tdata_list(const std::string& xid)
+{
+    slock s(_mutex);
+    xmap_itr found = _map.find(xid);
+    if (found == _map.end()) // not found in map
+        return _empty_data_list;
+    txn_data_list removed(found->second);
+    _map.erase(found);
+    tdl_itr entry = removed.begin();
+    while (entry != removed.end())
+    {
+        --_pfid_txn_cnt.at(entry->_pfid);
+        ++entry;
+    }
+    return removed;
+}
+
+// Report, under the map lock, whether transaction xid currently has an entry in the map.
+bool
+txn_map::in_map(const std::string& xid)
+{
+    slock s(_mutex);
+    return _map.find(xid) != _map.end();
+}
+
+// Total number of enqueue entries (_enq_flag == true) across all transactions in the map.
+u_int32_t
+txn_map::enq_cnt()
+{
+    return cnt(true);
+}
+
+// Total number of dequeue entries (_enq_flag == false) across all transactions in the map.
+u_int32_t
+txn_map::deq_cnt()
+{
+    // cnt() counts entries whose _enq_flag equals its argument, so dequeues need false here
+    return cnt(false);
+}
+
+// Count, under the map lock, the entries across all transactions whose _enq_flag equals
+// enq_flag (true selects enqueue entries, false selects dequeue entries).
+u_int32_t
+txn_map::cnt(const bool enq_flag)
+{
+    slock s(_mutex);
+    u_int32_t matches = 0;
+    for (xmap_itr x = _map.begin(); x != _map.end(); ++x)
+    {
+        txn_data_list& ops = x->second;
+        for (tdl_itr op = ops.begin(); op != ops.end(); ++op)
+            matches += (op->_enq_flag == enq_flag) ? 1 : 0;
+    }
+    return matches;
+}
+
+// Check, under the map lock, whether every entry recorded for xid has been marked AIO-complete.
+// Returns TMAP_XID_NOT_FOUND if xid is not in the map, TMAP_NOT_SYNCED if any entry is still
+// incomplete, otherwise TMAP_SYNCED.
+int16_t
+txn_map::is_txn_synced(const std::string& xid)
+{
+    slock s(_mutex);
+    xmap_itr entry = _map.find(xid);
+    if (entry == _map.end()) // not found in map
+        return TMAP_XID_NOT_FOUND;
+    txn_data_list& ops = entry->second;
+    for (tdl_itr op = ops.begin(); op != ops.end(); ++op)
+    {
+        // the first incomplete entry settles the answer
+        if (!op->_aio_compl)
+            return TMAP_NOT_SYNCED;
+    }
+    return TMAP_SYNCED;
+}
+
+// Mark as AIO-complete the first entry of transaction xid whose record id equals rid (under
+// the map lock).
+// Returns TMAP_OK on success, TMAP_XID_NOT_FOUND if xid is not in the map, or
+// TMAP_RID_NOT_FOUND if xid is present but holds no entry with that rid.
+int16_t
+txn_map::set_aio_compl(const std::string& xid, const u_int64_t rid)
+{
+    slock s(_mutex);
+    xmap_itr entry = _map.find(xid);
+    if (entry == _map.end()) // xid not found in map
+        return TMAP_XID_NOT_FOUND;
+    txn_data_list& ops = entry->second;
+    tdl_itr op = ops.begin();
+    while (op != ops.end())
+    {
+        if (op->_rid == rid)
+        {
+            op->_aio_compl = true;
+            return TMAP_OK; // rid found
+        }
+        ++op;
+    }
+    // xid present, but rid not found
+    return TMAP_RID_NOT_FOUND;
+}
+
+// Report, under the map lock, whether transaction xid holds an entry whose record id is rid.
+// Returns false when xid is not in the map.
+bool
+txn_map::data_exists(const std::string& xid, const u_int64_t rid)
+{
+    slock s(_mutex);
+    // Scan the stored list in place rather than copying it out first; the copy was made while
+    // holding the lock and served no purpose for a read-only search.
+    xmap_itr entry = _map.find(xid);
+    if (entry == _map.end())
+        return false;
+    txn_data_list& ops = entry->second;
+    for (tdl_itr op = ops.begin(); op != ops.end(); ++op)
+    {
+        if (op->_rid == rid)
+            return true;
+    }
+    return false;
+}
+
+// Report, under the map lock, whether record id rid is referenced by any transaction in the
+// map: an enqueue entry matches on its own _rid, a dequeue entry matches on its _drid.
+bool
+txn_map::is_enq(const u_int64_t rid)
+{
+    bool found = false;
+    {
+        slock s(_mutex);
+        for (xmap_itr i = _map.begin(); i != _map.end() && !found; i++)
+        {
+            // Bind by reference: copying every transaction's list just to read it was
+            // needless work done while holding the lock.
+            txn_data_list& list = i->second;
+            for (tdl_itr j = list.begin(); j < list.end() && !found; j++)
+            {
+                if (j->_enq_flag)
+                    found = j->_rid == rid;
+                else
+                    found = j->_drid == rid;
+            }
+        }
+    }
+    return found;
+}
+
+// Replace the contents of xv with the xid of every transaction currently in the map, in map
+// iteration order. xv is cleared first; the map is read under the map lock.
+void
+txn_map::xid_list(std::vector<std::string>& xv)
+{
+    xv.clear();
+    slock s(_mutex);
+    xmap_itr pos = _map.begin();
+    while (pos != _map.end())
+    {
+        xv.push_back(pos->first);
+        ++pos;
+    }
+}
+
+} // namespace journal
+} // namespace mrg

Added: qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/txn_map.hpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/txn_map.hpp?rev=1424091&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/txn_map.hpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/txn_map.hpp Wed Dec 19 20:34:56 2012
@@ -0,0 +1,162 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file txn_map.hpp
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * File containing code for class mrg::journal::txn_map (transaction map).
+ * See class documentation for details.
+ *
+ * \author Kim van der Riet
+ *
+ * Copyright (c) 2007, 2008, 2009, 2010 Red Hat Inc.
+ *
+ */
+
+#ifndef mrg_journal_txn_map_hpp
+#define mrg_journal_txn_map_hpp
+
+namespace mrg
+{
+namespace journal
+{
+    class txn_map;
+}
+}
+
+#include "jrnl/smutex.hpp"
+#include <map>
+#include <pthread.h>
+#include <string>
+#include <sys/types.h>
+#include <vector>
+
+namespace mrg
+{
+namespace journal
+{
+
+    /**
+    * \struct txn_data_struct
+    * \brief Struct encapsulating transaction data necessary for processing a transaction
+    *     in the journal once it is closed with either a commit or abort.
+    *
+    * One instance describes a single enqueue or dequeue operation performed under a
+    * transaction. Class txn_map keeps a vector of these (txn_data_list) per xid.
+    */
+    struct txn_data_struct
+    {
+        u_int64_t _rid;     ///< Record id for this operation
+        u_int64_t _drid;    ///< Dequeue record id for this operation
+        u_int16_t _pfid;    ///< Physical file id, to be used when transferring to emap on commit
+        bool _enq_flag;     ///< If true, enq op, otherwise deq op
+        bool _commit_flag;  ///< (2PC transactions) Records 2PC complete c/a mode
+        bool _aio_compl;    ///< Initially false, set to true when record AIO returns
+        /// Construct from the per-operation values; commit_flag defaults to false
+        /// when omitted. There is no constructor parameter for _aio_compl.
+        txn_data_struct(const u_int64_t rid, const u_int64_t drid, const u_int16_t pfid,
+                const bool enq_flag, const bool commit_flag = false);
+    };
+    /// Short name for txn_data_struct
+    typedef txn_data_struct txn_data;
+    /// All operations recorded against one xid
+    typedef std::vector<txn_data> txn_data_list;
+    /// Mutable iterator over a txn_data_list
+    typedef txn_data_list::iterator tdl_itr;
+
+    /**
+    * \class txn_map
+    * \brief Class for storing transaction data for each open (ie not committed or aborted)
+    *     xid in the store. If aborted, records are discarded; if committed, they are
+    *     transferred to the enqueue map.
+    *
+    * The data is encapsulated by struct txn_data_struct. A vector containing the information
+    * for each operation included as part of the same transaction is mapped against the
+    * xid.
+    *
+    * The aio_compl flag is set true as each AIO write operation for the enqueue or dequeue
+    * returns. Checking that all of these flags are true for a given xid is the mechanism
+    * used to determine if the transaction is synchronized (through method is_txn_synced()).
+    *
+    * On transaction commit, then each operation is handled as follows:
+    *
+    * If an enqueue (_enq_flag is true), then the rid and pfid are transferred to the enq_map.
+    * If a dequeue (_enq_flag is false), then the rid stored in the drid field is used to
+    * remove the corresponding record from the enq_map.
+    *
+    * On transaction abort, then each operation is handled as follows:
+    *
+    * If an enqueue (_enq_flag is true), then the data is simply discarded.
+    * If a dequeue (_enq_flag is false), then the lock for the corresponding enqueue in enq_map
+    * (if not a part of the same transaction) is removed, and the data discarded.
+    *
+    * <pre>
+    *   key      data
+    *
+    *   xid1 --- vector< [ rid, drid, pfid, enq_flag, commit_flag, aio_compl ] >
+    *   xid2 --- vector< [ rid, drid, pfid, enq_flag, commit_flag, aio_compl ] >
+    *   xid3 --- vector< [ rid, drid, pfid, enq_flag, commit_flag, aio_compl ] >
+    *   ...
+    * </pre>
+    */
+    class txn_map
+    {
+    public:
+        // return/error codes
+        // The values are defined outside this header; the comments on is_txn_synced()
+        // and set_aio_compl() below list the numeric values callers can expect.
+        static int16_t TMAP_RID_NOT_FOUND;
+        static int16_t TMAP_XID_NOT_FOUND;
+        static int16_t TMAP_OK;
+        static int16_t TMAP_NOT_SYNCED;
+        static int16_t TMAP_SYNCED;
+
+    private:
+        typedef std::pair<std::string, txn_data_list> xmap_param;
+        typedef std::map<std::string, txn_data_list> xmap;
+        typedef xmap::iterator xmap_itr;
+
+        xmap _map;                              // xid -> list of operations for that xid
+        smutex _mutex;                          // taken through slock by the locking methods
+        std::vector<u_int32_t> _pfid_txn_cnt;   // presumably one count per pfid, sized by set_num_jfiles() -- TODO confirm
+        const txn_data_list _empty_data_list;   // presumably what is returned for an unknown xid -- TODO confirm
+
+    public:
+        txn_map();
+        virtual ~txn_map();
+
+        void set_num_jfiles(const u_int16_t num_jfiles);
+        u_int32_t get_txn_pfid_cnt(const u_int16_t pfid) const;
+        bool insert_txn_data(const std::string& xid, const txn_data& td);
+        const txn_data_list get_tdata_list(const std::string& xid);
+        const txn_data_list get_remove_tdata_list(const std::string& xid);
+        bool in_map(const std::string& xid);
+        u_int32_t enq_cnt();
+        u_int32_t deq_cnt();
+        int16_t is_txn_synced(const std::string& xid); // -1=xid not found; 0=not synced; 1=synced
+        int16_t set_aio_compl(const std::string& xid, const u_int64_t rid); // -2=rid not found; -1=xid not found; 0=done
+        bool data_exists(const std::string& xid, const u_int64_t rid);
+        bool is_enq(const u_int64_t rid);
+        // The three inline accessors below operate on _map directly and do not take _mutex.
+        inline void clear() { _map.clear(); }
+        inline bool empty() const { return _map.empty(); }
+        inline size_t size() const { return _map.size(); }
+        void xid_list(std::vector<std::string>& xv);
+    private:
+        u_int32_t cnt(const bool enq_flag);
+        // Lookup for callers that already hold _mutex (data_exists() calls it that way).
+        const txn_data_list get_tdata_list_nolock(const std::string& xid);
+    };
+
+} // namespace journal
+} // namespace mrg
+
+#endif // ifndef mrg_journal_txn_map_hpp

Added: qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/txn_rec.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/txn_rec.cpp?rev=1424091&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/txn_rec.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/txn_rec.cpp Wed Dec 19 20:34:56 2012
@@ -0,0 +1,450 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file txn_rec.cpp
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * This file contains the code for the mrg::journal::txn_rec (journal
+ * transaction record) class. See comments in file txn_rec.hpp for details.
+ *
+ * \author Kim van der Riet
+ *
+ * Copyright (c) 2007, 2008 Red Hat, Inc.
+ *
+ */
+
+#include "jrnl/txn_rec.hpp"
+
+#include <cassert>
+#include <cerrno>
+#include <cstdlib>
+#include <cstring>
+#include <iomanip>
+#include "jrnl/jerrno.hpp"
+#include "jrnl/jexception.hpp"
+#include <sstream>
+
+namespace mrg
+{
+namespace journal
+{
+
+// Default constructor: header and tail are default-constructed, both the xid
+// source pointer (_xidp) and the decode buffer (_buff) start out null, and the
+// header version is then set to RHM_JDAT_VERSION.
+txn_rec::txn_rec():
+        _txn_hdr(),
+        _xidp(0),
+        _buff(0),
+        _txn_tail()
+{
+    _txn_hdr._version = RHM_JDAT_VERSION;
+}
+
+// Construct a record ready for encoding.
+//
+// magic   Record magic placed in the header.
+// rid     Record id placed in the header.
+// xidp    Pointer to the xid bytes; only the pointer is stored, the data is
+//         not copied here.
+// xidlen  Length of the xid, passed to the header constructor.
+// owi     Flag passed through to the header constructor.
+//
+// The tail is constructed from the header. _buff starts out null.
+txn_rec::txn_rec(const u_int32_t magic, const u_int64_t rid, const void* const xidp,
+        const std::size_t xidlen, const bool owi):
+        _txn_hdr(magic, RHM_JDAT_VERSION, rid, xidlen, owi),
+        _xidp(xidp),
+        _buff(0),
+        _txn_tail(_txn_hdr)
+{}
+
+// Destructor: delegates all clean-up to clean().
+txn_rec::~txn_rec()
+{
+    clean();
+}
+
+// Re-initialise this record with a new magic value and no xid: rid and xid
+// size are zeroed, both data pointers are nulled (nothing is freed here), and
+// the tail magic is set to the bitwise complement of magic.
+void
+txn_rec::reset(const u_int32_t magic)
+{
+    // Data pointers
+    _buff = 0;
+    _xidp = 0;
+
+    // Header fields
+    _txn_hdr._xidsize = 0;
+    _txn_hdr._rid = 0;
+    _txn_hdr._magic = magic;
+
+    // Tail fields
+    _txn_tail._rid = 0;
+    _txn_tail._xmagic = ~magic;
+}
+
+// Re-initialise this record for encoding a new transaction record.
+//
+// magic   New record magic; the tail receives its bitwise complement.
+// rid     New record id, written to both header and tail.
+// xidp    Pointer to the xid bytes; only the pointer is stored.
+// xidlen  Length of the xid, stored in the header.
+// owi     Flag handed to the header's set_owi().
+//
+// _buff is nulled without being freed.
+void
+txn_rec::reset(const u_int32_t magic, const  u_int64_t rid, const void* const xidp,
+        const std::size_t xidlen, const bool owi)
+{
+    // Tail fields
+    _txn_tail._rid = rid;
+    _txn_tail._xmagic = ~magic;
+
+    // Header fields (kept in their original relative order)
+    _txn_hdr._magic = magic;
+    _txn_hdr._rid = rid;
+    _txn_hdr.set_owi(owi);
+    _txn_hdr._xidsize = xidlen;
+
+    // Data pointers
+    _buff = 0;
+    _xidp = xidp;
+}
+
+// Encode this transaction record (header, xid, tail) into the buffer at wptr.
+//
+// wptr            Destination buffer; asserted non-null.
+// rec_offs_dblks  Offset into the record, in dblks, at which this call resumes.
+//                 Zero means start of record (the header is written first);
+//                 non-zero means continuation of a record split across pages.
+// max_size_dblks  Space available at wptr, in dblks; asserted non-zero.
+//
+// The xid bytes are read from _xidp, which is asserted non-null with a
+// non-zero _txn_hdr._xidsize. When the remaining part of the record does not
+// fit in max_size_dblks, only as much as fits is written.
+//
+// Returns the number of bytes written by this call, converted to dblks by
+// size_dblks().
+u_int32_t
+txn_rec::encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
+{
+    assert(wptr != 0);
+    assert(max_size_dblks > 0);
+    assert(_xidp != 0 && _txn_hdr._xidsize > 0);
+
+    std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE; // byte offset into the record
+    std::size_t rem = max_size_dblks * JRNL_DBLK_SIZE;      // bytes still available at wptr
+    std::size_t wr_cnt = 0;                                 // bytes written so far in this call
+    if (rec_offs_dblks) // Continuation of split transaction record (over 2 or more pages)
+    {
+        if (size_dblks(rec_size()) - rec_offs_dblks > max_size_dblks) // Further split required
+        {
+            // From here rec_offs is relative to the start of the xid
+            rec_offs -= sizeof(_txn_hdr);
+            std::size_t wsize = _txn_hdr._xidsize > rec_offs ? _txn_hdr._xidsize - rec_offs : 0;
+            std::size_t wsize2 = wsize; // unclamped xid remainder, used for the offset adjustment below
+            if (wsize)
+            {
+                if (wsize > rem)
+                    wsize = rem;
+                std::memcpy(wptr, (const char*)_xidp + rec_offs, wsize);
+                wr_cnt += wsize;
+                rem -= wsize;
+            }
+            // From here rec_offs is relative to the start of the tail
+            rec_offs -= _txn_hdr._xidsize - wsize2;
+            if (rem)
+            {
+                wsize = sizeof(_txn_tail) > rec_offs ? sizeof(_txn_tail) - rec_offs : 0;
+                wsize2 = wsize;
+                if (wsize)
+                {
+                    if (wsize > rem)
+                        wsize = rem;
+                    std::memcpy((char*)wptr + wr_cnt, (char*)&_txn_tail + rec_offs, wsize);
+                    wr_cnt += wsize;
+                    rem -= wsize;
+                }
+                rec_offs -= sizeof(_txn_tail) - wsize2;
+            }
+            assert(rem == 0);
+            assert(rec_offs == 0);
+        }
+        else // No further split required
+        {
+            // From here rec_offs is relative to the start of the xid
+            rec_offs -= sizeof(_txn_hdr);
+            std::size_t wsize = _txn_hdr._xidsize > rec_offs ? _txn_hdr._xidsize - rec_offs : 0;
+            if (wsize)
+            {
+                std::memcpy(wptr, (const char*)_xidp + rec_offs, wsize);
+                wr_cnt += wsize;
+            }
+            // From here rec_offs is relative to the start of the tail
+            rec_offs -= _txn_hdr._xidsize - wsize;
+            wsize = sizeof(_txn_tail) > rec_offs ? sizeof(_txn_tail) - rec_offs : 0;
+            if (wsize)
+            {
+                std::memcpy((char*)wptr + wr_cnt, (char*)&_txn_tail + rec_offs, wsize);
+                wr_cnt += wsize;
+#ifdef RHM_CLEAN
+                // Note: this rec_offs shadows the outer variable of the same name
+                std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE;
+                std::size_t dblk_rec_size = size_dblks(rec_size() - rec_offs) * JRNL_DBLK_SIZE;
+                std::memset((char*)wptr + wr_cnt, RHM_CLEAN_CHAR, dblk_rec_size - wr_cnt);
+#endif
+            }
+            rec_offs -= sizeof(_txn_tail) - wsize;
+            assert(rec_offs == 0);
+        }
+    }
+    else // Start at beginning of data record
+    {
+        // Assumption: the header will always fit into the first dblk
+        std::memcpy(wptr, (void*)&_txn_hdr, sizeof(_txn_hdr));
+        wr_cnt = sizeof(_txn_hdr);
+        if (size_dblks(rec_size()) > max_size_dblks) // Split required
+        {
+            std::size_t wsize;
+            rem -= sizeof(_txn_hdr);
+            if (rem)
+            {
+                // As much of the xid as fits
+                wsize = rem >= _txn_hdr._xidsize ? _txn_hdr._xidsize : rem;
+                std::memcpy((char*)wptr + wr_cnt, _xidp, wsize);
+                wr_cnt += wsize;
+                rem -= wsize;
+            }
+            if (rem)
+            {
+                // As much of the tail as fits
+                wsize = rem >= sizeof(_txn_tail) ? sizeof(_txn_tail) : rem;
+                std::memcpy((char*)wptr + wr_cnt, (void*)&_txn_tail, wsize);
+                wr_cnt += wsize;
+                rem -= wsize;
+            }
+            assert(rem == 0);
+        }
+        else // No split required
+        {
+            std::memcpy((char*)wptr + wr_cnt, _xidp, _txn_hdr._xidsize);
+            wr_cnt += _txn_hdr._xidsize;
+            std::memcpy((char*)wptr + wr_cnt, (void*)&_txn_tail, sizeof(_txn_tail));
+            wr_cnt += sizeof(_txn_tail);
+#ifdef RHM_CLEAN
+            // Fill the unused remainder of the last dblk with the clean character
+            std::size_t dblk_rec_size = size_dblks(rec_size()) * JRNL_DBLK_SIZE;
+            std::memset((char*)wptr + wr_cnt, RHM_CLEAN_CHAR, dblk_rec_size - wr_cnt);
+#endif
+        }
+    }
+    return size_dblks(wr_cnt);
+}
+
+// Decode a transaction record, or the next part of one, from the buffer at rptr.
+//
+// h               Record header already read by the caller; copied into
+//                 _txn_hdr when a new record is started (rec_offs_dblks == 0).
+// rptr            Source buffer; asserted non-null.
+// rec_offs_dblks  Offset into the record, in dblks, at which rptr starts.
+//                 Zero means rptr is at the start of the record.
+// max_size_dblks  Amount of data available at rptr, in dblks; asserted non-zero.
+//
+// At the start of a record the xid size is read from the buffer, the header is
+// checked with chk_hdr(), and _buff is allocated with std::malloc() to receive
+// the xid. Nothing in this function frees _buff.
+// NOTE(review): presumably the caller that obtains the pointer through
+// get_xid() is responsible for freeing it -- confirm against callers.
+// chk_tail() is called only in the branches where the tail is completed by
+// this call.
+//
+// Returns the number of bytes consumed by this call, converted to dblks by
+// size_dblks().
+u_int32_t
+txn_rec::decode(rec_hdr& h, void* rptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
+{
+    assert(rptr != 0);
+    assert(max_size_dblks > 0);
+
+    std::size_t rd_cnt = 0; // bytes consumed so far in this call
+    if (rec_offs_dblks) // Continuation of record on new page
+    {
+        const u_int32_t hdr_xid_dblks = size_dblks(txn_hdr::size() + _txn_hdr._xidsize);
+        const u_int32_t hdr_xid_tail_dblks = size_dblks(txn_hdr::size() +  _txn_hdr._xidsize +
+                rec_tail::size());
+        const std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE;
+
+        if (hdr_xid_tail_dblks - rec_offs_dblks <= max_size_dblks)
+        {
+            // Remainder of record (xid and tail) fits within this page
+            if (rec_offs - txn_hdr::size() < _txn_hdr._xidsize)
+            {
+                // Part of xid still outstanding, copy remainder of xid and tail
+                const std::size_t xid_offs = rec_offs - txn_hdr::size();
+                const std::size_t xid_rem = _txn_hdr._xidsize - xid_offs;
+                std::memcpy((char*)_buff + xid_offs, rptr, xid_rem);
+                rd_cnt = xid_rem;
+                std::memcpy((void*)&_txn_tail, ((char*)rptr + rd_cnt), sizeof(_txn_tail));
+                chk_tail();
+                rd_cnt += sizeof(_txn_tail);
+            }
+            else
+            {
+                // Tail or part of tail only outstanding, complete tail
+                const std::size_t tail_offs = rec_offs - txn_hdr::size() - _txn_hdr._xidsize;
+                const std::size_t tail_rem = rec_tail::size() - tail_offs;
+                std::memcpy((char*)&_txn_tail + tail_offs, rptr, tail_rem);
+                chk_tail();
+                rd_cnt = tail_rem;
+            }
+        }
+        else if (hdr_xid_dblks - rec_offs_dblks <= max_size_dblks)
+        {
+            // Remainder of xid fits within this page, tail split
+            const std::size_t xid_offs = rec_offs - txn_hdr::size();
+            const std::size_t xid_rem = _txn_hdr._xidsize - xid_offs;
+            std::memcpy((char*)_buff + xid_offs, rptr, xid_rem);
+            rd_cnt += xid_rem;
+            // Whatever is left of the page holds the first part of the tail
+            const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE) - rd_cnt;
+            if (tail_rem)
+            {
+                std::memcpy((void*)&_txn_tail, ((char*)rptr + xid_rem), tail_rem);
+                rd_cnt += tail_rem;
+            }
+        }
+        else
+        {
+            // Remainder of xid split
+            const std::size_t xid_cp_size = (max_size_dblks * JRNL_DBLK_SIZE);
+            std::memcpy((char*)_buff + rec_offs - txn_hdr::size(), rptr, xid_cp_size);
+            rd_cnt += xid_cp_size;
+        }
+    }
+    else // Start of record
+    {
+        // Get and check header
+        _txn_hdr.hdr_copy(h);
+        rd_cnt = sizeof(rec_hdr);
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+        rd_cnt += sizeof(u_int32_t); // Filler 0
+#endif
+        // The xid size is read directly from the buffer as a std::size_t
+        _txn_hdr._xidsize = *(std::size_t*)((char*)rptr + rd_cnt);
+        rd_cnt = _txn_hdr.size();
+        chk_hdr();
+        _buff = std::malloc(_txn_hdr._xidsize);
+        MALLOC_CHK(_buff, "_buff", "txn_rec", "decode");
+        const u_int32_t hdr_xid_dblks = size_dblks(txn_hdr::size() + _txn_hdr._xidsize);
+        const u_int32_t hdr_xid_tail_dblks = size_dblks(txn_hdr::size() + _txn_hdr._xidsize +
+                rec_tail::size());
+
+        // Check if record (header + xid + tail) fits within this page, we can check the
+        // tail before the expense of copying data to memory
+        if (hdr_xid_tail_dblks <= max_size_dblks)
+        {
+            // Entire header, xid and tail fits within this page
+            std::memcpy(_buff, (char*)rptr + rd_cnt, _txn_hdr._xidsize);
+            rd_cnt += _txn_hdr._xidsize;
+            std::memcpy((void*)&_txn_tail, (char*)rptr + rd_cnt, sizeof(_txn_tail));
+            rd_cnt += sizeof(_txn_tail);
+            chk_tail();
+        }
+        else if (hdr_xid_dblks <= max_size_dblks)
+        {
+            // Entire header and xid fit within this page, tail split
+            std::memcpy(_buff, (char*)rptr + rd_cnt, _txn_hdr._xidsize);
+            rd_cnt += _txn_hdr._xidsize;
+            const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE) - rd_cnt;
+            if (tail_rem)
+            {
+                std::memcpy((void*)&_txn_tail, (char*)rptr + rd_cnt, tail_rem);
+                rd_cnt += tail_rem;
+            }
+        }
+        else
+        {
+            // Header fits within this page, xid split
+            const std::size_t xid_cp_size = (max_size_dblks * JRNL_DBLK_SIZE) - rd_cnt;
+            std::memcpy(_buff, (char*)rptr + rd_cnt, xid_cp_size);
+            rd_cnt += xid_cp_size;
+        }
+    }
+    return size_dblks(rd_cnt);
+}
+
+// Decode a transaction record from the stream ifsp, resuming at rec_offs.
+//
+// h         Record header already read by the caller; copied into _txn_hdr
+//           when rec_offs is zero.
+// ifsp      Input stream to read the rest of the record from.
+// rec_offs  In/out: byte offset into the record. Zero on entry starts a new
+//           record (the xid size is read and _buff is allocated with
+//           std::malloc()); it is advanced by the number of bytes read.
+//
+// Returns false if the stream ran out part-way through the xid or the tail;
+// in that case the stream's failbit is cleared before returning, and rec_offs
+// records how far decoding got. Returns true once the whole record has been
+// read, the padding up to the dblk boundary has been skipped and chk_tail()
+// has passed.
+bool
+txn_rec::rcv_decode(rec_hdr h, std::ifstream* ifsp, std::size_t& rec_offs)
+{
+    if (rec_offs == 0)
+    {
+        // Read header, allocate for xid
+        _txn_hdr.hdr_copy(h);
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+        ifsp->ignore(sizeof(u_int32_t)); // _filler0
+#endif
+        ifsp->read((char*)&_txn_hdr._xidsize, sizeof(std::size_t));
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+        ifsp->ignore(sizeof(u_int32_t)); // _filler0
+#endif
+        rec_offs = sizeof(_txn_hdr);
+        _buff = std::malloc(_txn_hdr._xidsize);
+        MALLOC_CHK(_buff, "_buff", "txn_rec", "rcv_decode");
+    }
+    if (rec_offs < sizeof(_txn_hdr) + _txn_hdr._xidsize)
+    {
+        // Read xid (or continue reading xid)
+        std::size_t offs = rec_offs - sizeof(_txn_hdr); // xid bytes already in _buff
+        ifsp->read((char*)_buff + offs, _txn_hdr._xidsize - offs);
+        std::size_t size_read = ifsp->gcount();
+        rec_offs += size_read;
+        if (size_read < _txn_hdr._xidsize - offs)
+        {
+            // Short read: the xid is still incomplete
+            assert(ifsp->eof());
+            // As we may have read past eof, turn off fail bit
+            ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit));
+            assert(!ifsp->fail() && !ifsp->bad());
+            return false;
+        }
+    }
+    if (rec_offs < sizeof(_txn_hdr) + _txn_hdr._xidsize + sizeof(rec_tail))
+    {
+        // Read tail (or continue reading tail)
+        std::size_t offs = rec_offs - sizeof(_txn_hdr) - _txn_hdr._xidsize; // tail bytes already read
+        ifsp->read((char*)&_txn_tail + offs, sizeof(rec_tail) - offs);
+        std::size_t size_read = ifsp->gcount();
+        rec_offs += size_read;
+        if (size_read < sizeof(rec_tail) - offs)
+        {
+            // Short read: the tail is still incomplete
+            assert(ifsp->eof());
+            // As we may have read past eof, turn off fail bit
+            ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit));
+            assert(!ifsp->fail() && !ifsp->bad());
+            return false;
+        }
+    }
+    // Skip the padding between the end of the record and the next dblk boundary
+    ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - rec_size());
+    chk_tail(); // Throws if tail invalid or record incomplete
+    assert(!ifsp->fail() && !ifsp->bad());
+    return true;
+}
+
+// Hand out the decoded xid buffer.
+//
+// xidpp  Receives _buff, which is null when no buffer has been allocated.
+//
+// Returns the xid size from the header when _buff is set, otherwise 0.
+std::size_t
+txn_rec::get_xid(void** const xidpp)
+{
+    *xidpp = _buff; // a null _buff yields a null *xidpp
+    return _buff ? _txn_hdr._xidsize : 0;
+}
+
+// Append a one-line description of this record (type label, magic, version,
+// rid and xid) to str and return str.
+//
+// The label is "dtxa_rec" when the magic equals RHM_JDAT_TXA_MAGIC and
+// "dtxc_rec" for any other magic.
+// NOTE(review): _xidp is inserted into the stream as a pointer, so this prints
+// the pointer value rather than the xid bytes -- confirm that is intended.
+std::string&
+txn_rec::str(std::string& str) const
+{
+    const char* const label =
+            (_txn_hdr._magic == RHM_JDAT_TXA_MAGIC) ? "dtxa_rec: m=" : "dtxc_rec: m=";
+    std::ostringstream text;
+    text << label << _txn_hdr._magic
+         << " v=" << (int)_txn_hdr._version
+         << " rid=" << _txn_hdr._rid
+         << " xid=\"" << _xidp << "\"";
+    str.append(text.str());
+    return str;
+}
+
+// Return the xid size recorded in the header (_txn_hdr._xidsize).
+std::size_t
+txn_rec::xid_size() const
+{
+    return _txn_hdr._xidsize;
+}
+
+// Return the size of the whole record: header size plus xid size plus tail
+// size, without any padding to a dblk boundary.
+std::size_t
+txn_rec::rec_size() const
+{
+    const std::size_t hdr_and_xid = txn_hdr::size() + _txn_hdr._xidsize;
+    return hdr_and_xid + rec_tail::size();
+}
+
+// Validate the record header: run the common jrec::chk_hdr() check, then
+// require the magic to be RHM_JDAT_TXA_MAGIC or RHM_JDAT_TXC_MAGIC.
+//
+// Throws jexception with jerrno::JERR_JREC_BADRECHDR when the magic is neither
+// of the two.
+// NOTE(review): only the first expected magic is padded to 8 digits and the
+// value read uses width 2; the widths are kept exactly as they were.
+void
+txn_rec::chk_hdr() const
+{
+    jrec::chk_hdr(_txn_hdr);
+
+    const bool magic_ok = _txn_hdr._magic == RHM_JDAT_TXA_MAGIC ||
+            _txn_hdr._magic == RHM_JDAT_TXC_MAGIC;
+    if (magic_ok)
+        return;
+
+    std::ostringstream msg;
+    msg << std::hex << std::setfill('0')
+        << "dtx magic: rid=0x" << std::setw(16) << _txn_hdr._rid
+        << ": expected=(0x" << std::setw(8) << RHM_JDAT_TXA_MAGIC
+        << " or 0x" << RHM_JDAT_TXC_MAGIC
+        << ") read=0x" << std::setw(2) << (int)_txn_hdr._magic;
+    throw jexception(jerrno::JERR_JREC_BADRECHDR, msg.str(), "txn_rec", "chk_hdr");
+}
+
+// Validate the header as chk_hdr() does, then check the header's record id
+// against rid via jrec::chk_rid().
+void
+txn_rec::chk_hdr(u_int64_t rid) const
+{
+    chk_hdr();
+    jrec::chk_rid(_txn_hdr, rid);
+}
+
+// Validate the record tail against the header via jrec::chk_tail().
+void
+txn_rec::chk_tail() const
+{
+    jrec::chk_tail(_txn_tail, _txn_hdr);
+}
+
+// Clean-up hook called from the destructor. The body is currently empty, so
+// nothing is released here; in particular _buff is not freed.
+void
+txn_rec::clean()
+{
+    // clean up allocated memory here
+}
+
+} // namespace journal
+} // namespace mrg



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org