You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/10/22 21:09:58 UTC

svn commit: r1534736 [5/8] - in /qpid/trunk/qpid/cpp/src: ./ qpid/linearstore/ qpid/linearstore/jrnl/ qpid/linearstore/jrnl/utils/ tests/linearstore/

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp Tue Oct 22 19:09:56 2013
@@ -0,0 +1,464 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file deq_rec.cpp
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * This file contains the code for the qpid::qls_jrnl::deq_rec (journal dequeue
+ * record) class. See comments in file deq_rec.h for details.
+ *
+ * \author Kim van der Riet
+ */
+
+#include "qpid/linearstore/jrnl/deq_rec.h"
+#include "qpid/linearstore/jrnl/utils/deq_hdr.h"
+#include "qpid/linearstore/jrnl/utils/rec_tail.h"
+
+#include <cassert>
+#include <cerrno>
+#include <cstdlib>
+#include <cstring>
+#include <iomanip>
+#include "qpid/linearstore/jrnl/jerrno.h"
+#include "qpid/linearstore/jrnl/jexception.h"
+#include <sstream>
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+/**
+ * Default constructor. Leaves both the xid pointer and the receive buffer null and
+ * initialises the header with the dequeue magic and journal version; every other
+ * value passed to the header initialiser is zero.
+ */
+deq_rec::deq_rec():
+        _xidp(0),
+        _buff(0)
+{
+    // The header is set up through its init helper; the tail is then derived from the
+    // record header just initialised (last argument 0).
+    ::deq_hdr_init(&_deq_hdr, QLS_DEQ_MAGIC, QLS_JRNL_VERSION, 0, 0, 0, 0);
+    ::rec_tail_copy(&_deq_tail, &_deq_hdr._rhdr, 0);
+}
+
+/**
+ * Constructor taking the values to be written: the record id, the id of the record
+ * being dequeued, a caller-supplied xid and its length, and the transaction commit flag.
+ *
+ * The xid pointer is stored as given (no copy is made here); the receive buffer is
+ * left null.
+ */
+deq_rec::deq_rec(const uint64_t rid, const uint64_t drid, const void* const xidp,
+        const std::size_t xidlen, const bool txn_coml_commit):
+        _xidp(xidp),
+        _buff(0)
+{
+    ::deq_hdr_init(&_deq_hdr, QLS_DEQ_MAGIC, QLS_JRNL_VERSION, 0, rid, drid, xidlen);
+    ::rec_tail_copy(&_deq_tail, &_deq_hdr._rhdr, 0);
+    // The commit flag is applied to the header after the tail has been derived from it
+    ::set_txn_coml_commit(&_deq_hdr, txn_coml_commit);
+}
+
+// Destructor: all teardown is delegated to clean().
+deq_rec::~deq_rec()
+{
+    this->clean();
+}
+
+/**
+ * Return the record to an empty state: ids, xid size and tail checksum are zeroed,
+ * the commit flag is cleared and both pointers are nulled.
+ */
+void
+deq_rec::reset()
+{
+    // Pointers (nulled only; nothing is freed here)
+    _xidp = 0;
+    _buff = 0;
+
+    // Header fields
+    _deq_hdr._rhdr._rid = 0;
+    ::set_txn_coml_commit(&_deq_hdr, false);
+    _deq_hdr._deq_rid = 0;
+    _deq_hdr._xidsize = 0;
+
+    // Tail fields
+    _deq_tail._rid = 0;
+    _deq_tail._checksum = 0;
+}
+
+/**
+ * Load the record with new values: record id, dequeued record id, xid pointer and
+ * length, and the transaction commit flag. The receive buffer pointer is nulled.
+ * The tail checksum is not touched by this overload.
+ */
+void
+deq_rec::reset(const  uint64_t rid, const  uint64_t drid, const void* const xidp,
+        const std::size_t xidlen, const bool txn_coml_commit)
+{
+    // Pointers
+    _xidp = xidp;
+    _buff = 0;
+
+    // Header fields
+    _deq_hdr._rhdr._rid = rid;
+    ::set_txn_coml_commit(&_deq_hdr, txn_coml_commit);
+    _deq_hdr._deq_rid = drid;
+    _deq_hdr._xidsize = xidlen;
+
+    // The tail carries the same record id as the header
+    _deq_tail._rid = rid;
+}
+
+/**
+ * Encode this dequeue record into the buffer at wptr. The header is always written;
+ * the xid and tail are written only when the xid size is non-zero. A record that does
+ * not fit into the space offered is written in part, and the caller is expected to
+ * call again with rec_offs_dblks set to the amount already written.
+ *
+ * \param wptr Destination buffer; must not be null (asserted).
+ * \param rec_offs_dblks Offset into the record, in dblks, at which to resume; 0 means
+ *     start of record.
+ * \param max_size_dblks Space available at wptr, in dblks; must be > 0 (asserted).
+ * \return The byte count written by this call, converted with size_dblks().
+ */
+uint32_t
+deq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks)
+{
+    assert(wptr != 0);
+    assert(max_size_dblks > 0);
+    // A record without an xid pointer must declare an xid size of zero
+    if (_xidp == 0)
+        assert(_deq_hdr._xidsize == 0);
+
+    std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES;
+    std::size_t rem = max_size_dblks * QLS_DBLK_SIZE_BYTES;
+    std::size_t wr_cnt = 0;
+    if (rec_offs_dblks) // Continuation of split dequeue record (over 2 or more pages)
+    {
+        if (size_dblks(rec_size()) - rec_offs_dblks > max_size_dblks) // Further split required
+        {
+            // rec_offs is rebased step by step: first relative to the start of the xid,
+            // then relative to the start of the tail.
+            rec_offs -= sizeof(_deq_hdr);
+            std::size_t wsize = _deq_hdr._xidsize > rec_offs ? _deq_hdr._xidsize - rec_offs : 0;
+            std::size_t wsize2 = wsize;
+            if (wsize)
+            {
+                // Write as much of the outstanding xid as the remaining space allows
+                if (wsize > rem)
+                    wsize = rem;
+                std::memcpy(wptr, (const char*)_xidp + rec_offs, wsize);
+                wr_cnt += wsize;
+                rem -= wsize;
+            }
+            rec_offs -= _deq_hdr._xidsize - wsize2;
+            if (rem)
+            {
+                wsize = sizeof(_deq_tail) > rec_offs ? sizeof(_deq_tail) - rec_offs : 0;
+                wsize2 = wsize;
+                if (wsize)
+                {
+                    // Write as much of the outstanding tail as the remaining space allows
+                    if (wsize > rem)
+                        wsize = rem;
+                    std::memcpy((char*)wptr + wr_cnt, (char*)&_deq_tail + rec_offs, wsize);
+                    wr_cnt += wsize;
+                    rem -= wsize;
+                }
+                rec_offs -= sizeof(_deq_tail) - wsize2;
+            }
+            assert(rem == 0);
+            assert(rec_offs == 0);
+        }
+        else // No further split required
+        {
+            // Same rebasing of rec_offs as above, but everything outstanding is written
+            rec_offs -= sizeof(_deq_hdr);
+            std::size_t wsize = _deq_hdr._xidsize > rec_offs ? _deq_hdr._xidsize - rec_offs : 0;
+            if (wsize)
+            {
+                std::memcpy(wptr, (const char*)_xidp + rec_offs, wsize);
+                wr_cnt += wsize;
+            }
+            rec_offs -= _deq_hdr._xidsize - wsize;
+            wsize = sizeof(_deq_tail) > rec_offs ? sizeof(_deq_tail) - rec_offs : 0;
+            if (wsize)
+            {
+                std::memcpy((char*)wptr + wr_cnt, (char*)&_deq_tail + rec_offs, wsize);
+                wr_cnt += wsize;
+#ifdef QLS_CLEAN
+                // NOTE: this rec_offs shadows the outer variable for the rest of this block
+                std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES;
+                std::size_t dblk_rec_size = size_dblks(rec_size() - rec_offs) * QLS_DBLK_SIZE_BYTES;
+                std::memset((char*)wptr + wr_cnt, QLS_CLEAN_CHAR, dblk_rec_size - wr_cnt);
+#endif
+            }
+            rec_offs -= sizeof(_deq_tail) - wsize;
+            assert(rec_offs == 0);
+        }
+    }
+    else // Start at beginning of data record
+    {
+        // Assumption: the header will always fit into the first dblk
+        std::memcpy(wptr, (void*)&_deq_hdr, sizeof(_deq_hdr));
+        wr_cnt = sizeof(_deq_hdr);
+        if (size_dblks(rec_size()) > max_size_dblks) // Split required - can only occur with xid
+        {
+            std::size_t wsize;
+            rem -= sizeof(_deq_hdr);
+            if (rem)
+            {
+                // Write the xid, or as much of it as fits
+                wsize = rem >= _deq_hdr._xidsize ? _deq_hdr._xidsize : rem;
+                std::memcpy((char*)wptr + wr_cnt, _xidp, wsize);
+                wr_cnt += wsize;
+                rem -= wsize;
+            }
+            if (rem)
+            {
+                // Space left after the xid: write the tail, or as much of it as fits
+                wsize = rem >= sizeof(_deq_tail) ? sizeof(_deq_tail) : rem;
+                std::memcpy((char*)wptr + wr_cnt, (void*)&_deq_tail, wsize);
+                wr_cnt += wsize;
+                rem -= wsize;
+            }
+            assert(rem == 0);
+        }
+        else // No split required
+        {
+            // The tail is only written when an xid is present
+            if (_deq_hdr._xidsize)
+            {
+                std::memcpy((char*)wptr + wr_cnt, _xidp, _deq_hdr._xidsize);
+                wr_cnt += _deq_hdr._xidsize;
+                std::memcpy((char*)wptr + wr_cnt, (void*)&_deq_tail, sizeof(_deq_tail));
+                wr_cnt += sizeof(_deq_tail);
+            }
+#ifdef QLS_CLEAN
+            // Fill the unused remainder of the last dblk with the clean character
+            std::size_t dblk_rec_size = size_dblks(rec_size()) * QLS_DBLK_SIZE_BYTES;
+            std::memset((char*)wptr + wr_cnt, QLS_CLEAN_CHAR, dblk_rec_size - wr_cnt);
+#endif
+        }
+    }
+    return size_dblks(wr_cnt);
+}
+
+/**
+ * Decode a dequeue record from the buffer at rptr. At the start of a record
+ * (rec_offs_dblks == 0) the record header is copied from h, the remaining header
+ * fields are read from rptr, the header is checked and, if an xid is present, a buffer
+ * for it is allocated with malloc. On a continuation (rec_offs_dblks != 0) the
+ * outstanding part of the xid and/or tail is copied. The tail is checked with
+ * chk_tail() only in the branches where it has been read completely.
+ *
+ * \param h Record header that is copied into this record at the start of decoding.
+ * \param rptr Source buffer; must not be null (asserted).
+ * \param rec_offs_dblks Offset into the record, in dblks, at which to resume; 0 means
+ *     start of record.
+ * \param max_size_dblks Data available at rptr, in dblks; must be > 0 (asserted).
+ * \return The byte count consumed by this call, converted with size_dblks().
+ */
+uint32_t
+deq_rec::decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks)
+{
+    assert(rptr != 0);
+    assert(max_size_dblks > 0);
+
+    std::size_t rd_cnt = 0;
+    if (rec_offs_dblks) // Continuation of record on new page
+    {
+        // Sizes in dblks of header + xid, and of header + xid + tail
+        const uint32_t hdr_xid_dblks = size_dblks(sizeof(deq_hdr_t) + _deq_hdr._xidsize);
+        const uint32_t hdr_xid_tail_dblks = size_dblks(sizeof(deq_hdr_t) + _deq_hdr._xidsize +
+                sizeof(rec_tail_t));
+        const std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES;
+
+        if (hdr_xid_tail_dblks - rec_offs_dblks <= max_size_dblks)
+        {
+            // Remainder of xid fits within this page
+            if (rec_offs - sizeof(deq_hdr_t) < _deq_hdr._xidsize)
+            {
+                // Part of xid still outstanding, copy remainder of xid and tail
+                const std::size_t xid_offs = rec_offs - sizeof(deq_hdr_t);
+                const std::size_t xid_rem = _deq_hdr._xidsize - xid_offs;
+                std::memcpy((char*)_buff + xid_offs, rptr, xid_rem);
+                rd_cnt = xid_rem;
+                std::memcpy((void*)&_deq_tail, ((char*)rptr + rd_cnt), sizeof(_deq_tail));
+                chk_tail();
+                rd_cnt += sizeof(_deq_tail);
+            }
+            else
+            {
+                // Tail or part of tail only outstanding, complete tail
+                const std::size_t tail_offs = rec_offs - sizeof(deq_hdr_t) - _deq_hdr._xidsize;
+                const std::size_t tail_rem = sizeof(rec_tail_t) - tail_offs;
+                std::memcpy((char*)&_deq_tail + tail_offs, rptr, tail_rem);
+                chk_tail();
+                rd_cnt = tail_rem;
+            }
+        }
+        else if (hdr_xid_dblks - rec_offs_dblks <= max_size_dblks)
+        {
+            // Remainder of xid fits within this page, tail split
+            const std::size_t xid_offs = rec_offs - sizeof(deq_hdr_t);
+            const std::size_t xid_rem = _deq_hdr._xidsize - xid_offs;
+            std::memcpy((char*)_buff + xid_offs, rptr, xid_rem);
+            rd_cnt += xid_rem;
+            // Whatever is left of the page after the xid holds the first part of the tail
+            const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
+            if (tail_rem)
+            {
+                std::memcpy((void*)&_deq_tail, ((char*)rptr + xid_rem), tail_rem);
+                rd_cnt += tail_rem;
+            }
+        }
+        else
+        {
+            // Remainder of xid split
+            const std::size_t xid_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES);
+            std::memcpy((char*)_buff + rec_offs - sizeof(deq_hdr_t), rptr, xid_cp_size);
+            rd_cnt += xid_cp_size;
+        }
+    }
+    else // Start of record
+    {
+        // Get and check header
+        //_deq_hdr.hdr_copy(h);
+        ::rec_hdr_copy(&_deq_hdr._rhdr, &h);
+        rd_cnt = sizeof(rec_hdr_t);
+        // The dequeue rid and xid size are read directly from the buffer, in that order,
+        // immediately after the record header
+        _deq_hdr._deq_rid = *(uint64_t*)((char*)rptr + rd_cnt);
+        rd_cnt += sizeof(uint64_t);
+        _deq_hdr._xidsize = *(std::size_t*)((char*)rptr + rd_cnt);
+        // Move the read count to the end of the full dequeue header
+        rd_cnt = sizeof(deq_hdr_t);
+        chk_hdr();
+        if (_deq_hdr._xidsize)
+        {
+            // Allocate the buffer that receives the xid
+            _buff = std::malloc(_deq_hdr._xidsize);
+            MALLOC_CHK(_buff, "_buff", "deq_rec", "decode");
+            const uint32_t hdr_xid_dblks = size_dblks(sizeof(deq_hdr_t) + _deq_hdr._xidsize);
+            const uint32_t hdr_xid_tail_dblks = size_dblks(sizeof(deq_hdr_t) +  _deq_hdr._xidsize +
+                    sizeof(rec_tail_t));
+
+            // Check if record (header + xid + tail) fits within this page, we can check the
+            // tail before the expense of copying data to memory
+            if (hdr_xid_tail_dblks <= max_size_dblks)
+            {
+                // Entire header, xid and tail fits within this page
+                std::memcpy(_buff, (char*)rptr + rd_cnt, _deq_hdr._xidsize);
+                rd_cnt += _deq_hdr._xidsize;
+                std::memcpy((void*)&_deq_tail, (char*)rptr + rd_cnt, sizeof(_deq_tail));
+                rd_cnt += sizeof(_deq_tail);
+                chk_tail();
+            }
+            else if (hdr_xid_dblks <= max_size_dblks)
+            {
+                // Entire header and xid fit within this page, tail split
+                std::memcpy(_buff, (char*)rptr + rd_cnt, _deq_hdr._xidsize);
+                rd_cnt += _deq_hdr._xidsize;
+                const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
+                if (tail_rem)
+                {
+                    std::memcpy((void*)&_deq_tail, (char*)rptr + rd_cnt, tail_rem);
+                    rd_cnt += tail_rem;
+                }
+            }
+            else
+            {
+                // Header fits within this page, xid split
+                const std::size_t xid_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
+                std::memcpy(_buff, (char*)rptr + rd_cnt, xid_cp_size);
+                rd_cnt += xid_cp_size;
+            }
+        }
+    }
+    return size_dblks(rd_cnt);
+}
+
+/**
+ * Decode a dequeue record from a file stream (used for recovery). May be called
+ * repeatedly for the same record: rec_offs carries the number of record bytes already
+ * consumed, so that a record interrupted by end-of-file can be continued.
+ *
+ * \param h Record header, copied into this record when rec_offs is 0.
+ * \param ifsp Input stream positioned at the next unread byte of the record.
+ * \param rec_offs In/out byte offset into the record; 0 starts a new record. Updated
+ *     with the number of bytes read.
+ * \return true when the record has been read completely; false when the stream ran
+ *     out of data part way through the xid or tail (the fail bit is cleared first).
+ *
+ * The tail is checked via chk_tail() when an xid is present, which throws if the tail
+ * is invalid or the record incomplete.
+ */
+bool
+deq_rec::rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs)
+{
+    if (rec_offs == 0)
+    {
+        //_deq_hdr.hdr_copy(h);
+        ::rec_hdr_copy(&_deq_hdr._rhdr, &h);
+        ifsp->read((char*)&_deq_hdr._deq_rid, sizeof(uint64_t));
+        ifsp->read((char*)&_deq_hdr._xidsize, sizeof(std::size_t));
+#if defined(JRNL_32_BIT)
+        ifsp->ignore(sizeof(uint32_t)); // _filler0
+#endif
+        rec_offs = sizeof(_deq_hdr);
+        // Read header, allocate (if req'd) for xid
+        if (_deq_hdr._xidsize)
+        {
+            _buff = std::malloc(_deq_hdr._xidsize);
+            // Report the allocation failure against this class (deq_rec), consistent
+            // with decode()
+            MALLOC_CHK(_buff, "_buff", "deq_rec", "rcv_decode");
+        }
+    }
+    if (rec_offs < sizeof(_deq_hdr) + _deq_hdr._xidsize)
+    {
+        // Read xid (or continue reading xid)
+        std::size_t offs = rec_offs - sizeof(_deq_hdr);
+        ifsp->read((char*)_buff + offs, _deq_hdr._xidsize - offs);
+        std::size_t size_read = ifsp->gcount();
+        rec_offs += size_read;
+        if (size_read < _deq_hdr._xidsize - offs)
+        {
+            assert(ifsp->eof());
+            // As we may have read past eof, turn off fail bit
+            ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit));
+            assert(!ifsp->fail() && !ifsp->bad());
+            return false;
+        }
+    }
+    if (rec_offs < sizeof(_deq_hdr) +
+            (_deq_hdr._xidsize ? _deq_hdr._xidsize + sizeof(rec_tail_t) : 0))
+    {
+        // Read tail (or continue reading tail)
+        std::size_t offs = rec_offs - sizeof(_deq_hdr) - _deq_hdr._xidsize;
+        ifsp->read((char*)&_deq_tail + offs, sizeof(rec_tail_t) - offs);
+        std::size_t size_read = ifsp->gcount();
+        rec_offs += size_read;
+        if (size_read < sizeof(rec_tail_t) - offs)
+        {
+            assert(ifsp->eof());
+            // As we may have read past eof, turn off fail bit
+            ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit));
+            assert(!ifsp->fail() && !ifsp->bad());
+            return false;
+        }
+    }
+    // Skip the padding between the end of the record and the end of its last dblk
+    ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size());
+    if (_deq_hdr._xidsize)
+        chk_tail(); // Throws if tail invalid or record incomplete
+    assert(!ifsp->fail() && !ifsp->bad());
+    return true;
+}
+
+/**
+ * Hand back the xid held in the receive buffer.
+ *
+ * \param xidpp Receives the buffer pointer, or null when no buffer is set.
+ * \return The xid size from the header, or 0 when no buffer is set.
+ */
+std::size_t
+deq_rec::get_xid(void** const xidpp)
+{
+    // _buff is itself null when no buffer is set, so it can be passed through as is
+    *xidpp = _buff;
+    return _buff ? _deq_hdr._xidsize : 0;
+}
+
+/**
+ * Append a one-line textual summary of this record to str and return str.
+ *
+ * Note that _xidp is a const void*, so the value streamed after "xid=" is formatted
+ * by the stream's pointer overload, not as the xid bytes.
+ */
+std::string&
+deq_rec::str(std::string& str) const
+{
+    std::ostringstream summary;
+    summary << "deq_rec: m=" << _deq_hdr._rhdr._magic
+            << " v=" << (int)_deq_hdr._rhdr._version
+            << " rid=" << _deq_hdr._rhdr._rid
+            << " drid=" << _deq_hdr._deq_rid;
+    if (_xidp)
+    {
+        summary << " xid=\"" << _xidp << "\"";
+    }
+    str.append(summary.str());
+    return str;
+}
+
+// Size of the xid as recorded in the dequeue header.
+std::size_t
+deq_rec::xid_size() const
+{
+    const std::size_t xid_len = _deq_hdr._xidsize;
+    return xid_len;
+}
+
+/**
+ * Size of the encoded record in bytes: the dequeue header alone when there is no
+ * xid, otherwise header + xid + tail.
+ */
+std::size_t
+deq_rec::rec_size() const
+{
+    std::size_t total = sizeof(deq_hdr_t);
+    if (_deq_hdr._xidsize)
+    {
+        // The tail only exists alongside an xid
+        total += _deq_hdr._xidsize + sizeof(rec_tail_t);
+    }
+    return total;
+}
+
+/**
+ * Check the record header: first the base-class check, then that the magic matches
+ * the dequeue magic. Throws jexception (JERR_JREC_BADRECHDR) on a magic mismatch.
+ */
+void
+deq_rec::chk_hdr() const
+{
+    jrec::chk_hdr(_deq_hdr._rhdr);
+    if (_deq_hdr._rhdr._magic == QLS_DEQ_MAGIC)
+        return;
+
+    // Magic mismatch: build a hex-formatted diagnostic and throw
+    std::ostringstream msg;
+    msg << std::hex << std::setfill('0');
+    msg << "deq magic: rid=0x" << std::setw(16) << _deq_hdr._rhdr._rid;
+    msg << ": expected=0x" << std::setw(8) << QLS_DEQ_MAGIC;
+    msg << " read=0x" << std::setw(2) << (int)_deq_hdr._rhdr._magic;
+    throw jexception(jerrno::JERR_JREC_BADRECHDR, msg.str(), "deq_rec", "chk_hdr");
+}
+
+// Check the header as in chk_hdr(), then check its record id against rid.
+void
+deq_rec::chk_hdr(uint64_t rid) const
+{
+    this->chk_hdr();
+    jrec::chk_rid(_deq_hdr._rhdr, rid);
+}
+
+// Check the record tail against the record header using the base-class check.
+void
+deq_rec::chk_tail() const
+{
+    const rec_hdr_t& hdr = _deq_hdr._rhdr;
+    jrec::chk_tail(_deq_tail, hdr);
+}
+
+// Cleanup hook called from the destructor. Currently a no-op.
+// NOTE(review): decode() and rcv_decode() allocate _buff with malloc and nothing in
+// this file frees it; presumably ownership passes to the caller of get_xid() -
+// confirm before adding a free() here.
+void
+deq_rec::clean()
+{
+    // clean up allocated memory here
+}
+
+} // namespace qls_jrnl
+} // namespace qpid

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,91 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_DEQ_REQ_H
+#define QPID_LEGACYSTORE_JRNL_DEQ_REQ_H
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+class deq_rec;
+}}
+
+#include <cstddef>
+#include "qpid/linearstore/jrnl/utils/deq_hdr.h"
+#include "qpid/linearstore/jrnl/jrec.h"
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+    /**
+    * \class deq_rec
+    * \brief Class to handle a single journal dequeue record.
+    *
+    * A record consists of a dequeue header, optionally followed by an xid and a record
+    * tail; the tail is only present when an xid is present.
+    */
+    class deq_rec : public jrec
+    {
+    private:
+        deq_hdr_t _deq_hdr;           ///< Dequeue header
+        const void* _xidp;          ///< xid pointer for encoding (writing to disk)
+        void* _buff;                ///< Pointer to buffer to receive data read from disk
+        rec_tail_t _deq_tail;         ///< Record tail, only encoded if XID is present
+
+    public:
+        // constructor used for read operations and xid will have memory allocated
+        deq_rec();
+        // constructor used for write operations, where xid already exists
+        deq_rec(const uint64_t rid, const uint64_t drid, const void* const xidp,
+                const std::size_t xidlen, const bool txn_coml_commit);
+        virtual ~deq_rec();
+
+        // Prepare instance for use in reading data from journal
+        void reset();
+        // Prepare instance for use in writing data to journal
+        void reset(const  uint64_t rid, const  uint64_t drid, const void* const xidp,
+                const std::size_t xidlen, const bool txn_coml_commit);
+        // Write the record (or the next part of a split record) to wptr; offsets and
+        // sizes are expressed in dblks
+        uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks);
+        // Read the record (or the next part of a split record) from rptr; offsets and
+        // sizes are expressed in dblks
+        uint32_t decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks,
+                uint32_t max_size_dblks);
+        // Decode used for recover
+        bool rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs);
+
+        // Transaction commit flag as stored in the dequeue header
+        inline bool is_txn_coml_commit() const { return ::is_txn_coml_commit(&_deq_hdr); }
+        // Record id of this dequeue record
+        inline uint64_t rid() const { return _deq_hdr._rhdr._rid; }
+        // Dequeue record id stored in the header (_deq_rid)
+        inline uint64_t deq_rid() const { return _deq_hdr._deq_rid; }
+        // Returns the xid size and sets *xidpp to the receive buffer (0 / null if none)
+        std::size_t get_xid(void** const xidpp);
+        // Appends a textual summary of the record to str and returns str
+        std::string& str(std::string& str) const;
+        inline std::size_t data_size() const { return 0; } // This record never carries data
+        // Size of the xid in bytes, as stored in the header
+        std::size_t xid_size() const;
+        // Size of the whole record in bytes: header, plus xid and tail if an xid is present
+        std::size_t rec_size() const;
+
+    private:
+        // Header and tail checks, and the cleanup hook called from the destructor
+        virtual void chk_hdr() const;
+        virtual void chk_hdr(uint64_t rid) const;
+        virtual void chk_tail() const;
+        virtual void clean();
+    }; // class deq_rec
+
+} // namespace qls_jrnl
+} // namespace qpid
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_DEQ_REQ_H

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp Tue Oct 22 19:09:56 2013
@@ -0,0 +1,186 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/linearstore/jrnl/enq_map.h"
+
+#include <iomanip>
+#include "qpid/linearstore/jrnl/jerrno.h"
+#include "qpid/linearstore/jrnl/slock.h"
+#include <sstream>
+
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+// static return/error codes
+// Defined as short so that the definitions match the `static short` declarations in
+// the enq_map class and the `short` return type of the member functions.
+short enq_map::EMAP_DUP_RID = -3;
+short enq_map::EMAP_LOCKED = -2;
+short enq_map::EMAP_RID_NOT_FOUND = -1;
+short enq_map::EMAP_OK = 0;
+short enq_map::EMAP_FALSE = 0;
+short enq_map::EMAP_TRUE = 1;
+
+// Construct an empty map.
+enq_map::enq_map():
+        _map()
+{}
+
+// Destructor; the body is empty.
+enq_map::~enq_map()
+{}
+
+
+// Convenience overload: inserts the entry in the unlocked state.
+short
+enq_map::insert_pfid(const uint64_t rid, const uint64_t pfid, const std::streampos file_posn)
+{
+    const bool locked = false;
+    return insert_pfid(rid, pfid, file_posn, locked);
+}
+
+/**
+ * Insert an entry for rid holding pfid, file position and lock state.
+ *
+ * \return EMAP_OK on success, EMAP_DUP_RID if rid is already present in the map.
+ */
+short
+enq_map::insert_pfid(const uint64_t rid, const uint64_t pfid, const std::streampos file_posn, const bool locked)
+{
+    // Build the entry before taking the mutex; only the insert itself runs under it
+    emap_data_struct_t entry(pfid, file_posn, locked);
+    std::pair<emap_itr, bool> result;
+    {
+        slock s(_mutex);
+        result = _map.insert(emap_param(rid, entry));
+    }
+    return result.second ? EMAP_OK : EMAP_DUP_RID;
+}
+
+/**
+ * Look up the pfid stored for rid.
+ *
+ * \param pfid Set to the stored pfid only when EMAP_OK is returned.
+ * \return EMAP_OK, EMAP_RID_NOT_FOUND if rid is not in the map, or EMAP_LOCKED if the
+ *     entry is locked.
+ */
+short
+enq_map::get_pfid(const uint64_t rid, uint64_t& pfid)
+{
+    slock s(_mutex);
+    const emap_itr pos = _map.find(rid);
+    if (pos != _map.end())
+    {
+        if (!pos->second._lock)
+        {
+            pfid = pos->second._pfid;
+            return EMAP_OK;
+        }
+        return EMAP_LOCKED;
+    }
+    return EMAP_RID_NOT_FOUND;
+}
+
+/**
+ * Look up the pfid stored for rid and remove the entry from the map.
+ *
+ * \param pfid Set to the stored pfid only when EMAP_OK is returned.
+ * \param txn_flag When true, a locked entry is removed as well.
+ * \return EMAP_OK, EMAP_RID_NOT_FOUND if rid is not in the map, or EMAP_LOCKED if the
+ *     entry is locked and txn_flag is false.
+ */
+short
+enq_map::get_remove_pfid(const uint64_t rid, uint64_t& pfid, const bool txn_flag)
+{
+    slock s(_mutex);
+    const emap_itr pos = _map.find(rid);
+    if (pos == _map.end())
+    {
+        return EMAP_RID_NOT_FOUND;
+    }
+    const bool blocked = pos->second._lock && !txn_flag; // locked, but not a commit/abort
+    if (blocked)
+    {
+        return EMAP_LOCKED;
+    }
+    pfid = pos->second._pfid;
+    _map.erase(pos);
+    return EMAP_OK;
+}
+
+/**
+ * Look up the file position stored for rid.
+ *
+ * \param file_posn Set to the stored position only when EMAP_OK is returned.
+ * \return EMAP_OK, EMAP_RID_NOT_FOUND if rid is not in the map, or EMAP_LOCKED if the
+ *     entry is locked.
+ */
+short
+enq_map::get_file_posn(const uint64_t rid, std::streampos& file_posn)
+{
+    slock s(_mutex);
+    const emap_itr pos = _map.find(rid);
+    if (pos != _map.end())
+    {
+        if (!pos->second._lock)
+        {
+            file_posn = pos->second._file_posn;
+            return EMAP_OK;
+        }
+        return EMAP_LOCKED;
+    }
+    return EMAP_RID_NOT_FOUND;
+}
+
+/**
+ * Copy the whole entry stored for rid (pfid, file position and lock flag) into eds.
+ * The lock flag is reported, not enforced.
+ *
+ * \return EMAP_OK, or EMAP_RID_NOT_FOUND if rid is not in the map.
+ */
+short
+enq_map::get_data(const uint64_t rid, emap_data_struct_t& eds)
+{
+    slock s(_mutex);
+    const emap_itr pos = _map.find(rid);
+    if (pos == _map.end())
+    {
+        return EMAP_RID_NOT_FOUND;
+    }
+    const emap_data_struct_t& stored = pos->second;
+    eds._pfid = stored._pfid;
+    eds._file_posn = stored._file_posn;
+    eds._lock = stored._lock;
+    return EMAP_OK;
+}
+
+/**
+ * Report whether rid is present in the map.
+ *
+ * \param ignore_lock When false, a locked entry is reported as not enqueued.
+ * \return true if rid is present and either unlocked or ignore_lock is set.
+ */
+bool
+enq_map::is_enqueued(const uint64_t rid, bool ignore_lock)
+{
+    slock s(_mutex);
+    const emap_itr pos = _map.find(rid);
+    const bool present = (pos != _map.end());
+    // Short-circuit keeps pos from being dereferenced when rid is absent
+    return present && (ignore_lock || !pos->second._lock);
+}
+
+// Set the lock flag on the entry for rid. Returns EMAP_OK, or EMAP_RID_NOT_FOUND if
+// rid is not in the map.
+short
+enq_map::lock(const uint64_t rid)
+{
+    slock s(_mutex);
+    const emap_itr pos = _map.find(rid);
+    if (pos != _map.end())
+    {
+        pos->second._lock = true;
+        return EMAP_OK;
+    }
+    return EMAP_RID_NOT_FOUND;
+}
+
+// Clear the lock flag on the entry for rid. Returns EMAP_OK, or EMAP_RID_NOT_FOUND if
+// rid is not in the map.
+short
+enq_map::unlock(const uint64_t rid)
+{
+    slock s(_mutex);
+    const emap_itr pos = _map.find(rid);
+    if (pos != _map.end())
+    {
+        pos->second._lock = false;
+        return EMAP_OK;
+    }
+    return EMAP_RID_NOT_FOUND;
+}
+
+// Report the lock flag of the entry for rid: EMAP_TRUE, EMAP_FALSE, or
+// EMAP_RID_NOT_FOUND if rid is not in the map.
+short
+enq_map::is_locked(const uint64_t rid)
+{
+    slock s(_mutex);
+    const emap_itr pos = _map.find(rid);
+    if (pos == _map.end())
+    {
+        return EMAP_RID_NOT_FOUND;
+    }
+    if (pos->second._lock)
+    {
+        return EMAP_TRUE;
+    }
+    return EMAP_FALSE;
+}
+
+// Replace the contents of rv with every rid (map key) currently in the map, in map
+// iteration order.
+void
+enq_map::rid_list(std::vector<uint64_t>& rv)
+{
+    rv.clear();
+    slock s(_mutex);
+    emap_itr pos = _map.begin();
+    while (pos != _map.end())
+    {
+        rv.push_back(pos->first);
+        ++pos;
+    }
+}
+
+// Replace the contents of fv with the pfid of every entry currently in the map, in
+// map iteration order (one element per entry, so pfids may repeat).
+void
+enq_map::pfid_list(std::vector<uint64_t>& fv)
+{
+    fv.clear();
+    slock s(_mutex);
+    emap_itr pos = _map.begin();
+    while (pos != _map.end())
+    {
+        fv.push_back(pos->second._pfid);
+        ++pos;
+    }
+}
+
+}}

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,112 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_ENQ_MAP_H
+#define QPID_LEGACYSTORE_JRNL_ENQ_MAP_H
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+class enq_map;
+}}
+
+#include "qpid/linearstore/jrnl/jexception.h"
+#include "qpid/linearstore/jrnl/smutex.h"
+#include <map>
+#include <pthread.h>
+#include <vector>
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+    /**
+    * \class enq_map
+    * \brief Class for storing the physical file id (pfid) and a transaction locked flag for each enqueued
+    *     data block using the record id (rid) as a key. This is the primary mechanism for
+    *     determining the enqueue low water mark: if a pfid exists in this map, then there is
+    *     at least one still-enqueued record in that file. (The transaction map must also be
+    *     clear, however.)
+    *
+    * Map rids against pfid and lock status. As records are enqueued, they are added to this
+    * map, and as they are dequeued, they are removed. An enqueue is locked when a transactional
+    * dequeue is pending that has been neither committed nor aborted.
+    * <pre>
+    *   key      data
+    *
+    *   rid1 --- [ pfid, txn_lock ]
+    *   rid2 --- [ pfid, txn_lock ]
+    *   rid3 --- [ pfid, txn_lock ]
+    *   ...
+    * </pre>
+    */
+    class enq_map
+    {
+    public:
+        // return/error codes
+        static short EMAP_DUP_RID;
+        static short EMAP_LOCKED;
+        static short EMAP_RID_NOT_FOUND;
+        static short EMAP_OK;
+        static short EMAP_FALSE;
+        static short EMAP_TRUE;
+
+        // Value stored against each rid: physical file id, file position and lock flag.
+        // NOTE(review): the typedef alias below is spelled "emqp_data_struct_t", possibly
+        // a typo for "emap"; the struct name itself is what the rest of this class uses.
+        typedef struct emap_data_struct_t {
+            uint64_t        _pfid;
+            std::streampos  _file_posn;
+            bool            _lock;
+            emap_data_struct_t() : _pfid(0), _file_posn(0), _lock(false) {}
+            emap_data_struct_t(const uint64_t pfid, const std::streampos file_posn, const bool lock) : _pfid(pfid), _file_posn(file_posn), _lock(lock) {}
+        } emqp_data_struct_t;
+        typedef std::pair<uint64_t, emap_data_struct_t> emap_param;
+        typedef std::map<uint64_t, emap_data_struct_t> emap;
+        typedef emap::iterator emap_itr;
+
+    private:
+        emap _map;      // rid -> entry
+        smutex _mutex;  // taken (via slock) by the member functions defined in enq_map.cpp
+
+    public:
+        enq_map();
+        virtual ~enq_map();
+
+        // The lookup functions below return a status code; the value looked up is
+        // delivered through the reference parameter and is only set when 0 (ok) is returned.
+        short insert_pfid(const uint64_t rid, const uint64_t pfid, const std::streampos file_posn); // 0=ok; -3=duplicate rid; entry is inserted unlocked
+        short insert_pfid(const uint64_t rid, const uint64_t pfid, const std::streampos file_posn, const bool locked); // 0=ok; -3=duplicate rid;
+        short get_pfid(const uint64_t rid, uint64_t& pfid); // 0=ok; -1=rid not found; -2=locked
+        short get_remove_pfid(const uint64_t rid, uint64_t& pfid, const bool txn_flag = false); // 0=ok (entry removed); -1=rid not found; -2=locked and txn_flag is false
+        short get_file_posn(const uint64_t rid, std::streampos& file_posn); // 0=ok; -1=rid not found; -2=locked
+        short get_data(const uint64_t rid, emap_data_struct_t& eds); // 0=ok; -1=rid not found; lock state is copied, not checked
+        bool is_enqueued(const uint64_t rid, bool ignore_lock = false); // false if rid not found, or if locked and ignore_lock is false
+        short lock(const uint64_t rid); // 0=ok; -1=rid not found
+        short unlock(const uint64_t rid); // 0=ok; -1=rid not found
+        short is_locked(const uint64_t rid); // 1=true; 0=false; -1=rid not found
+        // NOTE: the three inline functions below access _map without taking _mutex
+        inline void clear() { _map.clear(); }
+        inline bool empty() const { return _map.empty(); }
+        inline uint32_t size() const { return uint32_t(_map.size()); }
+        void rid_list(std::vector<uint64_t>& rv); // rv is cleared, then filled with all rids
+        void pfid_list(std::vector<uint64_t>& fv); // fv is cleared, then filled with one pfid per entry
+    };
+
+}}
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_ENQ_MAP_H

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp Tue Oct 22 19:09:56 2013
@@ -0,0 +1,621 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/linearstore/jrnl/enq_rec.h"
+
+#include <cassert>
+#include <cerrno>
+#include <cstdlib>
+#include <cstring>
+#include <iomanip>
+#include "qpid/linearstore/jrnl/jerrno.h"
+#include "qpid/linearstore/jrnl/jexception.h"
+#include <sstream>
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+// Default constructor, used for read operations. The xid, data and receive-buffer pointers
+// all start out null; the enqueue header is initialised with the enqueue magic and journal
+// version, and the tail is then filled in from the record header by rec_tail_copy().
+enq_rec::enq_rec():
+        jrec(),
+        _xidp(0), _data(0), _buff(0)
+{
+    ::enq_hdr_init(&_enq_hdr, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, 0, 0, 0, false);
+    ::rec_tail_copy(&_enq_tail, &_enq_hdr._rhdr, 0);
+}
+
+// Constructor used for write operations. dbuf/dlen describe the data to be written and
+// xidp/xidlen the transaction id; both pointers are stored, not copied. The header is
+// initialised with rid, xidlen and dlen, the tail is filled in from the record header, and
+// finally the transient flag is applied to the header.
+enq_rec::enq_rec(const uint64_t rid, const void* const dbuf, const std::size_t dlen,
+        const void* const xidp, const std::size_t xidlen, const bool transient):
+        jrec(),
+        _xidp(xidp), _data(dbuf), _buff(0)
+{
+    ::enq_hdr_init(&_enq_hdr, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, 0, rid, xidlen, dlen);
+    ::rec_tail_copy(&_enq_tail, &_enq_hdr._rhdr, 0);
+    ::set_enq_transient(&_enq_hdr, transient);
+}
+
+// Destructor: all tear-down is delegated to clean().
+enq_rec::~enq_rec()
+{
+    this->clean();
+}
+
+// Return this instance to its "empty" state ready for reading a record from the journal:
+// record id, sizes and the transient flag are zeroed/cleared and all pointers are nulled.
+// Note that _buff is only set to null here; it is not freed.
+void
+enq_rec::reset()
+{
+    // Header and tail
+    _enq_hdr._rhdr._rid = 0;
+    ::set_enq_transient(&_enq_hdr, false);
+    _enq_hdr._xidsize = 0;
+    _enq_hdr._dsize = 0;
+    _enq_tail._rid = 0;
+
+    // Pointers
+    _xidp = 0;
+    _data = 0;
+    _buff = 0;
+}
+
+// Prepare this instance for writing a record to the journal. dbuf/dlen describe the data and
+// xidp/xidlen the transaction id; the pointers are stored, not copied. The transient and
+// external flags are applied to the header and rid is written to both header and tail.
+// Note that _buff is only set to null here; it is not freed.
+void
+enq_rec::reset(const uint64_t rid, const void* const dbuf, const std::size_t dlen,
+        const void* const xidp, const std::size_t xidlen, const bool transient,
+        const bool external)
+{
+    // Header and tail
+    _enq_hdr._rhdr._rid = rid;
+    ::set_enq_transient(&_enq_hdr, transient);
+    ::set_enq_external(&_enq_hdr, external);
+    _enq_hdr._xidsize = xidlen;
+    _enq_hdr._dsize = dlen;
+    _enq_tail._rid = rid;
+
+    // Pointers
+    _xidp = xidp;
+    _data = dbuf;
+    _buff = 0;
+}
+
+// Encode this enqueue record, or the next part of it, into the buffer at wptr.
+//
+// Record layout is: enqueue header, xid, data, tail. For external records the data is not
+// written, so the tail follows the xid directly (see also rec_size()).
+//
+// wptr            Destination buffer; must not be null.
+// rec_offs_dblks  Offset in dblks into the record at which this call starts writing; 0 means
+//                 the start of the record, non-zero a continuation of a split record.
+// max_size_dblks  Space available at wptr, in dblks; must be greater than 0.
+//
+// Only as much of the record as fits into max_size_dblks is written by a single call.
+// Returns the number of dblks written by this call (size_dblks() of the bytes copied).
+uint32_t
+enq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks)
+{
+    assert(wptr != 0);
+    assert(max_size_dblks > 0);
+    if (_xidp == 0)
+        assert(_enq_hdr._xidsize == 0);
+
+    std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES;
+    std::size_t rem = max_size_dblks * QLS_DBLK_SIZE_BYTES;
+    std::size_t wr_cnt = 0;
+    if (rec_offs_dblks) // Continuation of split data record (over 2 or more pages)
+    {
+        // In both branches below rec_offs is reduced section by section (header, xid, data,
+        // tail) so that it always holds the offset into the section being considered, and
+        // ends up as 0 once the section containing the start offset has been passed.
+        if (size_dblks(rec_size()) - rec_offs_dblks > max_size_dblks) // Further split required
+        {
+            rec_offs -= sizeof(_enq_hdr);
+            std::size_t wsize = _enq_hdr._xidsize > rec_offs ? _enq_hdr._xidsize - rec_offs : 0;
+            std::size_t wsize2 = wsize;
+            if (wsize)
+            {
+                if (wsize > rem)
+                    wsize = rem;
+                std::memcpy(wptr, (const char*)_xidp + rec_offs, wsize);
+                wr_cnt = wsize;
+                rem -= wsize;
+            }
+            rec_offs -= _enq_hdr._xidsize - wsize2;
+            if (rem && !::is_enq_external(&_enq_hdr))
+            {
+                wsize = _enq_hdr._dsize > rec_offs ? _enq_hdr._dsize - rec_offs : 0;
+                wsize2 = wsize;
+                if (wsize)
+                {
+                    if (wsize > rem)
+                        wsize = rem;
+                    std::memcpy((char*)wptr + wr_cnt, (const char*)_data + rec_offs, wsize);
+                    wr_cnt += wsize;
+                    rem -= wsize;
+                }
+                rec_offs -= _enq_hdr._dsize - wsize2;
+            }
+            if (rem)
+            {
+                wsize = sizeof(_enq_tail) > rec_offs ? sizeof(_enq_tail) - rec_offs : 0;
+                wsize2 = wsize;
+                if (wsize)
+                {
+                    if (wsize > rem)
+                        wsize = rem;
+                    std::memcpy((char*)wptr + wr_cnt, (char*)&_enq_tail + rec_offs, wsize);
+                    wr_cnt += wsize;
+                    rem -= wsize;
+                }
+                rec_offs -= sizeof(_enq_tail) - wsize2;
+            }
+            assert(rem == 0);
+            assert(rec_offs == 0);
+        }
+        else // No further split required
+        {
+            rec_offs -= sizeof(_enq_hdr);
+            std::size_t wsize = _enq_hdr._xidsize > rec_offs ? _enq_hdr._xidsize - rec_offs : 0;
+            if (wsize)
+            {
+                std::memcpy(wptr, (const char*)_xidp + rec_offs, wsize);
+                wr_cnt += wsize;
+            }
+            rec_offs -= _enq_hdr._xidsize - wsize;
+            // External records have no data section in the journal, so the data size must
+            // not be deducted from rec_offs for them; otherwise a split tail would be written
+            // from the wrong offset. This matches the "further split" branch above.
+            if (!::is_enq_external(&_enq_hdr))
+            {
+                wsize = _enq_hdr._dsize > rec_offs ? _enq_hdr._dsize - rec_offs : 0;
+                if (wsize)
+                {
+                    std::memcpy((char*)wptr + wr_cnt, (const char*)_data + rec_offs, wsize);
+                    wr_cnt += wsize;
+                }
+                rec_offs -= _enq_hdr._dsize - wsize;
+            }
+            wsize = sizeof(_enq_tail) > rec_offs ? sizeof(_enq_tail) - rec_offs : 0;
+            if (wsize)
+            {
+                std::memcpy((char*)wptr + wr_cnt, (char*)&_enq_tail + rec_offs, wsize);
+                wr_cnt += wsize;
+#ifdef QLS_CLEAN
+                // Fill the unused remainder of the last dblk with the clean character
+                std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES;
+                std::size_t dblk_rec_size = size_dblks(rec_size() - rec_offs) * QLS_DBLK_SIZE_BYTES;
+                std::memset((char*)wptr + wr_cnt, QLS_CLEAN_CHAR, dblk_rec_size - wr_cnt);
+#endif
+            }
+            rec_offs -= sizeof(_enq_tail) - wsize;
+            assert(rec_offs == 0);
+        }
+    }
+    else // Start at beginning of data record
+    {
+        // Assumption: the header will always fit into the first dblk
+        std::memcpy(wptr, (void*)&_enq_hdr, sizeof(_enq_hdr));
+        wr_cnt = sizeof(_enq_hdr);
+        if (size_dblks(rec_size()) > max_size_dblks) // Split required
+        {
+            // Write xid, data and tail in turn until the available space (rem) is used up
+            std::size_t wsize;
+            rem -= sizeof(_enq_hdr);
+            if (rem)
+            {
+                wsize = rem >= _enq_hdr._xidsize ? _enq_hdr._xidsize : rem;
+                std::memcpy((char*)wptr + wr_cnt,  _xidp, wsize);
+                wr_cnt += wsize;
+                rem -= wsize;
+            }
+            if (rem && !::is_enq_external(&_enq_hdr))
+            {
+                wsize = rem >= _enq_hdr._dsize ? _enq_hdr._dsize : rem;
+                std::memcpy((char*)wptr + wr_cnt, _data, wsize);
+                wr_cnt += wsize;
+                rem -= wsize;
+            }
+            if (rem)
+            {
+                wsize = rem >= sizeof(_enq_tail) ? sizeof(_enq_tail) : rem;
+                std::memcpy((char*)wptr + wr_cnt, (void*)&_enq_tail, wsize);
+                wr_cnt += wsize;
+                rem -= wsize;
+            }
+            assert(rem == 0);
+        }
+        else // No split required
+        {
+            if (_enq_hdr._xidsize)
+            {
+                std::memcpy((char*)wptr + wr_cnt, _xidp, _enq_hdr._xidsize);
+                wr_cnt += _enq_hdr._xidsize;
+            }
+            if (!::is_enq_external(&_enq_hdr))
+            {
+                std::memcpy((char*)wptr + wr_cnt, _data, _enq_hdr._dsize);
+                wr_cnt += _enq_hdr._dsize;
+            }
+            std::memcpy((char*)wptr + wr_cnt, (void*)&_enq_tail, sizeof(_enq_tail));
+            wr_cnt += sizeof(_enq_tail);
+#ifdef QLS_CLEAN
+            // Fill the unused remainder of the last dblk with the clean character
+            std::size_t dblk_rec_size = size_dblks(rec_size()) * QLS_DBLK_SIZE_BYTES;
+            std::memset((char*)wptr + wr_cnt, QLS_CLEAN_CHAR, dblk_rec_size - wr_cnt);
+#endif
+        }
+    }
+    return size_dblks(wr_cnt);
+}
+
+// Decode this enqueue record, or the next part of it, from the page buffer at rptr.
+//
+// h               Record header already read by the caller; copied into this record when
+//                 decoding starts (rec_offs_dblks == 0).
+// rptr            Source buffer; must not be null. At the start of a record it points at the
+//                 beginning of the record (header included).
+// rec_offs_dblks  Offset in dblks into the record at which this call continues; 0 means the
+//                 start of the record.
+// max_size_dblks  Amount of data available at rptr, in dblks; must be greater than 0.
+//
+// At the start of a record _buff is allocated with std::malloc() to hold the xid followed by
+// the data (xid only for external records, whose data is not stored in the journal).
+// chk_hdr()/chk_tail() are called once the header/tail are complete.
+// Returns the number of dblks consumed by this call (size_dblks() of the bytes read).
+uint32_t
+enq_rec::decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks)
+{
+    assert(rptr != 0);
+    assert(max_size_dblks > 0);
+
+    std::size_t rd_cnt = 0;
+    if (rec_offs_dblks) // Continuation of record on new page
+    {
+        // Number of data bytes actually present in the journal record. External records
+        // carry no data section, and _buff was sized accordingly, so _dsize must not be
+        // used directly for any offset or length calculation below.
+        const std::size_t stored_dsize = ::is_enq_external(&_enq_hdr) ? 0 : _enq_hdr._dsize;
+        const uint32_t hdr_xid_data_size = sizeof(enq_hdr_t) + _enq_hdr._xidsize + stored_dsize;
+        const uint32_t hdr_xid_data_tail_size = hdr_xid_data_size + sizeof(rec_tail_t);
+        const uint32_t hdr_data_dblks = size_dblks(hdr_xid_data_size);
+        const uint32_t hdr_tail_dblks = size_dblks(hdr_xid_data_tail_size);
+        const std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES;
+        // Offset into _buff (xid followed by data) at which this page continues
+        const std::size_t offs = rec_offs - sizeof(enq_hdr_t);
+
+        if (hdr_tail_dblks - rec_offs_dblks <= max_size_dblks)
+        {
+            // Remainder of record fits within this page
+            if (offs < _enq_hdr._xidsize)
+            {
+                // some XID still outstanding, copy remainder of XID, data and tail
+                const std::size_t rem = _enq_hdr._xidsize + stored_dsize - offs;
+                std::memcpy((char*)_buff + offs, rptr, rem);
+                rd_cnt += rem;
+                std::memcpy((void*)&_enq_tail, ((char*)rptr + rd_cnt), sizeof(_enq_tail));
+                chk_tail();
+                rd_cnt += sizeof(_enq_tail);
+            }
+            else if (offs < _enq_hdr._xidsize + _enq_hdr._dsize && !::is_enq_external(&_enq_hdr))
+            {
+                // some data still outstanding, copy remainder of data and tail
+                const std::size_t data_offs = offs - _enq_hdr._xidsize;
+                const std::size_t data_rem = _enq_hdr._dsize - data_offs;
+                std::memcpy((char*)_buff + offs, rptr, data_rem);
+                rd_cnt += data_rem;
+                std::memcpy((void*)&_enq_tail, ((char*)rptr + rd_cnt), sizeof(_enq_tail));
+                chk_tail();
+                rd_cnt += sizeof(_enq_tail);
+            }
+            else
+            {
+                // Tail or part of tail only outstanding, complete tail
+                const std::size_t tail_offs = rec_offs - sizeof(enq_hdr_t) - _enq_hdr._xidsize -
+                        stored_dsize;
+                const std::size_t tail_rem = sizeof(rec_tail_t) - tail_offs;
+                std::memcpy((char*)&_enq_tail + tail_offs, rptr, tail_rem);
+                chk_tail();
+                rd_cnt = tail_rem;
+            }
+        }
+        else if (hdr_data_dblks - rec_offs_dblks <= max_size_dblks)
+        {
+            // Remainder of xid & data fits within this page; tail split
+
+            /*
+             * TODO: This section needs revision. Since it is known that the end of the page falls within the
+             * tail record, it is only necessary to write from the current offset to the end of the page under
+             * all circumstances. The multiple if/else combinations may be eliminated, as well as one memcpy()
+             * operation.
+             *
+             * Also note that Coverity has detected a possible memory overwrite in this block. It occurs if
+             * both the following two if() stmsts (numbered) are false. With rd_cnt = 0, this would result in
+             * the value of tail_rem > sizeof(tail_rec). Practically, this could only happen if the start and
+             * end of a page both fall within the same tail record, in which case the tail would have to be
+             * (much!) larger. However, the logic here does not account for this possibility.
+             *
+             * If the optimization above is undertaken, this code would probably be removed.
+             */
+            if (offs < _enq_hdr._xidsize) // 1
+            {
+                // some XID still outstanding, copy remainder of XID and data
+                const std::size_t rem = _enq_hdr._xidsize + stored_dsize - offs;
+                std::memcpy((char*)_buff + offs, rptr, rem);
+                rd_cnt += rem;
+            }
+            else if (offs < _enq_hdr._xidsize + _enq_hdr._dsize && !::is_enq_external(&_enq_hdr)) // 2
+            {
+                // some data still outstanding, copy remainder of data
+                const std::size_t data_offs = offs - _enq_hdr._xidsize;
+                const std::size_t data_rem = _enq_hdr._dsize - data_offs;
+                std::memcpy((char*)_buff + offs, rptr, data_rem);
+                rd_cnt += data_rem;
+            }
+            // Whatever is left of the page is the first part of the tail
+            const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
+            if (tail_rem)
+            {
+                std::memcpy((void*)&_enq_tail, ((char*)rptr + rd_cnt), tail_rem);
+                rd_cnt += tail_rem;
+            }
+        }
+        else
+        {
+            // Since xid and data are contiguous, both fit within current page - copy whole page
+            const std::size_t data_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES);
+            std::memcpy((char*)_buff + offs, rptr, data_cp_size);
+            rd_cnt += data_cp_size;
+        }
+    }
+    else // Start of record
+    {
+        // Get and check header
+        //_enq_hdr.hdr_copy(h);
+        ::rec_hdr_copy(&_enq_hdr._rhdr, &h);
+        rd_cnt = sizeof(rec_hdr_t);
+        // The xid size and data size fields follow the record header in the buffer
+        _enq_hdr._xidsize = *(std::size_t*)((char*)rptr + rd_cnt);
+        rd_cnt += sizeof(std::size_t);
+#if defined(JRNL_32_BIT)
+        rd_cnt += sizeof(uint32_t); // Filler 0
+#endif
+        _enq_hdr._dsize = *(std::size_t*)((char*)rptr + rd_cnt);
+        rd_cnt = sizeof(enq_hdr_t);
+        chk_hdr();
+        if (_enq_hdr._xidsize + (::is_enq_external(&_enq_hdr) ? 0 : _enq_hdr._dsize))
+        {
+            // One buffer receives the xid immediately followed by the data (if stored)
+            _buff = std::malloc(_enq_hdr._xidsize + (::is_enq_external(&_enq_hdr) ? 0 : _enq_hdr._dsize));
+            MALLOC_CHK(_buff, "_buff", "enq_rec", "decode");
+
+            const uint32_t hdr_xid_size = sizeof(enq_hdr_t) + _enq_hdr._xidsize;
+            const uint32_t hdr_xid_data_size = hdr_xid_size + (::is_enq_external(&_enq_hdr) ? 0 : _enq_hdr._dsize);
+            const uint32_t hdr_xid_data_tail_size = hdr_xid_data_size + sizeof(rec_tail_t);
+            const uint32_t hdr_xid_dblks  = size_dblks(hdr_xid_size);
+            const uint32_t hdr_data_dblks = size_dblks(hdr_xid_data_size);
+            const uint32_t hdr_tail_dblks = size_dblks(hdr_xid_data_tail_size);
+            // Check if record (header + data + tail) fits within this page, we can check the
+            // tail before the expense of copying data to memory
+            if (hdr_tail_dblks <= max_size_dblks)
+            {
+                // Header, xid, data and tail fits within this page
+                if (_enq_hdr._xidsize)
+                {
+                    std::memcpy(_buff, (char*)rptr + rd_cnt, _enq_hdr._xidsize);
+                    rd_cnt += _enq_hdr._xidsize;
+                }
+                if (_enq_hdr._dsize && !::is_enq_external(&_enq_hdr))
+                {
+                    std::memcpy((char*)_buff + _enq_hdr._xidsize, (char*)rptr + rd_cnt,
+                            _enq_hdr._dsize);
+                    rd_cnt += _enq_hdr._dsize;
+                }
+                std::memcpy((void*)&_enq_tail, (char*)rptr + rd_cnt, sizeof(_enq_tail));
+                chk_tail();
+                rd_cnt += sizeof(_enq_tail);
+            }
+            else if (hdr_data_dblks <= max_size_dblks)
+            {
+                // Header, xid and data fit within this page, tail split or separated
+                if (_enq_hdr._xidsize)
+                {
+                    std::memcpy(_buff, (char*)rptr + rd_cnt, _enq_hdr._xidsize);
+                    rd_cnt += _enq_hdr._xidsize;
+                }
+                if (_enq_hdr._dsize && !::is_enq_external(&_enq_hdr))
+                {
+                    std::memcpy((char*)_buff + _enq_hdr._xidsize, (char*)rptr + rd_cnt,
+                            _enq_hdr._dsize);
+                    rd_cnt += _enq_hdr._dsize;
+                }
+                const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
+                if (tail_rem)
+                {
+                    std::memcpy((void*)&_enq_tail, (char*)rptr + rd_cnt, tail_rem);
+                    rd_cnt += tail_rem;
+                }
+            }
+            else if (hdr_xid_dblks <= max_size_dblks)
+            {
+                // Header and xid fits within this page, data split or separated
+                if (_enq_hdr._xidsize)
+                {
+                    std::memcpy(_buff, (char*)rptr + rd_cnt, _enq_hdr._xidsize);
+                    rd_cnt += _enq_hdr._xidsize;
+                }
+                if (_enq_hdr._dsize && !::is_enq_external(&_enq_hdr))
+                {
+                    const std::size_t data_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
+                    std::memcpy((char*)_buff + _enq_hdr._xidsize, (char*)rptr + rd_cnt, data_cp_size);
+                    rd_cnt += data_cp_size;
+                }
+            }
+            else
+            {
+                // Header fits within this page, xid split or separated
+                const std::size_t data_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
+                std::memcpy(_buff, (char*)rptr + rd_cnt, data_cp_size);
+                rd_cnt += data_cp_size;
+            }
+        }
+    }
+    return size_dblks(rd_cnt);
+}
+
+// Decode used for recover: reads an enqueue record from the stream *ifsp, possibly over
+// several calls.
+//
+// h         Record header already read by the caller; copied into this record when
+//           rec_offs == 0.
+// ifsp      Input stream positioned just after the record header (rec_offs == 0) or at the
+//           point where a previous call stopped.
+// rec_offs  In/out: number of bytes of the record consumed so far. 0 starts a new record;
+//           it is advanced by the number of bytes read or skipped in each section.
+//
+// Only the xid is kept (in _buff, allocated with std::malloc()); the data is skipped over
+// with ignore(). Returns false if the stream ran out before the xid, data or tail section was
+// complete (the fail bit is cleared in that case); returns true once the whole record has been
+// consumed and chk_tail() has passed.
+bool
+enq_rec::rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs)
+{
+    if (rec_offs == 0)
+    {
+        // Read header, allocate (if req'd) for xid
+        //_enq_hdr.hdr_copy(h);
+        ::rec_hdr_copy(&_enq_hdr._rhdr, &h);
+        ifsp->read((char*)&_enq_hdr._xidsize, sizeof(std::size_t));
+#if defined(JRNL_32_BIT)
+        ifsp->ignore(sizeof(uint32_t)); // _filler0
+#endif
+        ifsp->read((char*)&_enq_hdr._dsize, sizeof(std::size_t));
+#if defined(JRNL_32_BIT)
+        ifsp->ignore(sizeof(uint32_t)); // _filler1
+#endif
+        rec_offs = sizeof(_enq_hdr);
+        if (_enq_hdr._xidsize)
+        {
+            // Buffer holds the xid only; record data is not retained by this function
+            _buff = std::malloc(_enq_hdr._xidsize);
+            MALLOC_CHK(_buff, "_buff", "enq_rec", "rcv_decode");
+        }
+    }
+    if (rec_offs < sizeof(_enq_hdr) + _enq_hdr._xidsize)
+    {
+        // Read xid (or continue reading xid)
+        std::size_t offs = rec_offs - sizeof(_enq_hdr);
+        ifsp->read((char*)_buff + offs, _enq_hdr._xidsize - offs);
+        std::size_t size_read = ifsp->gcount();
+        rec_offs += size_read;
+        if (size_read < _enq_hdr._xidsize - offs)
+        {
+            // Short read: the xid is incomplete in this stream
+            assert(ifsp->eof());
+            // As we may have read past eof, turn off fail bit
+            ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit));
+            assert(!ifsp->fail() && !ifsp->bad());
+            return false;
+        }
+    }
+    if (!::is_enq_external(&_enq_hdr))
+    {
+        if (rec_offs < sizeof(_enq_hdr) + _enq_hdr._xidsize +  _enq_hdr._dsize)
+        {
+            // Ignore data (or continue ignoring data)
+            std::size_t offs = rec_offs - sizeof(_enq_hdr) - _enq_hdr._xidsize;
+            ifsp->ignore(_enq_hdr._dsize - offs);
+            std::size_t size_read = ifsp->gcount();
+            rec_offs += size_read;
+            if (size_read < _enq_hdr._dsize - offs)
+            {
+                // Short skip: the data section is incomplete in this stream
+                assert(ifsp->eof());
+                // As we may have read past eof, turn off fail bit
+                ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit));
+                assert(!ifsp->fail() && !ifsp->bad());
+                return false;
+            }
+        }
+    }
+    if (rec_offs < sizeof(_enq_hdr) + _enq_hdr._xidsize +
+            (::is_enq_external(&_enq_hdr) ? 0 : _enq_hdr._dsize) + sizeof(rec_tail_t))
+    {
+        // Read tail (or continue reading tail)
+        // offs becomes the offset into the tail: header, xid and (if stored) data are deducted
+        std::size_t offs = rec_offs - sizeof(_enq_hdr) - _enq_hdr._xidsize;
+        if (!::is_enq_external(&_enq_hdr))
+            offs -= _enq_hdr._dsize;
+        ifsp->read((char*)&_enq_tail + offs, sizeof(rec_tail_t) - offs);
+        std::size_t size_read = ifsp->gcount();
+        rec_offs += size_read;
+        if (size_read < sizeof(rec_tail_t) - offs)
+        {
+            // Short read: the tail is incomplete in this stream
+            assert(ifsp->eof());
+            // As we may have read past eof, turn off fail bit
+            ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit));
+            assert(!ifsp->fail() && !ifsp->bad());
+            return false;
+        }
+    }
+    // Skip the padding between the end of the record and the end of its last dblk
+    ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size());
+    chk_tail(); // Throws if tail invalid or record incomplete
+    assert(!ifsp->fail() && !ifsp->bad());
+    return true;
+}
+
+// Hand back the xid that was read from the journal. *xidpp is set to the start of _buff and
+// the xid size is returned; if there is no buffer or the xid size is zero, *xidpp is set to
+// null and 0 is returned.
+std::size_t
+enq_rec::get_xid(void** const xidpp)
+{
+    const bool have_xid = _buff && _enq_hdr._xidsize;
+    if (have_xid)
+    {
+        *xidpp = _buff;
+        return _enq_hdr._xidsize;
+    }
+    *xidpp = 0;
+    return 0;
+}
+
+// Hand back the data that was read from the journal. Without a buffer, *datapp is set to
+// null and 0 is returned. Otherwise the header's data size is returned and *datapp points
+// just past the xid inside _buff - except for external records, where *datapp is null.
+std::size_t
+enq_rec::get_data(void** const datapp)
+{
+    if (_buff == 0)
+    {
+        *datapp = 0;
+        return 0;
+    }
+    void* data_start = 0;
+    if (!::is_enq_external(&_enq_hdr))
+        data_start = (void*)((char*)_buff + _enq_hdr._xidsize);
+    *datapp = data_start;
+    return _enq_hdr._dsize;
+}
+
+// Append a one-line, human-readable summary of this record (magic, version, rid, xid if
+// set, data length) to str and return str. _xidp is streamed as a pointer (it is declared
+// as const void*), so the text shows the xid's address rather than its bytes.
+std::string&
+enq_rec::str(std::string& str) const
+{
+    std::ostringstream summary;
+    summary << "enq_rec: m=" << _enq_hdr._rhdr._magic
+            << " v=" << (int)_enq_hdr._rhdr._version
+            << " rid=" << _enq_hdr._rhdr._rid;
+    if (_xidp)
+    {
+        summary << " xid=\"" << _xidp << "\"";
+    }
+    summary << " len=" << _enq_hdr._dsize;
+    return str.append(summary.str());
+}
+
+// Size in bytes of this record as computed by the static rec_size() from the header's xid
+// size, data size and external flag.
+std::size_t
+enq_rec::rec_size() const
+{
+    const bool external = ::is_enq_external(&_enq_hdr);
+    return rec_size(_enq_hdr._xidsize, _enq_hdr._dsize, external);
+}
+
+// Size in bytes of an enqueue record: header + xid + data + tail. For external records the
+// data size is left out of the sum.
+std::size_t
+enq_rec::rec_size(const std::size_t xidsize, const std::size_t dsize, const bool external)
+{
+    const std::size_t stored_dsize = external ? 0 : dsize;
+    return sizeof(enq_hdr_t) + xidsize + stored_dsize + sizeof(rec_tail_t);
+}
+
+// Set the record id; it is stored in both the record tail and the record header.
+void
+enq_rec::set_rid(const uint64_t rid)
+{
+    _enq_tail._rid = rid;
+    _enq_hdr._rhdr._rid = rid;
+}
+
+// Validate the enqueue header: first the generic jrec::chk_hdr() check, then the magic must
+// equal QLS_ENQ_MAGIC. Throws jexception(JERR_JREC_BADRECHDR) on a magic mismatch.
+void
+enq_rec::chk_hdr() const
+{
+    jrec::chk_hdr(_enq_hdr._rhdr);
+    if (_enq_hdr._rhdr._magic == QLS_ENQ_MAGIC)
+        return;
+
+    // Magic mismatch: report rid plus expected and actual magic in zero-padded hex
+    std::ostringstream msg;
+    msg << std::hex << std::setfill('0')
+        << "enq magic: rid=0x" << std::setw(16) << _enq_hdr._rhdr._rid
+        << ": expected=0x" << std::setw(8) << QLS_ENQ_MAGIC
+        << " read=0x" << std::setw(2) << (int)_enq_hdr._rhdr._magic;
+    throw jexception(jerrno::JERR_JREC_BADRECHDR, msg.str(), "enq_rec", "chk_hdr");
+}
+
+// Validate the enqueue header as chk_hdr() does, then check its record id against rid via
+// jrec::chk_rid().
+void
+enq_rec::chk_hdr(uint64_t rid) const
+{
+    this->chk_hdr();
+    jrec::chk_rid(_enq_hdr._rhdr, rid);
+}
+
+// Validate the record tail against the record header via jrec::chk_tail().
+void
+enq_rec::chk_tail() const
+{
+    const rec_hdr_t& rhdr = _enq_hdr._rhdr;
+    jrec::chk_tail(_enq_tail, rhdr);
+}
+
+// Clean-up hook called from the destructor. Currently a no-op.
+// NOTE(review): _buff is allocated with std::malloc() in decode()/rcv_decode() and is not
+// released here - presumably the caller takes ownership of it through get_xid()/get_data();
+// confirm against the callers.
+void
+enq_rec::clean()
+{
+    // clean up allocated memory here
+}
+
+}}

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LINEARSTORE_JRNL_ENQ_REC_H
+#define QPID_LINEARSTORE_JRNL_ENQ_REC_H
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+class enq_rec;
+}}
+
+#include <cstddef>
+#include <fstream>
+#include <string>
+#include "qpid/linearstore/jrnl/utils/enq_hdr.h"
+#include "qpid/linearstore/jrnl/jrec.h"
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+    /**
+    * \class enq_rec
+    * \brief Class to handle a single journal enqueue record.
+    *
+    * A record consists of an enqueue header, an optional xid, the data (not stored in the
+    * journal for external records) and a record tail.
+    */
+    class enq_rec : public jrec
+    {
+    private:
+        enq_hdr_t _enq_hdr;         ///< Enqueue record header
+        const void* _xidp;          ///< xid pointer for encoding (for writing to disk)
+        const void* _data;          ///< Pointer to data to be written to disk
+        void* _buff;                ///< Pointer to buffer to receive data read from disk
+        rec_tail_t _enq_tail;       ///< Record tail
+
+    public:
+        /**
+        * \brief Constructor used for read operations.
+        */
+        enq_rec();
+
+        /**
+        * \brief Constructor used for write operations, where dbuf contains data to be written.
+        */
+        enq_rec(const uint64_t rid, const void* const dbuf, const std::size_t dlen,
+                const void* const xidp, const std::size_t xidlen, const bool transient);
+
+        /**
+        * \brief Destructor
+        */
+        virtual ~enq_rec();
+
+        // Prepare instance for use in reading data from journal, xid and data will be allocated
+        void reset();
+        // Prepare instance for use in writing data to journal
+        void reset(const uint64_t rid, const void* const dbuf, const std::size_t dlen,
+                const void* const xidp, const std::size_t xidlen, const bool transient,
+                const bool external);
+
+        // Write (part of) the record to wptr; returns the number of dblks written
+        uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks);
+        // Read (part of) the record from rptr; returns the number of dblks read
+        uint32_t decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks);
+        // Decode used for recover
+        bool rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs);
+
+        // Sets *xidpp to the xid read from the journal (null if none); returns the xid size
+        std::size_t get_xid(void** const xidpp);
+        // Sets *datapp to the data read from the journal (null if none or external);
+        // returns the data size, or 0 if no buffer has been allocated
+        std::size_t get_data(void** const datapp);
+        inline bool is_transient() const { return ::is_enq_transient(&_enq_hdr); }
+        inline bool is_external() const { return ::is_enq_external(&_enq_hdr); }
+        // Appends a short textual description of the record to str and returns str
+        std::string& str(std::string& str) const;
+        inline std::size_t data_size() const { return _enq_hdr._dsize; }
+        inline std::size_t xid_size() const { return _enq_hdr._xidsize; }
+        // Record size in bytes (header + xid + data + tail; data excluded when external)
+        std::size_t rec_size() const;
+        static std::size_t rec_size(const std::size_t xidsize, const std::size_t dsize, const bool external);
+        inline uint64_t rid() const { return _enq_hdr._rhdr._rid; }
+        // Sets the record id in both header and tail
+        void set_rid(const uint64_t rid);
+
+    private:
+        void chk_hdr() const;
+        void chk_hdr(uint64_t rid) const;
+        void chk_tail() const;
+        virtual void clean();
+    }; // class enq_rec
+
+}}
+
+#endif // ifndef QPID_LINEARSTORE_JRNL_ENQ_REC_H

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enums.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enums.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enums.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enums.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LINEARSTORE_JRNL_ENUMS_H
+#define QPID_LINEARSTORE_JRNL_ENUMS_H
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+    // TODO: Change this to flags, as multiple of these conditions may exist simultaneously
+    /**
+    * \brief Enumeration of possible return states from journal read and write operations.
+    */
+    enum _iores
+    {
+        RHM_IORES_SUCCESS = 0,  ///< Success: IO operation completed normally.
+        RHM_IORES_PAGE_AIOWAIT, ///< IO operation suspended - next page is waiting for AIO.
+        RHM_IORES_FILE_AIOWAIT, ///< IO operation suspended - next file is waiting for AIO.
+        RHM_IORES_EMPTY,        ///< During read operations, nothing further is available to read.
+//        RHM_IORES_RCINVALID,    ///< Read page cache is invalid (ie obsolete or uninitialized)
+//        RHM_IORES_ENQCAPTHRESH, ///< Enqueue capacity threshold (limit) reached.
+//        RHM_IORES_FULL,         ///< During write operations, the journal files are full.
+//        RHM_IORES_BUSY,         ///< Another blocking operation is in progress.
+        RHM_IORES_TXPENDING     ///< Operation blocked by pending transaction.
+//        RHM_IORES_NOTIMPL       ///< Function is not implemented.
+    };
+    typedef _iores iores;
+
+    /**
+    * \brief Return the name of an iores value as a string literal, or "<iores unknown>" if
+    * the value matches none of the enumerators.
+    */
+    static inline const char* iores_str(iores res)
+    {
+        const char* name = "<iores unknown>";
+        switch (res)
+        {
+            case RHM_IORES_SUCCESS:      name = "RHM_IORES_SUCCESS"; break;
+            case RHM_IORES_PAGE_AIOWAIT: name = "RHM_IORES_PAGE_AIOWAIT"; break;
+            case RHM_IORES_FILE_AIOWAIT: name = "RHM_IORES_FILE_AIOWAIT"; break;
+            case RHM_IORES_EMPTY:        name = "RHM_IORES_EMPTY"; break;
+//            case RHM_IORES_RCINVALID:    name = "RHM_IORES_RCINVALID"; break;
+//            case RHM_IORES_ENQCAPTHRESH: name = "RHM_IORES_ENQCAPTHRESH"; break;
+//            case RHM_IORES_FULL:         name = "RHM_IORES_FULL"; break;
+//            case RHM_IORES_BUSY:         name = "RHM_IORES_BUSY"; break;
+            case RHM_IORES_TXPENDING:    name = "RHM_IORES_TXPENDING"; break;
+//            case RHM_IORES_NOTIMPL:      name = "RHM_IORES_NOTIMPL"; break;
+        }
+        return name;
+    }
+
+/*
+    enum _log_level
+    {
+        LOG_TRACE = 0,
+        LOG_DEBUG,
+        LOG_INFO,
+        LOG_NOTICE,
+        LOG_WARN,
+        LOG_ERROR,
+        LOG_CRITICAL
+    };
+    typedef _log_level log_level_t;
+
+    static inline const char* log_level_str(log_level_t ll)
+    {
+        switch (ll)
+        {
+            case LOG_TRACE: return "TRACE";
+            case LOG_DEBUG: return "DEBUG";
+            case LOG_INFO: return "INFO";
+            case LOG_NOTICE: return "NOTICE";
+            case LOG_WARN: return "WARN";
+            case LOG_ERROR: return "ERROR";
+            case LOG_CRITICAL: return "CRITICAL";
+        }
+        return "<log level unknown>";
+    }
+*/
+
+
+}}
+
+#endif // ifndef QPID_LINEARSTORE_JRNL_ENUMS_H

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+// Compile-time configuration constants for the linearstore journal.
+// The guard is named after the linearstore path (matching the sibling
+// linearstore headers) so that it cannot collide with a legacystore header.
+#ifndef QPID_LINEARSTORE_JRNL_JCFG_H
+#define QPID_LINEARSTORE_JRNL_JCFG_H
+
+#define QLS_SBLK_SIZE_BYTES             4096        /**< Disk softblock size in bytes, should match size used on disk media */
+#define QLS_AIO_ALIGN_BOUNDARY_BYTES    QLS_SBLK_SIZE_BYTES /**< Memory alignment boundary used for DMA */
+/**
+* <b>Rule:</b> Data block size (QLS_DBLK_SIZE_BYTES) MUST be a power of 2 AND
+* a power of 2 factor of the disk softblock size (QLS_SBLK_SIZE_BYTES):
+* <pre>
+* n * QLS_DBLK_SIZE_BYTES == QLS_SBLK_SIZE_BYTES (n = 1,2,4,8...)
+* </pre>
+*/
+#define QLS_DBLK_SIZE_BYTES             128         /**< Data block size in bytes (CANNOT BE LESS THAN 32!) */
+#define QLS_SBLK_SIZE_DBLKS             (QLS_SBLK_SIZE_BYTES / QLS_DBLK_SIZE_BYTES) /**< Disk softblock size in multiples of QLS_DBLK_SIZE_BYTES */
+#define QLS_SBLK_SIZE_KIB               (QLS_SBLK_SIZE_BYTES / 1024) /**< Disk softblock size in KiB */
+
+#define QLS_WMGR_DEF_PAGE_SIZE_KIB      32          /**< Journal write page size in KiB (default) */
+#define QLS_WMGR_DEF_PAGE_SIZE_SBLKS    (QLS_WMGR_DEF_PAGE_SIZE_KIB / QLS_SBLK_SIZE_KIB) /**< Journal write page size in softblocks (default) */
+#define QLS_WMGR_DEF_PAGES              32          /**< Number of pages to use in wmgr (default) */
+
+#define QLS_WMGR_MAXDTOKPP              1024        /**< Max. dtoks (data blocks) per page in wmgr */
+#define QLS_WMGR_MAXWAITUS              100         /**< Max. wait time (us) before submitting AIO */
+
+#define QLS_JRNL_FILE_EXTENSION         ".jrnl"     /**< Extension for journal data files */
+#define QLS_TXA_MAGIC                   0x61534c51  /**< ("QLSa" in little endian) Magic for dtx abort hdrs */
+#define QLS_TXC_MAGIC                   0x63534c51  /**< ("QLSc" in little endian) Magic for dtx commit hdrs */
+#define QLS_DEQ_MAGIC                   0x64534c51  /**< ("QLSd" in little endian) Magic for deq rec hdrs */
+#define QLS_ENQ_MAGIC                   0x65534c51  /**< ("QLSe" in little endian) Magic for enq rec hdrs */
+#define QLS_FILE_MAGIC                  0x66534c51  /**< ("QLSf" in little endian) Magic for file hdrs */
+#define QLS_EMPTY_MAGIC                 0x78534c51  /**< ("QLSx" in little endian) Magic for empty dblk */
+#define QLS_JRNL_VERSION                2           /**< Version (of file layout) */
+#define QLS_JRNL_FHDR_RES_SIZE_SBLKS    1           /**< Journal file header reserved size in sblks (as defined by QLS_SBLK_SIZE_BYTES) */
+
+#define QLS_CLEAN                                   /**< If defined, writes QLS_CLEAN_CHAR to all filled areas on disk */
+#define QLS_CLEAN_CHAR                  0xff        /**< Char used to clear empty space on disk */
+
+#endif /* ifndef QPID_LINEARSTORE_JRNL_JCFG_H */

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp Tue Oct 22 19:09:56 2013
@@ -0,0 +1,549 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/linearstore/jrnl/jcntl.h"
+
+#include <algorithm>
+#include <cassert>
+#include <cerrno>
+#include <cstdlib>
+#include <cstring>
+#include <fstream>
+#include <iomanip>
+#include <iostream>
+#include <qpid/linearstore/jrnl/EmptyFilePool.h>
+#include <qpid/linearstore/jrnl/EmptyFilePoolManager.h>
+#include "qpid/linearstore/jrnl/jerrno.h"
+#include "qpid/linearstore/jrnl/JournalLog.h"
+#include "qpid/linearstore/jrnl/utils/enq_hdr.h"
+#include <limits>
+#include <sstream>
+#include <unistd.h>
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+// Default libaio completion timeouts; init_statics() copies each SEC/NSEC
+// pair into the tv_sec/tv_nsec fields of the matching static timespec below.
+#define AIO_CMPL_TIMEOUT_SEC   5
+#define AIO_CMPL_TIMEOUT_NSEC  0
+#define FINAL_AIO_CMPL_TIMEOUT_SEC   15
+#define FINAL_AIO_CMPL_TIMEOUT_NSEC  0
+
+// Static
+timespec jcntl::_aio_cmpl_timeout; ///< Timeout for blocking libaio returns
+timespec jcntl::_final_aio_cmpl_timeout; ///< Timeout for blocking libaio returns when stopping or finalizing
+// Initialising _init from init_statics() makes the call happen as part of this
+// translation unit's static initialisation, which fills in the timeouts above.
+bool jcntl::_init = init_statics();
+/**
+ * \brief Populate the static AIO completion timeouts from the macros above.
+ * \return Always true; the value only exists so it can initialise _init.
+ */
+bool jcntl::init_statics()
+{
+    _aio_cmpl_timeout.tv_sec = AIO_CMPL_TIMEOUT_SEC;
+    _aio_cmpl_timeout.tv_nsec = AIO_CMPL_TIMEOUT_NSEC;
+    _final_aio_cmpl_timeout.tv_sec = FINAL_AIO_CMPL_TIMEOUT_SEC;
+    _final_aio_cmpl_timeout.tv_nsec = FINAL_AIO_CMPL_TIMEOUT_NSEC;
+    return true;
+}
+
+
+// Functions
+
+/**
+ * \brief Construct a journal controller; the journal is not usable until
+ *     initialize() or recover() has set the init flag.
+ *
+ * All state flags start out false and the empty-file-pool pointer starts null.
+ * The write manager is handed this object plus references to the enqueue map,
+ * the transaction map and the linear file controller.
+ *
+ * \param jid Journal identifier, stored in _jid.
+ * \param jdir Used to initialise _jdir.
+ * \param jrnl_log Logger stored by reference and also passed to the recovery manager.
+ */
+jcntl::jcntl(const std::string& jid,
+             const std::string& jdir,
+             JournalLog& jrnl_log):
+    _jid(jid),
+    _jdir(jdir),
+    _init_flag(false),
+    _stop_flag(false),
+    _readonly_flag(false),
+    _jrnl_log(jrnl_log),
+    _linearFileController(*this),
+    _emptyFilePoolPtr(0),
+    _emap(),
+    _tmap(),
+    _wmgr(this, _emap, _tmap, _linearFileController),
+    // NOTE(review): reads _jdir and _jid, so this assumes both are declared
+    // before _recoveryManager in the class - confirm against jcntl.h.
+    _recoveryManager(_jdir.dirname(), _jid, _emap, _tmap, jrnl_log)
+{}
+
+/**
+ * \brief Stop the journal if it is initialised and still running, then
+ *     finalize the linear file controller.
+ *
+ * A jexception raised by stop() is written to std::cerr and not propagated.
+ */
+jcntl::~jcntl()
+{
+    const bool still_running = _init_flag && !_stop_flag;
+    if (still_running)
+    {
+        try
+        {
+            stop(true);
+        }
+        catch (const jexception& e)
+        {
+            std::cerr << e << std::endl;
+        }
+    }
+    _linearFileController.finalize();
+}
+
+/**
+ * \brief Initialise a new, empty journal.
+ *
+ * Resets the state flags and both maps, finalizes the linear file controller,
+ * clears the journal directory, re-initialises the file controller with the
+ * supplied empty file pool, pulls one empty file from that pool and finally
+ * initialises the write manager. The init flag is set only once all of these
+ * steps have completed.
+ *
+ * \param efpp Empty file pool handed to the linear file controller.
+ * \param wcache_num_pages Number of write cache pages, forwarded to the write manager.
+ * \param wcache_pgsize_sblks Write cache page size in softblocks, forwarded to the write manager.
+ * \param cbp AIO callback forwarded to the write manager.
+ */
+void
+jcntl::initialize(EmptyFilePool* efpp,
+                  const uint16_t wcache_num_pages,
+                  const uint32_t wcache_pgsize_sblks,
+                  aio_callback* const cbp)
+{
+    // Mark the journal as uninitialised while it is being rebuilt.
+    _init_flag = false;
+    _stop_flag = false;
+    _readonly_flag = false;
+
+    _emap.clear();
+    _tmap.clear();
+
+    _linearFileController.finalize();
+
+//    _lpmgr.finalize();
+
+    // Set new file geometry parameters
+//    assert(num_jfiles >= JRNL_MIN_NUM_FILES);
+//    assert(num_jfiles <= JRNL_MAX_NUM_FILES);
+//    _emap.set_num_jfiles(num_jfiles);
+//    _tmap.set_num_jfiles(num_jfiles);
+
+//    assert(jfsize_sblks >= JRNL_MIN_FILE_SIZE);
+//    assert(jfsize_sblks <= JRNL_MAX_FILE_SIZE);
+//    _jfsize_sblks = jfsize_sblks;
+
+    // Clear any existing journal files
+    _jdir.clear_dir();
+//    _lpmgr.initialize(num_jfiles, ae, ae_max_jfiles, this, &new_fcntl); // Creates new journal files
+
+    // Start the file controller with a third argument of 0, then take the first file from the pool.
+    _linearFileController.initialize(_jdir.dirname(), efpp, 0ULL);
+    _linearFileController.pullEmptyFileFromEfp();
+//    std::cout << _linearFileController.status(2);
+//    _wrfc.initialize(_jfsize_sblks);
+//    _rrfc.initialize();
+//    _rrfc.set_findex(0);
+//    _rmgr.initialize(cbp);
+    // Note the argument order: page size comes before page count here.
+    _wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, QLS_WMGR_MAXDTOKPP, QLS_WMGR_MAXWAITUS);
+
+    // Write info file (<basename>.jinf) to disk
+//    write_infofile();
+
+    _init_flag = true;
+}
+
+/**
+ * \brief Recover an existing journal and leave it in read-only mode.
+ *
+ * Resets the state flags and both maps, finalizes the linear file controller,
+ * verifies the journal directory and then asks the recovery manager to analyse
+ * the journal files. The file controller and write manager are re-initialised
+ * from the recovery manager's findings. On return the journal is flagged both
+ * initialised and read-only; recover_complete() clears the read-only flag.
+ *
+ * \param efpmp Empty file pool manager passed to the recovery analysis.
+ * \param wcache_num_pages Number of write cache pages, forwarded to the write manager.
+ * \param wcache_pgsize_sblks Write cache page size in softblocks, forwarded to the write manager.
+ * \param cbp AIO callback forwarded to the write manager.
+ * \param prep_txn_list_ptr List of prepared transaction ids passed to the recovery analysis.
+ * \param highest_rid Output: set to the recovery manager's highest record id.
+ */
+void
+jcntl::recover(EmptyFilePoolManager* efpmp,
+               const uint16_t wcache_num_pages,
+               const uint32_t wcache_pgsize_sblks,
+               aio_callback* const cbp,
+               const std::vector<std::string>* prep_txn_list_ptr,
+               uint64_t& highest_rid)
+{
+    // Mark the journal as uninitialised while recovery is in progress.
+    _init_flag = false;
+    _stop_flag = false;
+    _readonly_flag = false;
+
+    _emap.clear();
+    _tmap.clear();
+
+    _linearFileController.finalize();
+
+//    _lpmgr.finalize();
+
+//    assert(num_jfiles >= JRNL_MIN_NUM_FILES);
+//    assert(num_jfiles <= JRNL_MAX_NUM_FILES);
+//    assert(jfsize_sblks >= JRNL_MIN_FILE_SIZE);
+//    assert(jfsize_sblks <= JRNL_MAX_FILE_SIZE);
+//    _jfsize_sblks = jfsize_sblks;
+
+    // Verify journal dir and journal files
+    _jdir.verify_dir();
+//    _rcvdat.reset(num_jfiles/*, ae, ae_max_jfiles*/);
+
+//    rcvr_janalyze(prep_txn_list_ptr, efpm);
+    // The analysis receives the address of _emptyFilePoolPtr, which is used below.
+    _recoveryManager.analyzeJournals(prep_txn_list_ptr, efpmp, &_emptyFilePoolPtr);
+
+    highest_rid = _recoveryManager.getHighestRecordId();
+//    if (_rcvdat._jfull)
+//        throw jexception(jerrno::JERR_JCNTL_RECOVERJFULL, "jcntl", "recover");
+    _jrnl_log.log(/*LOG_DEBUG*/JournalLog::LOG_INFO, _jid, _recoveryManager.toString(_jid, true));
+
+//    _lpmgr.recover(_rcvdat, this, &new_fcntl);
+
+    _linearFileController.initialize(_jdir.dirname(), _emptyFilePoolPtr, _recoveryManager.getHighestFileNumber());
+//    _linearFileController.setFileNumberCounter(_recoveryManager.getHighestFileNumber());
+    _recoveryManager.setLinearFileControllerJournals(&qpid::qls_jrnl::LinearFileController::addJournalFile, &_linearFileController);
+//    _wrfc.initialize(_jfsize_sblks, &_rcvdat);
+//    _rrfc.initialize();
+//    _rrfc.set_findex(_rcvdat.ffid());
+//    _rmgr.initialize(cbp);
+    // Last argument: 0 when the last file is full, otherwise the recovered end offset.
+    _wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, QLS_WMGR_MAXDTOKPP, QLS_WMGR_MAXWAITUS,
+            (_recoveryManager.isLastFileFull() ? 0 : _recoveryManager.getEndOffset()));
+
+    _readonly_flag = true;
+    _init_flag = true;
+}
+
+/**
+ * \brief Leave the read-only state entered by recover().
+ *
+ * \throws jexception (JERR_JCNTL_NOTRECOVERED) if the journal is not
+ *     currently flagged read-only.
+ */
+void
+jcntl::recover_complete()
+{
+    if (_readonly_flag)
+    {
+//        for (uint16_t i=0; i<_lpmgr.num_jfiles(); i++)
+//            _lpmgr.get_fcntlp(i)->reset(&_rcvdat);
+//        _wrfc.initialize(_jfsize_sblks, &_rcvdat);
+//        _rrfc.initialize();
+//        _rrfc.set_findex(_rcvdat.ffid());
+//        _rmgr.recover_complete();
+        _readonly_flag = false;
+        return;
+    }
+    throw jexception(jerrno::JERR_JCNTL_NOTRECOVERED, "jcntl", "recover_complete");
+}
+
+/**
+ * \brief Stop the journal, purge its files via the linear file controller's
+ *     purgeFilesToEfp() and delete the journal directory.
+ *
+ * The order matters: stop(true) runs first so that outstanding AIO has
+ * completed before any file is touched.
+ */
+void
+jcntl::delete_jrnl_files()
+{
+    stop(true); // wait for AIO to complete
+    _linearFileController.purgeFilesToEfp();
+    _jdir.delete_dir();
+}
+
+
+/**
+ * \brief Enqueue a non-transactional data record.
+ *
+ * The write manager call is repeated, under the write mutex, for as long as
+ * handle_aio_wait() reports that a retry is needed.
+ *
+ * \param data_buff Record data buffer.
+ * \param tot_data_len Total length of the record data.
+ * \param this_data_len Length of data supplied in this call.
+ * \param dtokp Data token tracking this record.
+ * \param transient Transient flag forwarded to the write manager.
+ * \return Result code produced by handle_aio_wait() for the final attempt.
+ * \throws jexception from check_wstatus() if the journal is not writable.
+ */
+iores
+jcntl::enqueue_data_record(const void* const data_buff,
+                           const std::size_t tot_data_len,
+                           const std::size_t this_data_len,
+                           data_tok* dtokp,
+                           const bool transient)
+{
+    iores result;
+    check_wstatus("enqueue_data_record");
+    {
+        slock guard(_wr_mutex);
+        bool retry;
+        do
+        {
+            retry = handle_aio_wait(
+                    _wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, 0, 0, transient, false),
+                    result, dtokp);
+        }
+        while (retry);
+    }
+    return result;
+}
+
+/**
+ * \brief Enqueue a non-transactional record flagged as external.
+ *
+ * No data buffer is passed to the write manager (null buffer, zero length for
+ * this call); the external flag is set to true.
+ *
+ * \param tot_data_len Total length of the externally held data.
+ * \param dtokp Data token tracking this record.
+ * \param transient Transient flag forwarded to the write manager.
+ * \return Result code produced by handle_aio_wait() for the final attempt.
+ * \throws jexception from check_wstatus() if the journal is not writable.
+ */
+iores
+jcntl::enqueue_extern_data_record(const std::size_t tot_data_len,
+                                  data_tok* dtokp,
+                                  const bool transient)
+{
+    iores result;
+    check_wstatus("enqueue_extern_data_record");
+    {
+        slock guard(_wr_mutex);
+        bool retry;
+        do
+        {
+            retry = handle_aio_wait(_wmgr.enqueue(0, tot_data_len, 0, dtokp, 0, 0, transient, true),
+                    result, dtokp);
+        }
+        while (retry);
+    }
+    return result;
+}
+
+/**
+ * \brief Enqueue a data record as part of the transaction identified by \a xid.
+ *
+ * The write manager call is repeated, under the write mutex, for as long as
+ * handle_aio_wait() reports that a retry is needed.
+ *
+ * \param data_buff Record data buffer.
+ * \param tot_data_len Total length of the record data.
+ * \param this_data_len Length of data supplied in this call.
+ * \param dtokp Data token tracking this record.
+ * \param xid Transaction id; its bytes and size are passed to the write manager.
+ * \param transient Transient flag forwarded to the write manager.
+ * \return Result code produced by handle_aio_wait() for the final attempt.
+ * \throws jexception from check_wstatus() if the journal is not writable.
+ */
+iores
+jcntl::enqueue_txn_data_record(const void* const data_buff,
+                               const std::size_t tot_data_len,
+                               const std::size_t this_data_len,
+                               data_tok* dtokp,
+                               const std::string& xid,
+                               const bool transient)
+{
+    iores r;
+    // Report this function's real name, as the sibling enqueue functions do.
+    check_wstatus("enqueue_txn_data_record");
+    {
+        slock s(_wr_mutex);
+        while (handle_aio_wait(_wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, xid.data(), xid.size(),
+                        transient, false), r, dtokp)) ;
+    }
+    return r;
+}
+
+/**
+ * \brief Enqueue a record flagged as external as part of the transaction
+ *     identified by \a xid.
+ *
+ * No data buffer is passed to the write manager (null buffer, zero length for
+ * this call); the external flag is set to true.
+ *
+ * \param tot_data_len Total length of the externally held data.
+ * \param dtokp Data token tracking this record.
+ * \param xid Transaction id; its bytes and size are passed to the write manager.
+ * \param transient Transient flag forwarded to the write manager.
+ * \return Result code produced by handle_aio_wait() for the final attempt.
+ * \throws jexception from check_wstatus() if the journal is not writable.
+ */
+iores
+jcntl::enqueue_extern_txn_data_record(const std::size_t tot_data_len,
+                                      data_tok* dtokp,
+                                      const std::string& xid,
+                                      const bool transient)
+{
+    iores result;
+    check_wstatus("enqueue_extern_txn_data_record");
+    {
+        slock guard(_wr_mutex);
+        bool retry;
+        do
+        {
+            retry = handle_aio_wait(
+                    _wmgr.enqueue(0, tot_data_len, 0, dtokp, xid.data(), xid.size(), transient, true),
+                    result, dtokp);
+        }
+        while (retry);
+    }
+    return result;
+}
+
+/**
+ * \brief Fetch the next remaining record through the recovery manager.
+ *
+ * All arguments are passed unchanged to
+ * _recoveryManager.readNextRemainingRecord(), which fills in the outputs.
+ *
+ * \param datapp Output: pointer to the record data.
+ * \param dsize Output: size of the record data.
+ * \param xidpp Output: pointer to the transaction id, if any.
+ * \param xidsize Output: size of the transaction id.
+ * \param transient Output: transient flag of the record.
+ * \param external Output: external flag of the record.
+ * \param dtokp Data token for the record being read.
+ * \param ignore_pending_txns Forwarded to the recovery manager.
+ * \return RHM_IORES_SUCCESS if the recovery manager returned a record,
+ *     RHM_IORES_EMPTY otherwise.
+ * \throws jexception from check_rstatus() if the journal is not initialised
+ *     or has been stopped.
+ */
+iores
+jcntl::read_data_record(void** const datapp,
+                        std::size_t& dsize,
+                        void** const xidpp,
+                        std::size_t& xidsize,
+                        bool& transient,
+                        bool& external,
+                        data_tok* const dtokp,
+                        bool ignore_pending_txns)
+{
+    check_rstatus("read_data");
+    if (_recoveryManager.readNextRemainingRecord(datapp, dsize, xidpp, xidsize, transient, external, dtokp, ignore_pending_txns))
+        return RHM_IORES_SUCCESS;
+    return RHM_IORES_EMPTY;
+}
+
+/**
+ * \brief Dequeue a record outside of any transaction.
+ *
+ * The write manager is called with a null xid of zero length, and the call is
+ * repeated under the write mutex while handle_aio_wait() asks for a retry.
+ *
+ * \param dtokp Data token of the record to dequeue.
+ * \param txn_coml_commit Flag forwarded unchanged to the write manager.
+ * \return Result code produced by handle_aio_wait() for the final attempt.
+ * \throws jexception from check_wstatus() if the journal is not writable.
+ */
+iores
+jcntl::dequeue_data_record(data_tok* const dtokp,
+                           const bool txn_coml_commit)
+{
+    iores result;
+    check_wstatus("dequeue_data");
+    {
+        slock guard(_wr_mutex);
+        bool retry;
+        do
+        {
+            retry = handle_aio_wait(_wmgr.dequeue(dtokp, 0, 0, txn_coml_commit), result, dtokp);
+        }
+        while (retry);
+    }
+    return result;
+}
+
+/**
+ * \brief Dequeue a record as part of the transaction identified by \a xid.
+ *
+ * The write manager call is repeated under the write mutex while
+ * handle_aio_wait() asks for a retry.
+ *
+ * \param dtokp Data token of the record to dequeue.
+ * \param xid Transaction id; its bytes and size are passed to the write manager.
+ * \param txn_coml_commit Flag forwarded unchanged to the write manager.
+ * \return Result code produced by handle_aio_wait() for the final attempt.
+ * \throws jexception from check_wstatus() if the journal is not writable.
+ */
+iores
+jcntl::dequeue_txn_data_record(data_tok* const dtokp,
+                               const std::string& xid,
+                               const bool txn_coml_commit)
+{
+    iores result;
+    check_wstatus("dequeue_data");
+    {
+        slock guard(_wr_mutex);
+        bool retry;
+        do
+        {
+            retry = handle_aio_wait(_wmgr.dequeue(dtokp, xid.data(), xid.size(), txn_coml_commit),
+                    result, dtokp);
+        }
+        while (retry);
+    }
+    return result;
+}
+
+/**
+ * \brief Write an abort for the transaction identified by \a xid.
+ *
+ * The write manager call is repeated under the write mutex while
+ * handle_aio_wait() asks for a retry.
+ *
+ * \param dtokp Data token tracking the abort.
+ * \param xid Transaction id; its bytes and size are passed to the write manager.
+ * \return Result code produced by handle_aio_wait() for the final attempt.
+ * \throws jexception from check_wstatus() if the journal is not writable.
+ */
+iores
+jcntl::txn_abort(data_tok* const dtokp,
+                 const std::string& xid)
+{
+    iores result;
+    check_wstatus("txn_abort");
+    {
+        slock guard(_wr_mutex);
+        bool retry;
+        do
+        {
+            retry = handle_aio_wait(_wmgr.abort(dtokp, xid.data(), xid.size()), result, dtokp);
+        }
+        while (retry);
+    }
+    return result;
+}
+
+/**
+ * \brief Write a commit for the transaction identified by \a xid.
+ *
+ * The write manager call is repeated under the write mutex while
+ * handle_aio_wait() asks for a retry.
+ *
+ * \param dtokp Data token tracking the commit.
+ * \param xid Transaction id; its bytes and size are passed to the write manager.
+ * \return Result code produced by handle_aio_wait() for the final attempt.
+ * \throws jexception from check_wstatus() if the journal is not writable.
+ */
+iores
+jcntl::txn_commit(data_tok* const dtokp,
+                  const std::string& xid)
+{
+    iores result;
+    check_wstatus("txn_commit");
+    {
+        slock guard(_wr_mutex);
+        bool retry;
+        do
+        {
+            retry = handle_aio_wait(_wmgr.commit(dtokp, xid.data(), xid.size()), result, dtokp);
+        }
+        while (retry);
+    }
+    return result;
+}
+
+/**
+ * \brief Ask the write manager, under the write mutex, whether the
+ *     transaction identified by \a xid is synced.
+ *
+ * \param xid Transaction id to query.
+ * \return The write manager's answer.
+ */
+bool
+jcntl::is_txn_synced(const std::string& xid)
+{
+    slock guard(_wr_mutex);
+    return _wmgr.is_txn_synced(xid);
+}
+
+/**
+ * \brief Collect write events from the write manager if the write mutex can
+ *     be obtained.
+ *
+ * \param timeout Timeout forwarded to the write manager's get_events().
+ * \return jerrno::LOCK_TAKEN when the lock object reports that it is not
+ *     locked; otherwise whatever the write manager's get_events() returns.
+ */
+int32_t
+jcntl::get_wr_events(timespec* const timeout)
+{
+    stlock attempt(_wr_mutex);
+    if (attempt.locked())
+    {
+        return _wmgr.get_events(timeout, false);
+    }
+    return jerrno::LOCK_TAKEN;
+}
+
+/**
+ * \brief Stop the journal and finalize the linear file controller.
+ *
+ * The status check used depends on the read-only flag. When the journal is
+ * not read-only, it is flushed after the stop flag has been set.
+ *
+ * \param block_till_aio_cmpl Forwarded to flush().
+ * \throws jexception from the status check if the journal is not in a state
+ *     that permits stopping.
+ */
+void
+jcntl::stop(const bool block_till_aio_cmpl)
+{
+    if (!_readonly_flag)
+    {
+        check_wstatus("stop");
+    }
+    else
+    {
+        check_rstatus("stop");
+    }
+
+    _stop_flag = true;
+
+    if (!_readonly_flag)
+    {
+        flush(block_till_aio_cmpl);
+    }
+    _linearFileController.finalize();
+}
+
+/**
+ * \brief Accessor for this journal's linear file controller.
+ * \return Mutable reference to the _linearFileController member.
+ */
+LinearFileController&
+jcntl::getLinearFileControllerRef() {
+    return _linearFileController;
+}
+
+/**
+ * \brief Flush the write manager and optionally wait for AIO completion.
+ *
+ * Does nothing and reports success when the journal is not initialised.
+ *
+ * \param block_till_aio_cmpl When true, aio_cmpl_wait() is called after the flush.
+ * \return RHM_IORES_SUCCESS if not initialised; otherwise the write
+ *     manager's flush() result.
+ * \throws jexception (JERR_JCNTL_READONLY) if the journal is read-only.
+ */
+iores
+jcntl::flush(const bool block_till_aio_cmpl)
+{
+    if (!_init_flag)
+    {
+        return RHM_IORES_SUCCESS;
+    }
+    if (_readonly_flag)
+    {
+        throw jexception(jerrno::JERR_JCNTL_READONLY, "jcntl", "flush");
+    }
+
+    iores flush_result;
+    {
+        // Scoped so the mutex is released before aio_cmpl_wait(), which locks it itself.
+        slock guard(_wr_mutex);
+        flush_result = _wmgr.flush();
+    }
+
+    if (block_till_aio_cmpl)
+    {
+        aio_cmpl_wait();
+    }
+    return flush_result;
+}
+
+// Protected/Private functions
+
+/**
+ * \brief Verify that the journal may be written to.
+ *
+ * \param fn_name Name of the calling function, passed to the exception.
+ * \throws jexception JERR__NINIT if not initialised, JERR_JCNTL_READONLY if
+ *     read-only, JERR_JCNTL_STOPPED if stopped (checked in that order).
+ */
+void
+jcntl::check_wstatus(const char* fn_name) const
+{
+    if (!_init_flag)
+    {
+        throw jexception(jerrno::JERR__NINIT, "jcntl", fn_name);
+    }
+    if (_readonly_flag)
+    {
+        throw jexception(jerrno::JERR_JCNTL_READONLY, "jcntl", fn_name);
+    }
+    if (_stop_flag)
+    {
+        throw jexception(jerrno::JERR_JCNTL_STOPPED, "jcntl", fn_name);
+    }
+}
+
+/**
+ * \brief Verify that the journal may be read from.
+ *
+ * \param fn_name Name of the calling function, passed to the exception.
+ * \throws jexception JERR__NINIT if not initialised, JERR_JCNTL_STOPPED if
+ *     stopped (checked in that order).
+ */
+void
+jcntl::check_rstatus(const char* fn_name) const
+{
+    if (!_init_flag)
+    {
+        throw jexception(jerrno::JERR__NINIT, "jcntl", fn_name);
+    }
+    if (_stop_flag)
+    {
+        throw jexception(jerrno::JERR_JCNTL_STOPPED, "jcntl", fn_name);
+    }
+}
+
+
+/**
+ * \brief Keep collecting write events until the write manager reports that
+ *     no AIO events remain.
+ *
+ * The remaining-event count is read under the write mutex; the mutex is
+ * released again before get_wr_events() is called.
+ *
+ * \throws jexception (JERR_JCNTL_AIOCMPLWAIT) if get_wr_events() returns
+ *     jerrno::AIO_TIMEOUT.
+ */
+void
+jcntl::aio_cmpl_wait()
+{
+    for (;;)
+    {
+        uint32_t events_remaining;
+        {
+            slock guard(_wr_mutex);
+            events_remaining = _wmgr.get_aio_evt_rem();
+        }
+        if (events_remaining == 0)
+        {
+            return; // no events left
+        }
+        if (get_wr_events(&_aio_cmpl_timeout) == jerrno::AIO_TIMEOUT)
+        {
+            throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl", "aio_cmpl_wait");
+        }
+    }
+}
+
+
+/**
+ * \brief Post-process a write manager result and decide whether the caller
+ *     should repeat the write manager call.
+ *
+ * - RHM_IORES_PAGE_AIOWAIT: collects AIO events until the current page is no
+ *   longer blocked, then asks for a retry.
+ * - RHM_IORES_FILE_AIOWAIT: reports success through \a resout and asks for a
+ *   retry only if the data token is in one of the *_PART write states.
+ * - Anything else: no retry.
+ *
+ * \param res Result returned by the write manager.
+ * \param resout Output: \a res, or RHM_IORES_SUCCESS in the FILE_AIOWAIT case.
+ * \param dtp Data token of the operation; only read in the FILE_AIOWAIT case.
+ * \return true if the caller should call the write manager again.
+ * \throws jexception if the page is blocked with no AIO events remaining, or
+ *     (JERR_JCNTL_AIOCMPLWAIT) if get_events() returns jerrno::AIO_TIMEOUT.
+ */
+bool
+jcntl::handle_aio_wait(const iores res, iores& resout, const data_tok* dtp)
+{
+    resout = res;
+    if (res == RHM_IORES_PAGE_AIOWAIT)
+    {
+        while (_wmgr.curr_pg_blocked())
+        {
+            if (_wmgr.get_aio_evt_rem() == 0) {
+                // Report through the journal log, as the timeout branch below does,
+                // rather than writing debug output to stdout.
+                std::ostringstream oss;
+                oss << "curr_pg_blocked() with no AIO events remaining; wmgr_status: " << _wmgr.status_str();
+                _jrnl_log.log(JournalLog::LOG_CRITICAL, _jid, oss.str());
+                throw jexception("_wmgr.curr_pg_blocked() with no events remaining"); // TODO - complete exception
+            }
+            if (_wmgr.get_events(&_aio_cmpl_timeout, false) == jerrno::AIO_TIMEOUT)
+            {
+                std::ostringstream oss;
+                oss << "get_events() returned JERR_JCNTL_AIOCMPLWAIT; wmgr_status: " << _wmgr.status_str();
+                _jrnl_log.log(JournalLog::LOG_CRITICAL, _jid, oss.str());
+                throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl", "handle_aio_wait");
+            }
+        }
+        return true;
+    }
+    else if (res == RHM_IORES_FILE_AIOWAIT)
+    {
+//        while (_wmgr.curr_file_blocked())
+//        {
+//            if (_wmgr.get_events(pmgr::UNUSED, &_aio_cmpl_timeout) == jerrno::AIO_TIMEOUT)
+//            {
+//                std::ostringstream oss;
+//                oss << "get_events() returned JERR_JCNTL_AIOCMPLWAIT; wmgr_status: " << _wmgr.status_str();
+//                this->log(LOG_CRITICAL, oss.str());
+//                throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl", "handle_aio_wait");
+//            }
+//        }
+//        _wrfc.wr_reset();
+        resout = RHM_IORES_SUCCESS;
+        // Retry only when the token is in a *_PART write state.
+        data_tok::write_state ws = dtp->wstate();
+        return ws == data_tok::ENQ_PART || ws == data_tok::DEQ_PART || ws == data_tok::ABORT_PART ||
+                ws == data_tok::COMMIT_PART;
+    }
+    return false;
+}
+
+}}



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org