You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/10/22 21:09:58 UTC

svn commit: r1534736 [7/8] - in /qpid/trunk/qpid/cpp/src: ./ qpid/linearstore/ qpid/linearstore/jrnl/ qpid/linearstore/jrnl/utils/ tests/linearstore/

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jexception.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jexception.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jexception.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jexception.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,126 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_JEXCEPTION_H
+#define QPID_LEGACYSTORE_JRNL_JEXCEPTION_H
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+class jexception;
+}}
+
+#include <cerrno>
+#include <cstdio>
+#include <cstdlib>
+#include <cstring>
+#include <exception>
+#include "qpid/linearstore/jrnl/jerrno.h"
+#include <sstream>
+#include <string>
+
+// Macro for formatting commom system errors
+#define FORMAT_SYSERR(errno) " errno=" << errno << " (" << std::strerror(errno) << ")"
+
+#define MALLOC_CHK(ptr, var, cls, fn) if(ptr == 0) { \
+    clean(); \
+    std::ostringstream oss; \
+    oss << var << ": malloc() failed: " << FORMAT_SYSERR(errno); \
+    throw jexception(jerrno::JERR__MALLOC, oss.str(), cls, fn); \
+    }
+
+// TODO: The following is a temporary bug-tracking aid which forces a core.
+// Replace with the commented out version below when BZ484048 is resolved.
+#define PTHREAD_CHK(err, pfn, cls, fn) if(err != 0) { \
+    std::ostringstream oss; \
+    oss << cls << "::" << fn << "(): " << pfn; \
+    errno = err; \
+    ::perror(oss.str().c_str()); \
+    ::abort(); \
+    }
+/*
+#define PTHREAD_CHK(err, pfn, cls, fn) if(err != 0) { \
+    std::ostringstream oss; \
+    oss << pfn << " failed: " << FORMAT_SYSERR(err); \
+    throw jexception(jerrno::JERR__PTHREAD, oss.str(), cls, fn); \
+    }
+*/
+
+#define ASSERT(cond, msg) if(cond == 0) { \
+    std::cerr << msg << std::endl; \
+    ::abort(); \
+    }
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+    /**
+    * \class jexception
+    * \brief Generic journal exception class
+    */
+    class jexception : public std::exception
+    {
+    private:
+        uint32_t _err_code;
+        std::string _additional_info;
+        std::string _throwing_class;
+        std::string _throwing_fn;
+        std::string _what;
+        void format();
+
+    public:
+        jexception() throw ();
+
+        jexception(const uint32_t err_code) throw ();
+
+        jexception(const char* additional_info) throw ();
+        jexception(const std::string& additional_info) throw ();
+
+        jexception(const uint32_t err_code, const char* additional_info) throw ();
+        jexception(const uint32_t err_code, const std::string& additional_info) throw ();
+
+        jexception(const uint32_t err_code, const char* throwing_class, const char* throwing_fn)
+                throw ();
+        jexception(const uint32_t err_code, const std::string& throwing_class,
+                const std::string& throwing_fn) throw ();
+
+        jexception(const uint32_t err_code, const char* additional_info,
+                const char* throwing_class, const char* throwing_fn) throw ();
+        jexception(const uint32_t err_code, const std::string& additional_info,
+                const std::string& throwing_class, const std::string& throwing_fn) throw ();
+
+        virtual ~jexception() throw ();
+        virtual const char* what() const throw (); // override std::exception::what()
+
+        inline uint32_t err_code() const throw () { return _err_code; }
+        inline const std::string additional_info() const throw () { return _additional_info; }
+        inline const std::string throwing_class() const throw () { return _throwing_class; }
+        inline const std::string throwing_fn() const throw () { return _throwing_fn; }
+
+        friend std::ostream& operator<<(std::ostream& os, const jexception& je);
+        friend std::ostream& operator<<(std::ostream& os, const jexception* jePtr);
+    }; // class jexception
+
+}}
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_JEXCEPTION_H

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jrec.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jrec.cpp?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jrec.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jrec.cpp Tue Oct 22 19:09:56 2013
@@ -0,0 +1,107 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/linearstore/jrnl/jrec.h"
+
+#include <iomanip>
+#include "qpid/linearstore/jrnl/jerrno.h"
+#include "qpid/linearstore/jrnl/jexception.h"
+#include <sstream>
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+jrec::jrec() {}
+jrec::~jrec() {}
+
+void
+jrec::chk_hdr(const rec_hdr_t& hdr)
+{
+    if (hdr._magic == 0)
+    {
+        std::ostringstream oss;
+        oss << std::hex << std::setfill('0');
+        oss << "enq magic NULL: rid=0x" << hdr._rid;
+        throw jexception(jerrno::JERR_JREC_BADRECHDR, oss.str(), "jrec", "chk_hdr");
+    }
+    if (hdr._version != QLS_JRNL_VERSION)
+    {
+        std::ostringstream oss;
+        oss << std::hex << std::setfill('0');
+        oss << "version: rid=0x" << hdr._rid;
+        oss << ": expected=0x" << std::setw(2) << (int)QLS_JRNL_VERSION;
+        oss << " read=0x" << std::setw(2) << (int)hdr._version;
+        throw jexception(jerrno::JERR_JREC_BADRECHDR, oss.str(), "jrec", "chk_hdr");
+    }
+//#if defined (JRNL_LITTLE_ENDIAN)
+//    uint8_t endian_flag = RHM_LENDIAN_FLAG;
+//#else
+//    uint8_t endian_flag = RHM_BENDIAN_FLAG;
+//#endif
+//    if (hdr._eflag != endian_flag)
+//    {
+//        std::ostringstream oss;
+//        oss << std::hex << std::setfill('0');
+//        oss << "endian_flag: rid=" << hdr._rid;
+//        oss << ": expected=0x" << std::setw(2) << (int)endian_flag;
+//        oss << " read=0x" << std::setw(2) << (int)hdr._eflag;
+//        throw jexception(jerrno::JERR_JREC_BADRECHDR, oss.str(), "jrec", "chk_hdr");
+//    }
+}
+
+void
+jrec::chk_rid(const rec_hdr_t& hdr, const uint64_t rid)
+{
+    if (hdr._rid != rid)
+    {
+        std::ostringstream oss;
+        oss << std::hex << std::setfill('0');
+        oss << "rid mismatch: expected=0x" << rid;
+        oss << " read=0x" << hdr._rid;
+        throw jexception(jerrno::JERR_JREC_BADRECHDR, oss.str(), "jrec", "chk_hdr");
+    }
+}
+
+void
+jrec::chk_tail(const rec_tail_t& tail, const rec_hdr_t& hdr)
+{
+    if (tail._xmagic != ~hdr._magic)
+    {
+        std::ostringstream oss;
+        oss << std::hex << std::setfill('0');
+        oss << "magic: rid=0x" << hdr._rid;
+        oss << ": expected=0x" << ~hdr._magic;
+        oss << " read=0x" << tail._xmagic;
+        throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "jrec", "chk_tail");
+    }
+    if (tail._rid != hdr._rid)
+    {
+        std::ostringstream oss;
+        oss << std::hex << std::setfill('0');
+        oss << "rid: rid=0x" << hdr._rid;
+        oss << ": read=0x" << tail._rid;
+        throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "jrec", "chk_tail");
+    }
+}
+
+}}

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,170 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_JREC_H
+#define QPID_LEGACYSTORE_JRNL_JREC_H
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+class jrec;
+}}
+
+#include <cstddef>
+#include <fstream>
+#include "qpid/linearstore/jrnl/jcfg.h"
+#include "qpid/linearstore/jrnl/utils/rec_hdr.h"
+#include "qpid/linearstore/jrnl/utils/rec_tail.h"
+#include <string>
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+    /**
+    * \class jrec
+    * \brief Abstract class for all file jrecords, both data and log. This class establishes
+    *     the common data format and structure for these jrecords.
+    */
+    class jrec
+    {
+    public:
+        jrec();
+        virtual ~jrec();
+
+        /**
+        * \brief Encode this instance of jrec into the write buffer at the disk-block-aligned
+        *   pointer wptr starting at position rec_offs_dblks in the encoded record to a
+        *   maximum size of max_size_dblks.
+        *
+        * This call encodes the content of the data contianed in this instance of jrec into a
+        * disk-softblock-aligned (defined by JRNL_SBLK_SIZE) buffer pointed to by parameter
+        * wptr. No more than paramter max_size_dblks data-blocks may be written to the buffer.
+        * The parameter rec_offs_dblks is the offset in data-blocks within the fully encoded
+        * data block this instance represents at which to start encoding.
+        *
+        * Encoding entails writing the record header (struct enq_hdr), the data and the record tail
+        * (struct enq_tail). The record must be data-block-aligned (defined by JRNL_DBLK_SIZE),
+        * thus any remaining space in the final data-block is ignored; the returned value is the
+        * number of data-blocks consumed from the page by the encode action. Provided the initial
+        * alignment requirements are met, records may be of arbitrary size and may span multiple
+        * data-blocks, disk-blocks and/or pages.
+        *
+        * Since the record size in data-blocks is known, the general usage pattern is to call
+        * encode() as many times as is needed to fully encode the data. Each call to encode()
+        * will encode as much of the record as it can to what remains of the current page cache,
+        * and will return the number of data-blocks actually encoded.
+        *
+        * <b>Example:</b> Assume that record r1 was previously written to page 0, and that this
+        * is an instance representing record r2. Being larger than the page size ps, r2 would span
+        * multiple pages as follows:
+        * <pre>
+        *       |<---ps--->|
+        *       +----------+----------+----------+----...
+        *       |      |r2a|   r2b    |  r2c   | |
+        *       |<-r1-><----------r2---------->  |
+        *       +----------+----------+----------+----...
+        * page:      p0         p1         p2
+        * </pre>
+        * Encoding record r2 will require multiple calls to encode; one for each page which
+        * is involved. Record r2 is divided logically into sections r2a, r2b and r2c at the
+        * points where the page boundaries intersect with the record. Assuming a page size
+        * of ps, the page boundary pointers are represented by their names p0, p1... and the
+        * sizes of the record segments are represented by their names r1, r2a, r2b..., the calls
+        * should be as follows:
+        * <pre>
+        * encode(p0+r1, 0, ps-r1); (returns r2a data-blocks)
+        * encode(p1, r2a, ps);     (returns r2b data-blocks which equals ps)
+        * encode(p2, r2a+r2b, ps); (returns r2c data-blocks)
+        * </pre>
+        *
+        * \param wptr Data-block-aligned pointer to position in page buffer where encoding is to
+        *   take place.
+        * \param rec_offs_dblks Offset in data-blocks within record from which to start encoding.
+        * \param max_size_dblks Maximum number of data-blocks to write to pointer wptr.
+        * \returns Number of data-blocks encoded.
+        */
+        virtual uint32_t encode(void* wptr, uint32_t rec_offs_dblks,
+                uint32_t max_size_dblks) = 0;
+
+        /**
+        * \brief Decode into this instance of jrec from the read buffer at the disk-block-aligned
+        *   pointer rptr starting at position jrec_offs_dblks in the encoded record to a
+        *   maximum size of max_size_blks.
+        *
+        * This call decodes a record in the page buffer pointed to by the data-block-aligned
+        * (defined by JRNL_DBLK_SIZE) parameter rptr into this instance of jrec. No more than
+        * paramter max_size_dblks data-blocks may be read from the buffer. The parameter
+        * jrec_offs_dblks is the offset in data-blocks within the encoded record at which to start
+        * decoding.
+        *
+        * Decoding entails reading the record header, the data and the tail. The record is
+        * data-block-aligned (defined by JRNL_DBLK_SIZE); the returned value is the number of
+        * data-blocks read from the buffer by the decode action. As the record data size is only
+        * known once the header is read, the number of calls required to complete reading the
+        * record will depend on the vlaues within this instance which are set when the
+        * header is decoded.
+        *
+        * A non-zero value for jrec_offs_dblks implies that this is not the first call to
+        * decode and the record data will be appended at this offset.
+        *
+        * \param h Reference to instance of struct hdr, already read from page buffer and used
+        *   to determine record type
+        * \param rptr Data-block-aligned pointer to position in page buffer where decoding is to
+        *   begin.
+        * \param rec_offs_dblks Offset within record from which to start appending the decoded
+        *   record.
+        * \param max_size_dblks Maximum number of data-blocks to read from pointer rptr.
+        * \returns Number of data-blocks read (consumed).
+        */
+        virtual uint32_t decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks,
+                uint32_t max_size_dblks) = 0;
+
+        virtual bool rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs) = 0;
+
+        virtual std::string& str(std::string& str) const = 0;
+        virtual std::size_t data_size() const = 0;
+        virtual std::size_t xid_size() const = 0;
+        virtual std::size_t rec_size() const = 0;
+        inline virtual uint32_t rec_size_dblks() const { return size_dblks(rec_size()); }
+        static inline uint32_t size_dblks(const std::size_t size)
+                { return size_blks(size, QLS_DBLK_SIZE_BYTES); }
+        static inline uint32_t size_sblks(const std::size_t size)
+                { return size_blks(size, QLS_SBLK_SIZE_BYTES); }
+        static inline uint32_t size_blks(const std::size_t size, const std::size_t blksize)
+                { return (size + blksize - 1)/blksize; }
+        virtual uint64_t rid() const = 0;
+
+    protected:
+        virtual void chk_hdr() const = 0;
+        virtual void chk_hdr(uint64_t rid) const = 0;
+        virtual void chk_tail() const = 0;
+        static void chk_hdr(const rec_hdr_t& hdr);
+        static void chk_rid(const rec_hdr_t& hdr, uint64_t rid);
+        static void chk_tail(const rec_tail_t& tail, const rec_hdr_t& hdr);
+        virtual void clean() = 0;
+    }; // class jrec
+
+}}
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_JREC_H

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp Tue Oct 22 19:09:56 2013
@@ -0,0 +1,202 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/linearstore/jrnl/pmgr.h"
+
+#include <cerrno>
+#include <cstdlib>
+#include <cstring>
+#include "qpid/linearstore/jrnl/jcfg.h"
+#include "qpid/linearstore/jrnl/jcntl.h"
+#include "qpid/linearstore/jrnl/jerrno.h"
+#include <sstream>
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+pmgr::page_cb::page_cb(uint16_t index):
+        _index(index),
+        _state(UNUSED),
+        _frid(0),
+        _wdblks(0),
+        _rdblks(0),
+        _pdtokl(0),
+        _jfp(0),
+        _pbuff(0)
+{}
+
+// TODO: almost identical to pmgr::page_state_str() below - resolve
+const char*
+pmgr::page_cb::state_str() const
+{
+    switch(_state)
+    {
+        case UNUSED:
+            return "UNUSED";
+        case IN_USE:
+            return "IN_USE";
+        case AIO_PENDING:
+            return "AIO_PENDING";
+    }
+    return "<unknown>";
+}
+
+// static
+const uint32_t pmgr::_sblkSizeBytes = QLS_SBLK_SIZE_BYTES;
+
+pmgr::pmgr(jcntl* jc, enq_map& emap, txn_map& tmap):
+        _cache_pgsize_sblks(0),
+        _cache_num_pages(0),
+        _jc(jc),
+        _emap(emap),
+        _tmap(tmap),
+        _page_base_ptr(0),
+        _page_ptr_arr(0),
+        _page_cb_arr(0),
+        _aio_cb_arr(0),
+        _aio_event_arr(0),
+        _ioctx(0),
+        _pg_index(0),
+        _pg_cntr(0),
+        _pg_offset_dblks(0),
+        _aio_evt_rem(0),
+        _cbp(0),
+        _enq_rec(),
+        _deq_rec(),
+        _txn_rec()
+{}
+
+pmgr::~pmgr()
+{
+    pmgr::clean();
+}
+
+void
+pmgr::initialize(aio_callback* const cbp, const uint32_t cache_pgsize_sblks, const uint16_t cache_num_pages)
+{
+    // As static use of this class keeps old values around, clean up first...
+    pmgr::clean();
+    _pg_index = 0;
+    _pg_cntr = 0;
+    _pg_offset_dblks = 0;
+    _aio_evt_rem = 0;
+    _cache_pgsize_sblks = cache_pgsize_sblks;
+    _cache_num_pages = cache_num_pages;
+    _cbp = cbp;
+
+    // 1. Allocate page memory (as a single block)
+    std::size_t cache_pgsize = _cache_num_pages * _cache_pgsize_sblks * _sblkSizeBytes;
+    if (::posix_memalign(&_page_base_ptr, QLS_AIO_ALIGN_BOUNDARY_BYTES, cache_pgsize))
+    {
+        clean();
+        std::ostringstream oss;
+        oss << "posix_memalign(): alignment=" << QLS_AIO_ALIGN_BOUNDARY_BYTES << " size=" << cache_pgsize;
+        oss << FORMAT_SYSERR(errno);
+        throw jexception(jerrno::JERR__MALLOC, oss.str(), "pmgr", "initialize");
+    }
+
+    // 2. Allocate array of page pointers
+    _page_ptr_arr = (void**)std::malloc(_cache_num_pages * sizeof(void*));
+    MALLOC_CHK(_page_ptr_arr, "_page_ptr_arr", "pmgr", "initialize");
+
+    // 3. Allocate and initialize page control block (page_cb) array
+    _page_cb_arr = (page_cb*)std::malloc(_cache_num_pages * sizeof(page_cb));
+    MALLOC_CHK(_page_cb_arr, "_page_cb_arr", "pmgr", "initialize");
+    std::memset(_page_cb_arr, 0, _cache_num_pages * sizeof(page_cb));
+
+    // 4. Allocate IO control block (iocb) array
+    _aio_cb_arr = (aio_cb*)std::malloc(_cache_num_pages * sizeof(aio_cb));
+    MALLOC_CHK(_aio_cb_arr, "_aio_cb_arr", "pmgr", "initialize");
+
+    // 5. Set page pointers in _page_ptr_arr, _page_cb_arr and iocbs to pages within page block
+    for (uint16_t i=0; i<_cache_num_pages; i++)
+    {
+        _page_ptr_arr[i] = (void*)((char*)_page_base_ptr + _cache_pgsize_sblks * _sblkSizeBytes * i);
+        _page_cb_arr[i]._index = i;
+        _page_cb_arr[i]._state = UNUSED;
+        _page_cb_arr[i]._pbuff = _page_ptr_arr[i];
+        _page_cb_arr[i]._pdtokl = new std::deque<data_tok*>;
+        _page_cb_arr[i]._pdtokl->clear();
+        _aio_cb_arr[i].data = (void*)&_page_cb_arr[i];
+    }
+
+    // 6. Allocate io_event array, max one event per cache page plus one for each file
+    const uint16_t max_aio_evts = _cache_num_pages + 1; // One additional event for file header writes
+    _aio_event_arr = (aio_event*)std::malloc(max_aio_evts * sizeof(aio_event));
+    MALLOC_CHK(_aio_event_arr, "_aio_event_arr", "pmgr", "initialize");
+
+    // 7. Initialize AIO context
+    if (int ret = aio::queue_init(max_aio_evts, &_ioctx))
+    {
+        std::ostringstream oss;
+        oss << "io_queue_init() failed: " << FORMAT_SYSERR(-ret);
+        throw jexception(jerrno::JERR__AIO, oss.str(), "pmgr", "initialize");
+    }
+}
+
+void
+pmgr::clean()
+{
+    // Clean up allocated memory here
+
+    if (_ioctx)
+        aio::queue_release(_ioctx);
+
+    std::free(_page_base_ptr);
+    _page_base_ptr = 0;
+
+    if (_page_cb_arr)
+    {
+        for (int i=0; i<_cache_num_pages; i++)
+            delete _page_cb_arr[i]._pdtokl;
+        std::free(_page_ptr_arr);
+        _page_ptr_arr = 0;
+    }
+
+    std::free(_page_cb_arr);
+    _page_cb_arr = 0;
+
+    std::free(_aio_cb_arr);
+    _aio_cb_arr = 0;
+
+    std::free(_aio_event_arr);
+    _aio_event_arr = 0;
+}
+
+// TODO: almost identical to pmgr::page_cb::state_str() above - resolve
+const char*
+pmgr::page_state_str(page_state ps)
+{
+    switch (ps)
+    {
+        case UNUSED:
+            return "UNUSED";
+        case IN_USE:
+            return "IN_USE";
+        case AIO_PENDING:
+            return "AIO_PENDING";
+    }
+    return "<page_state unknown>";
+}
+
+}}

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,127 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_PMGR_H
+#define QPID_LEGACYSTORE_JRNL_PMGR_H
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+    class pmgr;
+    class jcntl;
+}}
+
+#include <deque>
+#include "qpid/linearstore/jrnl/aio.h"
+#include "qpid/linearstore/jrnl/aio_callback.h"
+#include "qpid/linearstore/jrnl/data_tok.h"
+#include "qpid/linearstore/jrnl/deq_rec.h"
+#include "qpid/linearstore/jrnl/enq_map.h"
+#include "qpid/linearstore/jrnl/enq_rec.h"
+#include "qpid/linearstore/jrnl/txn_map.h"
+#include "qpid/linearstore/jrnl/txn_rec.h"
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+class JournalFile;
+
+    /**
+    * \brief Abstract class for managing either read or write page cache of arbitrary size and
+    *    number of cache_num_pages.
+    */
+    class pmgr
+    {
+    public:
+        /**
+        * \brief Enumeration of possible stats of a page within a page cache.
+        */
+        enum page_state
+        {
+            UNUSED,                     ///< A page is uninitialized, contains no data.
+            IN_USE,                     ///< Page is in use.
+            AIO_PENDING                 ///< An AIO request outstanding.
+        };
+
+        /**
+        * \brief Page control block, carries control and state information for each page in the
+        *     cache.
+        */
+        struct page_cb
+        {
+            uint16_t _index;            ///< Index of this page
+            page_state _state;          ///< Status of page
+            uint64_t _frid;             ///< First rid in page (used for fhdr init)
+            uint32_t _wdblks;           ///< Total number of dblks in page so far
+            uint32_t _rdblks;           ///< Total number of dblks in page
+            std::deque<data_tok*>* _pdtokl; ///< Page message tokens list
+            JournalFile* _jfp;          ///< Journal file for incrementing compl counts
+            void* _pbuff;               ///< Page buffer
+
+            page_cb(uint16_t index);   ///< Convenience constructor
+            const char* state_str() const; ///< Return state as string for this pcb
+        };
+
+    protected:
+        static const uint32_t _sblkSizeBytes; ///< Disk softblock size
+        uint32_t _cache_pgsize_sblks;   ///< Size of page cache cache_num_pages
+        uint16_t _cache_num_pages;      ///< Number of page cache cache_num_pages
+        jcntl* _jc;                     ///< Pointer to journal controller
+        enq_map& _emap;                 ///< Ref to enqueue map
+        txn_map& _tmap;                 ///< Ref to transaction map
+        void* _page_base_ptr;           ///< Base pointer to page memory
+        void** _page_ptr_arr;           ///< Array of pointers to cache_num_pages in page memory
+        page_cb* _page_cb_arr;          ///< Array of page_cb structs
+        aio_cb* _aio_cb_arr;            ///< Array of iocb structs
+        aio_event* _aio_event_arr;      ///< Array of io_events
+        io_context_t _ioctx;            ///< AIO context for read/write operations
+        uint16_t _pg_index;             ///< Index of current page being used
+        uint32_t _pg_cntr;              ///< Page counter; determines if file rotation req'd
+        uint32_t _pg_offset_dblks;      ///< Page offset (used so far) in data blocks
+        uint32_t _aio_evt_rem;          ///< Remaining AIO events
+        aio_callback* _cbp;             ///< Pointer to callback object
+
+        enq_rec _enq_rec;               ///< Enqueue record used for encoding/decoding
+        deq_rec _deq_rec;               ///< Dequeue record used for encoding/decoding
+        txn_rec _txn_rec;               ///< Transaction record used for encoding/decoding
+
+    public:
+        pmgr(jcntl* jc, enq_map& emap, txn_map& tmap);
+        virtual ~pmgr();
+
+        virtual int32_t get_events(timespec* const timeout, bool flush) = 0;
+        inline uint32_t get_aio_evt_rem() const { return _aio_evt_rem; }
+        static const char* page_state_str(page_state ps);
+        inline uint32_t cache_pgsize_sblks() const { return _cache_pgsize_sblks; }
+        inline uint16_t cache_num_pages() const { return _cache_num_pages; }
+
+    protected:
+        virtual void initialize(aio_callback* const cbp, const uint32_t cache_pgsize_sblks,
+                const uint16_t cache_num_pages);
+        virtual void rotate_page() = 0;
+        virtual void clean();
+    };
+
+}}
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_PMGR_H

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/slock.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/slock.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/slock.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/slock.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_SLOCK_H
+#define QPID_LEGACYSTORE_JRNL_SLOCK_H
+
+#include "qpid/linearstore/jrnl/jexception.h"
+#include "qpid/linearstore/jrnl/smutex.h"
+#include <pthread.h>
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+    // Ultra-simple scoped lock class, auto-releases mutex when it goes out-of-scope
+    class slock
+    {
+    protected:
+        const smutex& _sm;
+    public:
+        inline slock(const smutex& sm) : _sm(sm)
+        {
+            PTHREAD_CHK(::pthread_mutex_lock(_sm.get()), "::pthread_mutex_lock", "slock", "slock");
+        }
+        inline ~slock()
+        {
+            PTHREAD_CHK(::pthread_mutex_unlock(_sm.get()), "::pthread_mutex_unlock", "slock", "~slock");
+        }
+    };
+
+    // Ultra-simple scoped try-lock class, auto-releases mutex when it goes out-of-scope
+    class stlock
+    {
+    protected:
+        const smutex& _sm;
+        bool _locked;
+    public:
+        inline stlock(const smutex& sm) : _sm(sm), _locked(false)
+        {
+            int ret = ::pthread_mutex_trylock(_sm.get());
+            _locked = (ret == 0); // check if lock obtained
+            if (!_locked && ret != EBUSY) PTHREAD_CHK(ret, "::pthread_mutex_trylock", "stlock", "stlock");
+        }
+        inline ~stlock()
+        {
+            if (_locked)
+                PTHREAD_CHK(::pthread_mutex_unlock(_sm.get()), "::pthread_mutex_unlock", "stlock", "~stlock");
+        }
+        inline bool locked() const { return _locked; }
+    };
+
+}}
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_SLOCK_H

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/smutex.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/smutex.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/smutex.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/smutex.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_SMUTEX_H
+#define QPID_LEGACYSTORE_JRNL_SMUTEX_H
+
+#include "qpid/linearstore/jrnl/jexception.h"
+#include <pthread.h>
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+    // Ultra-simple scoped mutex class that allows a posix mutex to be initialized and destroyed with error checks
+    class smutex
+    {
+    protected:
+        mutable pthread_mutex_t _m;
+    public:
+        inline smutex()
+        {
+            PTHREAD_CHK(::pthread_mutex_init(&_m, 0), "::pthread_mutex_init", "smutex", "smutex");
+        }
+        inline virtual ~smutex()
+        {
+            PTHREAD_CHK(::pthread_mutex_destroy(&_m), "::pthread_mutex_destroy", "smutex", "~smutex");
+        }
+        inline pthread_mutex_t* get() const { return &_m; }
+    };
+
+}}
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_SMUTEX_H

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/time_ns.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/time_ns.cpp?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/time_ns.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/time_ns.cpp Tue Oct 22 19:09:56 2013
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/linearstore/jrnl/time_ns.h"
+
+#include <sstream>
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+const std::string
+time_ns::str(int precision) const
+{
+    const double t = tv_sec + (tv_nsec/1e9);
+    std::ostringstream oss;
+    oss.setf(std::ios::fixed, std::ios::floatfield);
+    oss.precision(precision);
+    oss << t;
+    return oss.str();
+}
+
+
+}}

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/time_ns.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/time_ns.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/time_ns.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/time_ns.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,93 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_TIME_NS_H
+#define QPID_LEGACYSTORE_JRNL_TIME_NS_H
+
+#include <cerrno>
+#include <ctime>
+#include <string>
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+struct time_ns : public timespec
+{
+    inline time_ns() { tv_sec = 0; tv_nsec = 0; }
+    inline time_ns(const std::time_t sec, const long nsec = 0) { tv_sec = sec; tv_nsec = nsec; }
+    inline time_ns(const time_ns& t) { tv_sec = t.tv_sec; tv_nsec = t.tv_nsec; }
+
+    inline void set_zero() { tv_sec = 0; tv_nsec = 0; }
+    inline bool is_zero() const { return tv_sec == 0 && tv_nsec == 0; }
+    inline int now() { if(::clock_gettime(CLOCK_REALTIME, this)) return errno; return 0; }
+    const std::string str(int precision = 6) const;
+
+    inline time_ns& operator=(const time_ns& rhs)
+        { tv_sec = rhs.tv_sec; tv_nsec = rhs.tv_nsec; return *this; }
+    inline time_ns& operator+=(const time_ns& rhs)
+    {
+        tv_nsec += rhs.tv_nsec;
+        if (tv_nsec >= 1000000000L) { tv_sec++; tv_nsec -= 1000000000L; }
+        tv_sec += rhs.tv_sec;
+        return *this;
+    }
+    inline time_ns& operator+=(const long ns)
+    {
+        tv_nsec += ns;
+        if (tv_nsec >= 1000000000L) { tv_sec++; tv_nsec -= 1000000000L; }
+        return *this;
+    }
+    inline time_ns& operator-=(const long ns)
+    {
+        tv_nsec -= ns;
+        if (tv_nsec < 0) { tv_sec--; tv_nsec += 1000000000L; }
+        return *this;
+    }
+    inline time_ns& operator-=(const time_ns& rhs)
+    {
+        tv_nsec -= rhs.tv_nsec;
+        if (tv_nsec < 0) { tv_sec--; tv_nsec += 1000000000L; }
+        tv_sec -= rhs.tv_sec;
+        return *this;
+    }
+    inline const time_ns operator+(const time_ns& rhs)
+        { time_ns t(*this); t += rhs; return t; }
+    inline const time_ns operator-(const time_ns& rhs)
+        { time_ns t(*this); t -= rhs; return t; }
+    inline bool operator==(const time_ns& rhs)
+       { return tv_sec == rhs.tv_sec && tv_nsec == rhs.tv_nsec; }
+    inline bool operator!=(const time_ns& rhs)
+       { return tv_sec != rhs.tv_sec || tv_nsec != rhs.tv_nsec; }
+    inline bool operator>(const time_ns& rhs)
+       { if(tv_sec == rhs.tv_sec) return tv_nsec > rhs.tv_nsec; return tv_sec > rhs.tv_sec; }
+    inline bool operator>=(const time_ns& rhs)
+       { if(tv_sec == rhs.tv_sec) return tv_nsec >= rhs.tv_nsec; return tv_sec >= rhs.tv_sec; }
+    inline bool operator<(const time_ns& rhs)
+       { if(tv_sec == rhs.tv_sec) return tv_nsec < rhs.tv_nsec; return tv_sec < rhs.tv_sec; }
+    inline bool operator<=(const time_ns& rhs)
+       { if(tv_sec == rhs.tv_sec) return tv_nsec <= rhs.tv_nsec; return tv_sec <= rhs.tv_sec; }
+};
+
+}}
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_TIME_NS_H

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp Tue Oct 22 19:09:56 2013
@@ -0,0 +1,248 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/linearstore/jrnl/txn_map.h"
+
+#include <iomanip>
+#include "qpid/linearstore/jrnl/jerrno.h"
+#include "qpid/linearstore/jrnl/jexception.h"
+#include "qpid/linearstore/jrnl/slock.h"
+#include <sstream>
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+// return/error codes
+int16_t txn_map::TMAP_RID_NOT_FOUND = -2;
+int16_t txn_map::TMAP_XID_NOT_FOUND = -1;
+int16_t txn_map::TMAP_OK = 0;
+int16_t txn_map::TMAP_NOT_SYNCED = 0;
+int16_t txn_map::TMAP_SYNCED = 1;
+
+txn_data_struct::txn_data_struct(const uint64_t rid, const uint64_t drid, const uint16_t pfid,
+		const bool enq_flag, const bool commit_flag):
+        _rid(rid),
+        _drid(drid),
+        _pfid(pfid),
+        _enq_flag(enq_flag),
+        _commit_flag(commit_flag),
+        _aio_compl(false)
+{}
+
+txn_map::txn_map():
+        _map()/*,
+        _pfid_txn_cnt()*/
+{}
+
+txn_map::~txn_map() {}
+
+/*
+void
+txn_map::set_num_jfiles(const uint16_t num_jfiles)
+{
+    _pfid_txn_cnt.resize(num_jfiles, 0);
+}
+*/
+
+/*
+uint32_t
+txn_map::get_txn_pfid_cnt(const uint16_t pfid) const
+{
+    return _pfid_txn_cnt.at(pfid);
+}
+*/
+
+bool
+txn_map::insert_txn_data(const std::string& xid, const txn_data& td)
+{
+    bool ok = true;
+    slock s(_mutex);
+    xmap_itr itr = _map.find(xid);
+    if (itr == _map.end()) // not found in map
+    {
+        txn_data_list list;
+        list.push_back(td);
+        std::pair<xmap_itr, bool> ret = _map.insert(xmap_param(xid, list));
+        if (!ret.second) // duplicate
+            ok = false;
+    }
+    else
+        itr->second.push_back(td);
+//    _pfid_txn_cnt.at(td._pfid)++;
+    return ok;
+}
+
+const txn_data_list
+txn_map::get_tdata_list(const std::string& xid)
+{
+    slock s(_mutex);
+    return get_tdata_list_nolock(xid);
+}
+
+const txn_data_list
+txn_map::get_tdata_list_nolock(const std::string& xid)
+{
+    xmap_itr itr = _map.find(xid);
+    if (itr == _map.end()) // not found in map
+        return _empty_data_list;
+    return itr->second;
+}
+
+const txn_data_list
+txn_map::get_remove_tdata_list(const std::string& xid)
+{
+    slock s(_mutex);
+    xmap_itr itr = _map.find(xid);
+    if (itr == _map.end()) // not found in map
+        return _empty_data_list;
+    txn_data_list list = itr->second;
+    _map.erase(itr);
+//    for (tdl_itr i=list.begin(); i!=list.end(); i++)
+//        _pfid_txn_cnt.at(i->_pfid)--;
+    return list;
+}
+
+bool
+txn_map::in_map(const std::string& xid)
+{
+    slock s(_mutex);
+    xmap_itr itr= _map.find(xid);
+    return itr != _map.end();
+}
+
+uint32_t
+txn_map::enq_cnt()
+{
+    return cnt(true);
+}
+
+uint32_t
+txn_map::deq_cnt()
+{
+    return cnt(true);
+}
+
+uint32_t
+txn_map::cnt(const bool enq_flag)
+{
+    slock s(_mutex);
+    uint32_t c = 0;
+    for (xmap_itr i = _map.begin(); i != _map.end(); i++)
+    {
+        for (tdl_itr j = i->second.begin(); j < i->second.end(); j++)
+        {
+            if (j->_enq_flag == enq_flag)
+                c++;
+        }
+    }
+    return c;
+}
+
+int16_t
+txn_map::is_txn_synced(const std::string& xid)
+{
+    slock s(_mutex);
+    xmap_itr itr = _map.find(xid);
+    if (itr == _map.end()) // not found in map
+        return TMAP_XID_NOT_FOUND;
+    bool is_synced = true;
+    for (tdl_itr litr = itr->second.begin(); litr < itr->second.end(); litr++)
+    {
+        if (!litr->_aio_compl)
+        {
+            is_synced = false;
+            break;
+        }
+    }
+    return is_synced ? TMAP_SYNCED : TMAP_NOT_SYNCED;
+}
+
+int16_t
+txn_map::set_aio_compl(const std::string& xid, const uint64_t rid)
+{
+    slock s(_mutex);
+    xmap_itr itr = _map.find(xid);
+    if (itr == _map.end()) // xid not found in map
+        return TMAP_XID_NOT_FOUND;
+    for (tdl_itr litr = itr->second.begin(); litr < itr->second.end(); litr++)
+    {
+        if (litr->_rid == rid)
+        {
+            litr->_aio_compl = true;
+            return TMAP_OK; // rid found
+        }
+    }
+    // xid present, but rid not found
+    return TMAP_RID_NOT_FOUND;
+}
+
+bool
+txn_map::data_exists(const std::string& xid, const uint64_t rid)
+{
+    bool found = false;
+    {
+        slock s(_mutex);
+        txn_data_list tdl = get_tdata_list_nolock(xid);
+        tdl_itr itr = tdl.begin();
+        while (itr != tdl.end() && !found)
+        {
+            found = itr->_rid == rid;
+            itr++;
+        }
+    }
+    return found;
+}
+
+bool
+txn_map::is_enq(const uint64_t rid)
+{
+    bool found = false;
+    {
+        slock s(_mutex);
+        for (xmap_itr i = _map.begin(); i != _map.end() && !found; i++)
+        {
+            txn_data_list list = i->second;
+            for (tdl_itr j = list.begin(); j < list.end() && !found; j++)
+            {
+                if (j->_enq_flag)
+                    found = j->_rid == rid;
+                else
+                    found = j->_drid == rid;
+            }
+        }
+    }
+    return found;
+}
+
+void
+txn_map::xid_list(std::vector<std::string>& xv)
+{
+    xv.clear();
+    {
+        slock s(_mutex);
+        for (xmap_itr itr = _map.begin(); itr != _map.end(); itr++)
+            xv.push_back(itr->first);
+    }
+}
+
+}}

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,145 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_TXN_MAP_H
+#define QPID_LEGACYSTORE_JRNL_TXN_MAP_H
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+    class txn_map;
+}}
+
+#include "qpid/linearstore/jrnl/smutex.h"
+#include <map>
+#include <pthread.h>
+#include <string>
+#include <vector>
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+    /**
+    * \struct txn_data_struct
+    * \brief Struct encapsulating transaction data necessary for processing a transaction
+    *     in the journal once it is closed with either a commit or abort.
+    */
+    struct txn_data_struct
+    {
+        uint64_t _rid;      ///< Record id for this operation
+        uint64_t _drid;     ///< Dequeue record id for this operation
+        uint16_t _pfid;     ///< Physical file id, to be used when transferring to emap on commit
+        bool _enq_flag;     ///< If true, enq op, otherwise deq op
+        bool _commit_flag;  ///< (2PC transactions) Records 2PC complete c/a mode
+        bool _aio_compl;    ///< Initially false, set to true when record AIO returns
+        txn_data_struct(const uint64_t rid, const uint64_t drid, const uint16_t pfid,
+                const bool enq_flag, const bool commit_flag = false);
+    };
+    typedef txn_data_struct txn_data;
+    typedef std::vector<txn_data> txn_data_list;
+    typedef txn_data_list::iterator tdl_itr;
+
+    /**
+    * \class txn_map
+    * \brief Class for storing transaction data for each open (ie not committed or aborted)
+    *     xid in the store. If aborted, records are discarded; if committed, they are
+    *     transferred to the enqueue map.
+    *
+    * The data is encapsulated by struct txn_data_struct. A vector containing the information
+    * for each operation included as part of the same transaction is mapped against the
+    * xid.
+    *
+    * The aio_compl flag is set true as each AIO write operation for the enqueue or dequeue
+    * returns. Checking that all of these flags are true for a given xid is the mechanism
+    * used to determine if the transaction is syncronized (through method is_txn_synced()).
+    *
+    * On transaction commit, then each operation is handled as follows:
+    *
+    * If an enqueue (_enq_flag is true), then the rid and pfid are transferred to the enq_map.
+    * If a dequeue (_enq_flag is false), then the rid stored in the drid field is used to
+    * remove the corresponding record from the enq_map.
+    *
+    * On transaction abort, then each operation is handled as follows:
+    *
+    * If an enqueue (_enq_flag is true), then the data is simply discarded.
+    * If a dequeue (_enq_flag is false), then the lock for the corresponding enqueue in enq_map
+    * (if not a part of the same transaction) is removed, and the data discarded.
+    *
+    * <pre>
+    *   key      data
+    *
+    *   xid1 --- vector< [ rid, drid, pfid, enq_flag, commit_flag, aio_compl ] >
+    *   xid2 --- vector< [ rid, drid, pfid, enq_flag, commit_flag, aio_compl ] >
+    *   xid3 --- vector< [ rid, drid, pfid, enq_flag, commit_flag, aio_compl ] >
+    *   ...
+    * </pre>
+    */
+    class txn_map
+    {
+    public:
+        // return/error codes
+        static int16_t TMAP_RID_NOT_FOUND;
+        static int16_t TMAP_XID_NOT_FOUND;
+        static int16_t TMAP_OK;
+        static int16_t TMAP_NOT_SYNCED;
+        static int16_t TMAP_SYNCED;
+
+    private:
+        typedef std::pair<std::string, txn_data_list> xmap_param;
+        typedef std::map<std::string, txn_data_list> xmap;
+        typedef xmap::iterator xmap_itr;
+
+        xmap _map;
+        smutex _mutex;
+//        std::vector<uint32_t> _pfid_txn_cnt;
+        const txn_data_list _empty_data_list;
+
+    public:
+        txn_map();
+        virtual ~txn_map();
+
+//        void set_num_jfiles(const uint16_t num_jfiles);
+//        uint32_t get_txn_pfid_cnt(const uint16_t pfid) const;
+        bool insert_txn_data(const std::string& xid, const txn_data& td);
+        const txn_data_list get_tdata_list(const std::string& xid);
+        const txn_data_list get_remove_tdata_list(const std::string& xid);
+        bool in_map(const std::string& xid);
+        uint32_t enq_cnt();
+        uint32_t deq_cnt();
+        int16_t is_txn_synced(const std::string& xid); // -1=xid not found; 0=not synced; 1=synced
+        int16_t set_aio_compl(const std::string& xid, const uint64_t rid); // -2=rid not found; -1=xid not found; 0=done
+        bool data_exists(const std::string& xid, const uint64_t rid);
+        bool is_enq(const uint64_t rid);
+        inline void clear() { _map.clear(); }
+        inline bool empty() const { return _map.empty(); }
+        inline size_t size() const { return _map.size(); }
+        void xid_list(std::vector<std::string>& xv);
+    private:
+        uint32_t cnt(const bool enq_flag);
+        const txn_data_list get_tdata_list_nolock(const std::string& xid);
+    };
+
+}}
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_TXN_MAP_H

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp Tue Oct 22 19:09:56 2013
@@ -0,0 +1,440 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/linearstore/jrnl/txn_rec.h"
+
+#include <cassert>
+#include <cerrno>
+#include <cstdlib>
+#include <cstring>
+#include <iomanip>
+#include "qpid/linearstore/jrnl/jerrno.h"
+#include "qpid/linearstore/jrnl/jexception.h"
+#include <sstream>
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+txn_rec::txn_rec():
+//        _txn_hdr(),
+        _xidp(0),
+        _buff(0)
+//        _txn_tail()
+{
+    ::txn_hdr_init(&_txn_hdr, 0, QLS_JRNL_VERSION, 0, 0, 0);
+    ::rec_tail_init(&_txn_tail, 0, 0, 0);
+}
+
+txn_rec::txn_rec(const uint32_t magic, const uint64_t rid, const void* const xidp,
+        const std::size_t xidlen/*, const bool owi*/):
+//        _txn_hdr(magic, RHM_JDAT_VERSION, rid, xidlen, owi),
+        _xidp(xidp),
+        _buff(0)
+//        _txn_tail(_txn_hdr)
+{
+    ::txn_hdr_init(&_txn_hdr, magic, QLS_JRNL_VERSION, 0, rid, xidlen);
+    ::rec_tail_copy(&_txn_tail, &_txn_hdr._rhdr, 0);
+}
+
+txn_rec::~txn_rec()
+{
+    clean();
+}
+
+void
+txn_rec::reset(const uint32_t magic)
+{
+    _txn_hdr._rhdr._magic = magic;
+    _txn_hdr._rhdr._rid = 0;
+    _txn_hdr._xidsize = 0;
+    _xidp = 0;
+    _buff = 0;
+    _txn_tail._xmagic = ~magic;
+    _txn_tail._rid = 0;
+}
+
+void
+txn_rec::reset(const uint32_t magic, const  uint64_t rid, const void* const xidp,
+        const std::size_t xidlen/*, const bool owi*/)
+{
+    _txn_hdr._rhdr._magic = magic;
+    _txn_hdr._rhdr._rid = rid;
+//    _txn_hdr.set_owi(owi);
+    _txn_hdr._xidsize = xidlen;
+    _xidp = xidp;
+    _buff = 0;
+    _txn_tail._xmagic = ~magic;
+    _txn_tail._rid = rid;
+}
+
+uint32_t
+txn_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks)
+{
+    assert(wptr != 0);
+    assert(max_size_dblks > 0);
+    assert(_xidp != 0 && _txn_hdr._xidsize > 0);
+
+    std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES;
+    std::size_t rem = max_size_dblks * QLS_DBLK_SIZE_BYTES;
+    std::size_t wr_cnt = 0;
+    if (rec_offs_dblks) // Continuation of split dequeue record (over 2 or more pages)
+    {
+        if (size_dblks(rec_size()) - rec_offs_dblks > max_size_dblks) // Further split required
+        {
+            rec_offs -= sizeof(txn_hdr_t);
+            std::size_t wsize = _txn_hdr._xidsize > rec_offs ? _txn_hdr._xidsize - rec_offs : 0;
+            std::size_t wsize2 = wsize;
+            if (wsize)
+            {
+                if (wsize > rem)
+                    wsize = rem;
+                std::memcpy(wptr, (const char*)_xidp + rec_offs, wsize);
+                wr_cnt += wsize;
+                rem -= wsize;
+            }
+            rec_offs -= _txn_hdr._xidsize - wsize2;
+            if (rem)
+            {
+                wsize = sizeof(_txn_tail) > rec_offs ? sizeof(_txn_tail) - rec_offs : 0;
+                wsize2 = wsize;
+                if (wsize)
+                {
+                    if (wsize > rem)
+                        wsize = rem;
+                    std::memcpy((char*)wptr + wr_cnt, (char*)&_txn_tail + rec_offs, wsize);
+                    wr_cnt += wsize;
+                    rem -= wsize;
+                }
+                rec_offs -= sizeof(_txn_tail) - wsize2;
+            }
+            assert(rem == 0);
+            assert(rec_offs == 0);
+        }
+        else // No further split required
+        {
+            rec_offs -= sizeof(txn_hdr_t);
+            std::size_t wsize = _txn_hdr._xidsize > rec_offs ? _txn_hdr._xidsize - rec_offs : 0;
+            if (wsize)
+            {
+                std::memcpy(wptr, (const char*)_xidp + rec_offs, wsize);
+                wr_cnt += wsize;
+            }
+            rec_offs -= _txn_hdr._xidsize - wsize;
+            wsize = sizeof(_txn_tail) > rec_offs ? sizeof(_txn_tail) - rec_offs : 0;
+            if (wsize)
+            {
+                std::memcpy((char*)wptr + wr_cnt, (char*)&_txn_tail + rec_offs, wsize);
+                wr_cnt += wsize;
+#ifdef QLS_CLEAN
+                std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES;
+                std::size_t dblk_rec_size = size_dblks(rec_size() - rec_offs) * QLS_DBLK_SIZE_BYTES;
+                std::memset((char*)wptr + wr_cnt, QLS_CLEAN_CHAR, dblk_rec_size - wr_cnt);
+#endif
+            }
+            rec_offs -= sizeof(_txn_tail) - wsize;
+            assert(rec_offs == 0);
+        }
+    }
+    else // Start at beginning of data record
+    {
+        // Assumption: the header will always fit into the first dblk
+        std::memcpy(wptr, (void*)&_txn_hdr, sizeof(txn_hdr_t));
+        wr_cnt = sizeof(txn_hdr_t);
+        if (size_dblks(rec_size()) > max_size_dblks) // Split required
+        {
+            std::size_t wsize;
+            rem -= sizeof(txn_hdr_t);
+            if (rem)
+            {
+                wsize = rem >= _txn_hdr._xidsize ? _txn_hdr._xidsize : rem;
+                std::memcpy((char*)wptr + wr_cnt, _xidp, wsize);
+                wr_cnt += wsize;
+                rem -= wsize;
+            }
+            if (rem)
+            {
+                wsize = rem >= sizeof(_txn_tail) ? sizeof(_txn_tail) : rem;
+                std::memcpy((char*)wptr + wr_cnt, (void*)&_txn_tail, wsize);
+                wr_cnt += wsize;
+                rem -= wsize;
+            }
+            assert(rem == 0);
+        }
+        else // No split required
+        {
+            std::memcpy((char*)wptr + wr_cnt, _xidp, _txn_hdr._xidsize);
+            wr_cnt += _txn_hdr._xidsize;
+            std::memcpy((char*)wptr + wr_cnt, (void*)&_txn_tail, sizeof(_txn_tail));
+            wr_cnt += sizeof(_txn_tail);
+#ifdef QLS_CLEAN
+            std::size_t dblk_rec_size = size_dblks(rec_size()) * QLS_DBLK_SIZE_BYTES;
+            std::memset((char*)wptr + wr_cnt, QLS_CLEAN_CHAR, dblk_rec_size - wr_cnt);
+#endif
+        }
+    }
+    return size_dblks(wr_cnt);
+}
+
+uint32_t
+txn_rec::decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks)
+{
+    assert(rptr != 0);
+    assert(max_size_dblks > 0);
+
+    std::size_t rd_cnt = 0;
+    if (rec_offs_dblks) // Continuation of record on new page
+    {
+        const uint32_t hdr_xid_dblks = size_dblks(sizeof(txn_hdr_t) + _txn_hdr._xidsize);
+        const uint32_t hdr_xid_tail_dblks = size_dblks(sizeof(txn_hdr_t) +  _txn_hdr._xidsize + sizeof(rec_tail_t));
+        const std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES;
+
+        if (hdr_xid_tail_dblks - rec_offs_dblks <= max_size_dblks)
+        {
+            // Remainder of xid fits within this page
+            if (rec_offs - sizeof(txn_hdr_t) < _txn_hdr._xidsize)
+            {
+                // Part of xid still outstanding, copy remainder of xid and tail
+                const std::size_t xid_offs = rec_offs - sizeof(txn_hdr_t);
+                const std::size_t xid_rem = _txn_hdr._xidsize - xid_offs;
+                std::memcpy((char*)_buff + xid_offs, rptr, xid_rem);
+                rd_cnt = xid_rem;
+                std::memcpy((void*)&_txn_tail, ((char*)rptr + rd_cnt), sizeof(_txn_tail));
+                chk_tail();
+                rd_cnt += sizeof(_txn_tail);
+            }
+            else
+            {
+                // Tail or part of tail only outstanding, complete tail
+                const std::size_t tail_offs = rec_offs - sizeof(txn_hdr_t) - _txn_hdr._xidsize;
+                const std::size_t tail_rem = sizeof(rec_tail_t) - tail_offs;
+                std::memcpy((char*)&_txn_tail + tail_offs, rptr, tail_rem);
+                chk_tail();
+                rd_cnt = tail_rem;
+            }
+        }
+        else if (hdr_xid_dblks - rec_offs_dblks <= max_size_dblks)
+        {
+            // Remainder of xid fits within this page, tail split
+            const std::size_t xid_offs = rec_offs - sizeof(txn_hdr_t);
+            const std::size_t xid_rem = _txn_hdr._xidsize - xid_offs;
+            std::memcpy((char*)_buff + xid_offs, rptr, xid_rem);
+            rd_cnt += xid_rem;
+            const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
+            if (tail_rem)
+            {
+                std::memcpy((void*)&_txn_tail, ((char*)rptr + xid_rem), tail_rem);
+                rd_cnt += tail_rem;
+            }
+        }
+        else
+        {
+            // Remainder of xid split
+            const std::size_t xid_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES);
+            std::memcpy((char*)_buff + rec_offs - sizeof(txn_hdr_t), rptr, xid_cp_size);
+            rd_cnt += xid_cp_size;
+        }
+    }
+    else // Start of record
+    {
+        // Get and check header
+        //_txn_hdr.hdr_copy(h);
+        ::rec_hdr_copy(&_txn_hdr._rhdr, &h);
+        rd_cnt = sizeof(rec_hdr_t);
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+        rd_cnt += sizeof(uint32_t); // Filler 0
+#endif
+        _txn_hdr._xidsize = *(std::size_t*)((char*)rptr + rd_cnt);
+        rd_cnt = sizeof(txn_hdr_t);
+        chk_hdr();
+        _buff = std::malloc(_txn_hdr._xidsize);
+        MALLOC_CHK(_buff, "_buff", "txn_rec", "decode");
+        const uint32_t hdr_xid_dblks = size_dblks(sizeof(txn_hdr_t) + _txn_hdr._xidsize);
+        const uint32_t hdr_xid_tail_dblks = size_dblks(sizeof(txn_hdr_t) + _txn_hdr._xidsize +
+                sizeof(rec_tail_t));
+
+        // Check if record (header + xid + tail) fits within this page, we can check the
+        // tail before the expense of copying data to memory
+        if (hdr_xid_tail_dblks <= max_size_dblks)
+        {
+            // Entire header, xid and tail fits within this page
+            std::memcpy(_buff, (char*)rptr + rd_cnt, _txn_hdr._xidsize);
+            rd_cnt += _txn_hdr._xidsize;
+            std::memcpy((void*)&_txn_tail, (char*)rptr + rd_cnt, sizeof(_txn_tail));
+            rd_cnt += sizeof(_txn_tail);
+            chk_tail();
+        }
+        else if (hdr_xid_dblks <= max_size_dblks)
+        {
+            // Entire header and xid fit within this page, tail split
+            std::memcpy(_buff, (char*)rptr + rd_cnt, _txn_hdr._xidsize);
+            rd_cnt += _txn_hdr._xidsize;
+            const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
+            if (tail_rem)
+            {
+                std::memcpy((void*)&_txn_tail, (char*)rptr + rd_cnt, tail_rem);
+                rd_cnt += tail_rem;
+            }
+        }
+        else
+        {
+            // Header fits within this page, xid split
+            const std::size_t xid_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
+            std::memcpy(_buff, (char*)rptr + rd_cnt, xid_cp_size);
+            rd_cnt += xid_cp_size;
+        }
+    }
+    return size_dblks(rd_cnt);
+}
+
+bool
+txn_rec::rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs)
+{
+    if (rec_offs == 0)
+    {
+        // Read header, allocate for xid
+        //_txn_hdr.hdr_copy(h);
+        ::rec_hdr_copy(&_txn_hdr._rhdr, &h);
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+        ifsp->ignore(sizeof(uint32_t)); // _filler0
+#endif
+        ifsp->read((char*)&_txn_hdr._xidsize, sizeof(std::size_t));
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+        ifsp->ignore(sizeof(uint32_t)); // _filler0
+#endif
+        rec_offs = sizeof(txn_hdr_t);
+        _buff = std::malloc(_txn_hdr._xidsize);
+        MALLOC_CHK(_buff, "_buff", "txn_rec", "rcv_decode");
+    }
+    if (rec_offs < sizeof(txn_hdr_t) + _txn_hdr._xidsize)
+    {
+        // Read xid (or continue reading xid)
+        std::size_t offs = rec_offs - sizeof(txn_hdr_t);
+        ifsp->read((char*)_buff + offs, _txn_hdr._xidsize - offs);
+        std::size_t size_read = ifsp->gcount();
+        rec_offs += size_read;
+        if (size_read < _txn_hdr._xidsize - offs)
+        {
+            assert(ifsp->eof());
+            // As we may have read past eof, turn off fail bit
+            ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit));
+            assert(!ifsp->fail() && !ifsp->bad());
+            return false;
+        }
+    }
+    if (rec_offs < sizeof(txn_hdr_t) + _txn_hdr._xidsize + sizeof(rec_tail_t))
+    {
+        // Read tail (or continue reading tail)
+        std::size_t offs = rec_offs - sizeof(txn_hdr_t) - _txn_hdr._xidsize;
+        ifsp->read((char*)&_txn_tail + offs, sizeof(rec_tail_t) - offs);
+        std::size_t size_read = ifsp->gcount();
+        rec_offs += size_read;
+        if (size_read < sizeof(rec_tail_t) - offs)
+        {
+            assert(ifsp->eof());
+            // As we may have read past eof, turn off fail bit
+            ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit));
+            assert(!ifsp->fail() && !ifsp->bad());
+            return false;
+        }
+    }
+    ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size());
+    chk_tail(); // Throws if tail invalid or record incomplete
+    assert(!ifsp->fail() && !ifsp->bad());
+    return true;
+}
+
+std::size_t
+txn_rec::get_xid(void** const xidpp)
+{
+    if (!_buff)
+    {
+        *xidpp = 0;
+        return 0;
+    }
+    *xidpp = _buff;
+    return _txn_hdr._xidsize;
+}
+
+std::string&
+txn_rec::str(std::string& str) const
+{
+    std::ostringstream oss;
+    if (_txn_hdr._rhdr._magic == QLS_TXA_MAGIC)
+        oss << "dtxa_rec: m=" << _txn_hdr._rhdr._magic;
+    else
+        oss << "dtxc_rec: m=" << _txn_hdr._rhdr._magic;
+    oss << " v=" << (int)_txn_hdr._rhdr._version;
+    oss << " rid=" << _txn_hdr._rhdr._rid;
+    oss << " xid=\"" << _xidp << "\"";
+    str.append(oss.str());
+    return str;
+}
+
+std::size_t
+txn_rec::xid_size() const
+{
+    return _txn_hdr._xidsize;
+}
+
+std::size_t
+txn_rec::rec_size() const
+{
+    return sizeof(txn_hdr_t) + _txn_hdr._xidsize + sizeof(rec_tail_t);
+}
+
+void
+txn_rec::chk_hdr() const
+{
+    jrec::chk_hdr(_txn_hdr._rhdr);
+    if (_txn_hdr._rhdr._magic != QLS_TXA_MAGIC && _txn_hdr._rhdr._magic != QLS_TXC_MAGIC)
+    {
+        std::ostringstream oss;
+        oss << std::hex << std::setfill('0');
+        oss << "dtx magic: rid=0x" << std::setw(16) << _txn_hdr._rhdr._rid;
+        oss << ": expected=(0x" << std::setw(8) << QLS_TXA_MAGIC;
+        oss << " or 0x" << QLS_TXC_MAGIC;
+        oss << ") read=0x" << std::setw(2) << (int)_txn_hdr._rhdr._magic;
+        throw jexception(jerrno::JERR_JREC_BADRECHDR, oss.str(), "txn_rec", "chk_hdr");
+    }
+}
+
+void
+txn_rec::chk_hdr(uint64_t rid) const
+{
+    chk_hdr();
+    jrec::chk_rid(_txn_hdr._rhdr, rid);
+}
+
+void
+txn_rec::chk_tail() const
+{
+    jrec::chk_tail(_txn_tail, _txn_hdr._rhdr);
+}
+
+void
+txn_rec::clean()
+{
+    // clean up allocated memory here
+}
+
+}}

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,88 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_TXN_REC_H
+#define QPID_LEGACYSTORE_JRNL_TXN_REC_H
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+class txn_rec;
+}}
+
+#include <cstddef>
+#include "qpid/linearstore/jrnl/jrec.h"
+#include "qpid/linearstore/jrnl/utils/txn_hdr.h"
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+    /**
+    * \class txn_rec
+    * \brief Class to handle a single journal DTX commit or abort record.
+    */
+    class txn_rec : public jrec
+    {
+    private:
+        txn_hdr_t _txn_hdr;     ///< transaction header
+        const void* _xidp;      ///< xid pointer for encoding (writing to disk)
+        void* _buff;            ///< Pointer to buffer to receive data read from disk
+        rec_tail_t _txn_tail;   ///< Record tail
+
+    public:
+        // constructor used for read operations and xid must have memory allocated
+        txn_rec();
+        // constructor used for write operations, where xid already exists
+        txn_rec(const uint32_t magic, const uint64_t rid, const void* const xidp,
+                const std::size_t xidlen/*, const bool owi*/);
+        virtual ~txn_rec();
+
+        // Prepare instance for use in reading data from journal
+        void reset(const uint32_t magic);
+        // Prepare instance for use in writing data to journal
+        void reset(const uint32_t magic, const  uint64_t rid, const void* const xidp,
+                const std::size_t xidlen/*, const bool owi*/);
+        uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks);
+        uint32_t decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks,
+                uint32_t max_size_dblks);
+        // Decode used for recover
+        bool rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs);
+
+        std::size_t get_xid(void** const xidpp);
+        std::string& str(std::string& str) const;
+        inline std::size_t data_size() const { return 0; } // This record never carries data
+        std::size_t xid_size() const;
+        std::size_t rec_size() const;
+        inline uint64_t rid() const { return _txn_hdr._rhdr._rid; }
+
+    private:
+        void chk_hdr() const;
+        void chk_hdr(uint64_t rid) const;
+        void chk_tail() const;
+        virtual void clean();
+    }; // class txn_rec
+
+}}
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_TXN_REC_H

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/deq_hdr.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/deq_hdr.c?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/deq_hdr.c (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/deq_hdr.c Tue Oct 22 19:09:56 2013
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "deq_hdr.h"
+
+/*static const uint16_t DEQ_HDR_TXNCMPLCOMMIT_MASK = 0x10;*/
+
+void deq_hdr_init(deq_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag, const uint64_t
+                  rid, const uint64_t deq_rid, const uint64_t xidsize) {
+    rec_hdr_init(&dest->_rhdr, magic, version, uflag, rid);
+    dest->_deq_rid = deq_rid;
+    dest->_xidsize = xidsize;
+}
+
+void deq_hdr_copy(deq_hdr_t* dest, const deq_hdr_t* src) {
+    rec_hdr_copy(&dest->_rhdr, &src->_rhdr);
+    dest->_deq_rid = src->_deq_rid;
+    dest->_xidsize = src->_xidsize;
+}
+
+bool is_txn_coml_commit(const deq_hdr_t *dh) {
+    return dh->_rhdr._uflag & DEQ_HDR_TXNCMPLCOMMIT_MASK;
+}
+
+void set_txn_coml_commit(deq_hdr_t *dh, const bool commit) {
+    dh->_rhdr._uflag = commit ? dh->_rhdr._uflag | DEQ_HDR_TXNCMPLCOMMIT_MASK :
+                                dh->_rhdr._uflag & (~DEQ_HDR_TXNCMPLCOMMIT_MASK);
+}

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/deq_hdr.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/deq_hdr.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/deq_hdr.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/deq_hdr.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,81 @@
+#ifndef QPID_LINEARSTORE_JRNL_UTILS_DEQ_HDR_H
+#define QPID_LINEARSTORE_JRNL_UTILS_DEQ_HDR_H
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <stdbool.h>
+#include "rec_hdr.h"
+
+#ifdef __cplusplus
+extern "C"{
+#endif
+
+#pragma pack(1)
+
+/**
+ * \brief Struct for dequeue record.
+ *
+ * Struct for dequeue record. If this record has a non-zero xidsize field (i.e., there is a
+ * valid XID), then this header is followed by the XID of xidsize bytes and a rec_tail. If,
+ * on the other hand, this record has a zero xidsize (i.e., there is no XID), then the rec_tail
+ * is absent.
+ *
+ * Note that this record had its own rid distinct from the rid of the record it is dequeueing.
+ * The rid field below is the rid of the dequeue record itself; the deq-rid field is the rid of a
+ * previous enqueue record being dequeued by this record.
+ *
+ * Record header info in binary format (32 bytes):
+ * <pre>
+ *   0                           7
+ * +---+---+---+---+---+---+---+---+  -+
+ * |     magic     |  ver  | flags |   |
+ * +---+---+---+---+---+---+---+---+   | struct rec_hdr_t
+ * |              rid              |   |
+ * +---+---+---+---+---+---+---+---+  -+
+ * |            deq-rid            |
+ * +---+---+---+---+---+---+---+---+
+ * |            xidsize            |
+ * +---+---+---+---+---+---+---+---+
+ *
+ * deq-rid = dequeue record ID
+ * </pre>
+ */
+typedef struct deq_hdr_t {
+    rec_hdr_t _rhdr;		/**< Common record header struct */
+    uint64_t  _deq_rid;		/**< Record ID of record being dequeued */
+    uint64_t  _xidsize;		/**< XID size */
+} deq_hdr_t;
+
+static const uint16_t DEQ_HDR_TXNCMPLCOMMIT_MASK = 0x10;
+
+void deq_hdr_init(deq_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag,
+                  const uint64_t rid, const uint64_t deq_rid, const uint64_t xidsize);
+void deq_hdr_copy(deq_hdr_t* dest, const deq_hdr_t* src);
+bool is_txn_coml_commit(const deq_hdr_t *dh);
+void set_txn_coml_commit(deq_hdr_t *dh, const bool commit);
+
+#pragma pack()
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ifndef QPID_LINEARSTORE_JRNL_UTILS_DEQ_HDR_H */

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.c?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.c (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.c Tue Oct 22 19:09:56 2013
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "enq_hdr.h"
+
+//static const uint16_t ENQ_HDR_TRANSIENT_MASK = 0x10;
+//static const uint16_t ENQ_HDR_EXTERNAL_MASK = 0x20;
+
+void enq_hdr_init(enq_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag,
+                  const uint64_t rid, const uint64_t xidsize, const uint64_t dsize) {
+    rec_hdr_init(&dest->_rhdr, magic, version, uflag, rid);
+    dest->_xidsize = xidsize;
+    dest->_dsize = dsize;
+}
+
+void enq_hdr_copy(enq_hdr_t* dest, const enq_hdr_t* src) {
+    rec_hdr_copy(&dest->_rhdr, &src->_rhdr);
+    dest->_xidsize = src->_xidsize;
+    dest->_dsize = src->_dsize;
+}
+
+bool is_enq_transient(const enq_hdr_t *eh) {
+    return eh->_rhdr._uflag & ENQ_HDR_TRANSIENT_MASK;
+}
+
+void set_enq_transient(enq_hdr_t *eh, const bool transient) {
+    eh->_rhdr._uflag = transient ? eh->_rhdr._uflag | ENQ_HDR_TRANSIENT_MASK :
+                                   eh->_rhdr._uflag & (~ENQ_HDR_TRANSIENT_MASK);
+}
+
+bool is_enq_external(const enq_hdr_t *eh) {
+    return eh->_rhdr._uflag & ENQ_HDR_EXTERNAL_MASK;
+}
+
+void set_enq_external(enq_hdr_t *eh, const bool external) {
+    eh->_rhdr._uflag = external ? eh->_rhdr._uflag | ENQ_HDR_EXTERNAL_MASK :
+                                  eh->_rhdr._uflag & (~ENQ_HDR_EXTERNAL_MASK);
+}
+
+bool validate_enq_hdr(enq_hdr_t *eh, const uint32_t magic, const uint16_t version, const uint64_t rid) {
+    return eh->_rhdr._magic == magic &&
+           eh->_rhdr._version == version &&
+           rid > 0 ? eh->_rhdr._rid == rid /* If rid == 0, don't compare rids */
+                   : true;
+}

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,81 @@
+#ifndef QPID_LINEARSTORE_JRNL_UTILS_ENQ_HDR_H
+#define QPID_LINEARSTORE_JRNL_UTILS_ENQ_HDR_H
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <stdbool.h>
+#include "rec_hdr.h"
+
+#ifdef __cplusplus
+extern "C"{
+#endif
+
+#pragma pack(1)
+
+/**
+ * \brief Struct for enqueue record.
+ *
+ * Struct for enqueue record. In addition to the common data, this header includes both the
+ * xid and data blob sizes.
+ *
+ * This header precedes all enqueue data in journal files.
+ *
+ * Record header info in binary format (32 bytes):
+ * <pre>
+ *   0                           7
+ * +---+---+---+---+---+---+---+---+  -+
+ * |     magic     |  ver  | flags |   |
+ * +---+---+---+---+---+---+---+---+   | struct rec_hdr_t
+ * |              rid              |   |
+ * +---+---+---+---+---+---+---+---+  -+
+ * |            xidsize            |
+ * +---+---+---+---+---+---+---+---+
+ * |             dsize             |
+ * +---+---+---+---+---+---+---+---+
+ * v = file version (If the format or encoding of this file changes, then this
+ *     number should be incremented)
+ * </pre>
+ */
+typedef struct enq_hdr_t {
+    rec_hdr_t  _rhdr;		/**< Common record header struct */
+    uint64_t  _xidsize;		/**< XID size in octets */
+    uint64_t  _dsize;		/**< Record data size in octets */
+} enq_hdr_t;
+
+static const uint16_t ENQ_HDR_TRANSIENT_MASK = 0x10;
+static const uint16_t ENQ_HDR_EXTERNAL_MASK = 0x20;
+
+void enq_hdr_init(enq_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag,
+                  const uint64_t rid, const uint64_t xidsize, const uint64_t dsize);
+void enq_hdr_copy(enq_hdr_t* dest, const enq_hdr_t* src);
+bool is_enq_transient(const enq_hdr_t *eh);
+void set_enq_transient(enq_hdr_t *eh, const bool transient);
+bool is_enq_external(const enq_hdr_t *eh);
+void set_enq_external(enq_hdr_t *eh, const bool external);
+bool validate_enq_hdr(enq_hdr_t *eh, const uint32_t magic, const uint16_t version, const uint64_t rid);
+
+#pragma pack()
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ifndef QPID_LINEARSTORE_JRNL_UTILS_ENQ_HDR_H */

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c Tue Oct 22 19:09:56 2013
@@ -0,0 +1,105 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "file_hdr.h"
+#include <string.h>
+
+void file_hdr_create(file_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t fhdr_size_sblks,
+                    const uint16_t efp_partition, const uint64_t file_size) {
+    rec_hdr_init(&dest->_rhdr, magic, version, 0, 0);
+    dest->_fhdr_size_sblks = fhdr_size_sblks;
+    dest->_efp_partition = efp_partition;
+    dest->_reserved = 0;
+    dest->_file_size_kib = file_size;
+    dest->_fro = 0;
+    dest->_ts_nsec = 0;
+    dest->_ts_sec = 0;
+    dest->_file_number = 0;
+    dest->_queue_name_len = 0;
+}
+
+int file_hdr_init(void* dest, const uint64_t dest_len, const uint16_t uflag, const uint64_t rid, const uint64_t fro,
+                  const uint64_t file_number, const uint16_t queue_name_len, const char* queue_name) {
+    file_hdr_t* fhp = (file_hdr_t*)dest;
+    fhp->_rhdr._uflag = uflag;
+    fhp->_rhdr._rid = rid;
+    fhp->_fro = fro;
+    fhp->_file_number = file_number;
+    if (sizeof(file_hdr_t) + queue_name_len < MAX_FILE_HDR_LEN) {
+        fhp->_queue_name_len = queue_name_len;
+    } else {
+        fhp->_queue_name_len = MAX_FILE_HDR_LEN - sizeof(file_hdr_t);
+    }
+    fhp->_queue_name_len = queue_name_len;
+    memcpy((char*)dest + sizeof(file_hdr_t), queue_name, queue_name_len);
+    memset((char*)dest + sizeof(file_hdr_t) + queue_name_len, 0, dest_len - sizeof(file_hdr_t) - queue_name_len);
+    return set_time_now(dest);
+}
+
+void file_hdr_copy(file_hdr_t* dest, const file_hdr_t* src) {
+    rec_hdr_copy(&dest->_rhdr, &src->_rhdr);
+    dest->_fhdr_size_sblks = src->_fhdr_size_sblks; // Should this be copied?
+    dest->_efp_partition = src->_efp_partition;     // Should this be copied?
+    dest->_file_size_kib = src->_file_size_kib;
+    dest->_fro = src->_fro;
+    dest->_ts_sec = src->_ts_sec;
+    dest->_ts_nsec = src->_ts_nsec;
+    dest->_file_number = src->_file_number;
+}
+
+void file_hdr_reset(file_hdr_t* target) {
+    target->_rhdr._uflag = 0;
+    target->_rhdr._rid = 0;
+    target->_fro = 0;
+    target->_ts_sec = 0;
+    target->_ts_nsec = 0;
+    target->_file_number = 0;
+    target->_queue_name_len = 0;
+}
+
+int is_file_hdr_reset(file_hdr_t* target) {
+    return target->_rhdr._uflag == 0 &&
+           target->_rhdr._rid == 0 &&
+           target->_ts_sec == 0 &&
+           target->_ts_nsec == 0 &&
+           target->_file_number == 0 &&
+           target->_queue_name_len == 0;
+}
+
+int set_time_now(file_hdr_t *fh)
+{
+    struct timespec ts;
+    int    err = clock_gettime(CLOCK_REALTIME, &ts);
+    if (err)
+        return err;
+    fh->_ts_sec = ts.tv_sec;
+    fh->_ts_nsec = ts.tv_nsec;
+    return 0;
+}
+
+
+void set_time(file_hdr_t *fh, struct timespec *ts)
+{
+    fh->_ts_sec  = ts->tv_sec;
+    fh->_ts_nsec = ts->tv_nsec;
+}
+
+



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org