You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2013/07/10 20:20:20 UTC

svn commit: r1501895 [8/10] - in /qpid/branches/linearstore/qpid/cpp/src: ./ qpid/linearstore/ qpid/linearstore/jrnl/

Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/lpmgr.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/lpmgr.h?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/lpmgr.h (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/lpmgr.h Wed Jul 10 18:20:19 2013
@@ -0,0 +1,303 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file lpmgr.h
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * Class mrg::journal::lpmgr. See class documentation for details.
+ *
+ * \author Kim van der Riet
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_LPMGR_H
+#define QPID_LEGACYSTORE_JRNL_LPMGR_H
+
+namespace mrg
+{
+namespace journal
+{
+    class jcntl;
+    class lpmgr;
+}
+}
+
+#include "qpid/legacystore/jrnl/fcntl.h"
+#include <vector>
+
+namespace mrg
+{
+namespace journal
+{
+
+    /**
+    * \brief LFID-PFID manager. This class maps the logical file id (lfid) to the physical file id (pfid) so that files
+    * may be inserted into the file ring buffer in (nearly) arbitrary logical locations while the physical ids continue
+    * to be appended. NOTE: NOT THREAD SAFE.
+    *
+    * The entire functionality of the LFID-PFID manager is to maintain an array of pointers to fcntl objects which have
+    * a one-to-one relationship to the physical %journal files. The logical file id (lfid) is used as an index to the
+    * array to read the mapped physical file id (pfid). By altering the order of these pointers within the array, the
+    * mapping of logical to physical files may be altered. This can be used to allow for the logical insertion of
+    * %journal files into a ring buffer, even though the physical file ids must be appended to those that preceded them.
+    *
+    * Since the insert() operation uses after-lfid as its position parameter, it is not possible to insert before lfid
+    * 0 - i.e. It is only possible to insert after an existing lfid. Consequently, lfid 0 and pfid 0 are always
+    * coincident in a %journal. Note, however, that inserting before lfid 0 is logically equivalent to inserting after
+    * the last lfid.
+    *
+    * When one or more files are inserted after a particular lfid, the lfids of the following files are incremented. The
+    * pfids of the inserted files follow those of all existing files, thus leading to a lfid-pfid discrepancy (i.e. no
+    * longer a one-to-one mapping):
+    *
+    * Example: Before insertion, %journal file headers would look as follows:
+    * <pre>
+    *          Logical view (sorted by lfid):               Physical view (sorted by pfid):
+    *          +---+---+---+---+---+---+                    +---+---+---+---+---+---+
+    * pfid --> | 0 | 1 | 2 | 3 | 4 | 5 |           pfid --> | 0 | 1 | 2 | 3 | 4 | 5 |
+    * lfid --> | 0 | 1 | 2 | 3 | 4 | 5 |           lfid --> | 0 | 1 | 2 | 3 | 4 | 5 |
+    *          +---+---+---+---+---+---+                    +---+---+---+---+---+---+
+    * </pre>
+    *
+    * After insertion of 2 files after lfid 2 (marked with *s):
+    * <pre>
+    *          Logical view (sorted by lfid):               Physical view (sorted by pfid):
+    *          +---+---+---+---+---+---+---+---+            +---+---+---+---+---+---+---+---+
+    * pfid --> | 0 | 1 | 2 |*6*|*7*| 3 | 4 | 5 |   pfid --> | 0 | 1 | 2 | 3 | 4 | 5 |*6*|*7*|
+    * lfid --> | 0 | 1 | 2 |*3*|*4*| 5 | 6 | 7 |   lfid --> | 0 | 1 | 2 | 5 | 6 | 7 |*3*|*4*|
+    *          +---+---+---+---+---+---+---+---+            +---+---+---+---+---+---+---+---+
+    * </pre>
+    *
+    * The insert() function updates the internal map immediately, but the physical files (which have both the pfid and
+    * lfid written into the file header) are only updated as they are overwritten in the normal course of enqueueing
+    * and dequeueing messages. If the %journal should fail after insertion but before the files following those inserted
+    * are overwritten, then duplicate lfids will be present (though no duplicate pfids are possible). The overwrite
+    * indicator (owi) flag and the pfid numbers may be used to resolve the ambiguity and determine the logically earlier
+    * lfid in this case.
+    *
+    * Example: Before insertion, the current active write file being lfid/pfid 2 as determined by the owi flag, %journal
+    * file headers would look as follows:
+    * <pre>
+    *          Logical view (sorted by lfid):               Physical view (sorted by pfid):
+    *          +---+---+---+---+---+---+                    +---+---+---+---+---+---+
+    * pfid --> | 0 | 1 | 2 | 3 | 4 | 5 |           pfid --> | 0 | 1 | 2 | 3 | 4 | 5 |
+    * lfid --> | 0 | 1 | 2 | 3 | 4 | 5 |           lfid --> | 0 | 1 | 2 | 3 | 4 | 5 |
+    *  owi --> | t | t | t | f | f | f |            owi --> | t | t | t | f | f | f |
+    *          +---+---+---+---+---+---+                    +---+---+---+---+---+---+
+    * </pre>
+    *
+    * After inserting 2 files after lfid 2 and then 3 (the newly inserted file) - marked with *s:
+    * <pre>
+    *          Logical view (sorted by lfid):               Physical view (sorted by pfid):
+    *          +---+---+---+---+---+---+---+---+            +---+---+---+---+---+---+---+---+
+    * pfid --> | 0 | 1 | 2 |*6*|*7*| 3 | 4 | 5 |   pfid --> | 0 | 1 | 2 | 3 | 4 | 5 |*6*|*7*|
+    * lfid --> | 0 | 1 | 2 |*3*|*4*| 3 | 4 | 5 |   lfid --> | 0 | 1 | 2 | 3 | 4 | 5 |*3*|*4*|
+    *  owi --> | t | t | t | t | t | f | f | f |    owi --> | t | t | t | f | f | f | t | t |
+    *          +---+---+---+---+---+---+---+---+            +---+---+---+---+---+---+---+---+
+    * </pre>
+    *
+    * If a broker failure occurs at this point, then there are two independent tests that may be made to resolve
+    * duplicate lfids during recovery in such cases:
+    * <ol>
+    *   <li>The correct lfid has owi flag that matches that of pfid/lfid 0</li>
+    *   <li>The most recently inserted (hence correct) lfid has pfids that are higher than the duplicate that was not
+    *       overwritten</li>
+    * </ol>
+    *
+    * NOTE: NOT THREAD SAFE. Provide external thread protection if used in multi-threaded environments.
+    */
+    class lpmgr
+    {
+    public:
+        /**
+        * \brief Signature of the factory function that will create a new fcntl object and return its pointer.
+        *
+        * \param jcp        Pointer to jcntl instance from which journal file details will be obtained.
+        * \param lfid       Logical file ID for new fcntl instance.
+        * \param pfid       Physical file ID for file associated with new fcntl instance.
+        * \param rdp        Pointer to rcvdat instance which contains recovery information for new fcntl instance when
+        *                   recovering an existing file, or null if a new file is to be created.
+        */
+        typedef fcntl* (new_obj_fn_ptr)(jcntl* const jcp,
+                                        const u_int16_t lfid,
+                                        const u_int16_t pfid,
+                                        const rcvdat* const rdp);
+
+    private:
+        bool _ae;                       ///< Auto-expand mode
+        u_int16_t _ae_max_jfiles;       ///< Max file count for auto-expansion; 0 = no limit
+        std::vector<fcntl*> _fcntl_arr; ///< Array of pointers to fcntl objects, indexed by lfid
+
+    public:
+        lpmgr();
+        virtual ~lpmgr();
+
+        /**
+        * \brief Initialize from scratch for a known number of %journal files. All lfid values are identical to pfid
+        * values (which is normal before any inserts have occurred).
+        *
+        * \param num_jfiles Number of files to be created, and consequently the number of fcntl objects in array
+        *                   _fcntl_arr.
+        * \param ae         If true, allows auto-expansion; if false, disables auto-expansion.
+        * \param ae_max_jfiles The maximum number of files allowed for auto-expansion. Cannot be lower than the current
+        *                   number of files. However, a zero value disables the limit checks, and allows unlimited
+        *                   expansion.
+        * \param jcp        Pointer to jcntl instance. This is used to find the file path and base filename so that
+        *                   new files may be created.
+        * \param fp         Pointer to function which creates and returns a pointer to a new fcntl object (and hence
+        *                   causes a new %journal file to be created).
+        */
+        void initialize(const u_int16_t num_jfiles,
+                        const bool ae,
+                        const u_int16_t ae_max_jfiles,
+                        jcntl* const jcp,
+                        new_obj_fn_ptr fp);
+
+        /**
+        * \brief Initialize from a known lfid-pfid map pfid_list (within rcvdat param rd), which is usually obtained
+        * from a recover. The index of pfid_list is the logical file id (lfid); the value contained in the vector is
+        * the physical file id (pfid).
+        *
+        * \param rd         Ref to rcvdat struct which contains recovery data and the pfid_list.
+        * \param jcp        Pointer to jcntl instance. This is used to find the file path and base filename so that
+        *                   new files may be created.
+        * \param fp         Pointer to function which creates and returns a pointer to a new fcntl object (and hence
+        *                   causes a new %journal file to be created).
+        */
+        void recover(const rcvdat& rd,
+                     jcntl* const jcp,
+                     new_obj_fn_ptr fp);
+
+        /**
+        * \brief Insert num_jfiles files after lfid index after_lfid. This causes all lfids after after_lfid to be
+        * increased by num_jfiles.
+        *
+        * Note that it is not possible to insert <i>before</i> lfid 0, and thus lfid 0 should always point to pfid 0.
+        * Inserting before lfid 0 is logically equivalent to inserting after the last lfid in a circular buffer.
+        *
+        * \param after_lfid Lfid index after which to insert file(s).
+        * \param jcp        Pointer to jcntl instance. This is used to find the file path and base filename so that
+        *                   new files may be created.
+        * \param fp         Pointer to function which creates and returns a pointer to a new fcntl object (and hence
+        *                   causes a new %journal file to be created).
+        * \param num_jfiles The number of files by which to increase (defaults to 1).
+        */
+        void insert(const u_int16_t after_lfid,
+                    jcntl* const jcp,
+                    new_obj_fn_ptr fp,
+                    const u_int16_t num_jfiles = 1);
+
+        /**
+        * \brief Clears _fcntl_arr and deletes all fcntl instances.
+        */
+        void finalize();
+
+        /**
+        * \brief Returns true if initialized; false otherwise. After construction, will return false until initialize()
+        * is called; thereafter true until finalize() is called, whereupon it will return false again.
+        *
+        * \return True if initialized (i.e. _fcntl_arr is not empty); false otherwise.
+        */
+        inline bool is_init() const { return _fcntl_arr.size() > 0; }
+
+        /**
+        * \brief Returns true if auto-expand mode is enabled; false if not.
+        *
+        * \return True if auto-expand mode is enabled; false if not.
+        */
+        inline bool is_ae() const { return _ae; }
+
+        /**
+        * \brief Sets the auto-expand mode to enabled if ae is true, to disabled otherwise. The value of _ae_max_jfiles
+        * must be valid to succeed (i.e. _ae_max_jfiles must be greater than the current number of files or be zero).
+        *
+        * \param ae         If true will enable auto-expand mode; if false will disable it.
+        */
+        void set_ae(const bool ae);
+
+        /**
+        * \brief Returns the number of %journal files, including any that were appended or inserted since
+        * initialization.
+        *
+        * \return Number of %journal files if initialized; 0 otherwise.
+        */
+        inline u_int16_t num_jfiles() const { return static_cast<u_int16_t>(_fcntl_arr.size()); }
+
+        /**
+        * \brief Returns the maximum number of files allowed for auto-expansion.
+        *
+        * \return Maximum number of files allowed for auto-expansion. A zero value represents a disabled limit
+        *   - i.e. unlimited expansion.
+        */
+        inline u_int16_t ae_max_jfiles() const { return _ae_max_jfiles; }
+
+        /**
+        * \brief Sets the maximum number of files allowed for auto-expansion. A zero value disables the limit.
+        *
+        * \param ae_max_jfiles The maximum number of files allowed for auto-expansion. Cannot be lower than the current
+        *                   number of files. However, a zero value disables the limit checks, and allows unlimited
+        *                   expansion.
+        */
+        void set_ae_max_jfiles(const u_int16_t ae_max_jfiles);
+
+        /**
+        * \brief Calculates the number of future files available for auto-expansion.
+        *
+        * \return The number of future files available for auto-expansion.
+        */
+        u_int16_t ae_jfiles_rem() const;
+
+        /**
+        * \brief Get a pointer to fcntl instance for a given lfid.
+        *
+        * \return Pointer to fcntl object corresponding to logical file id lfid, or 0 if lfid is out of range
+        *   (greater than or equal to the number of files in use).
+        */
+        inline fcntl* get_fcntlp(const u_int16_t lfid) const
+                { if (lfid >= _fcntl_arr.size()) return 0; return _fcntl_arr[lfid]; }
+
+        // Testing functions
+        void get_pfid_list(std::vector<u_int16_t>& pfid_list) const;
+        void get_lfid_list(std::vector<u_int16_t>& lfid_list) const;
+
+    protected:
+
+        /**
+        * \brief Append num_jfiles files to the end of the logical and file id sequence. This is similar to extending
+        * the from-scratch initialization.
+        *
+        * \param jcp        Pointer to jcntl instance. This is used to find the file path and base filename so that
+        *                   new files may be created.
+        * \param fp         Pointer to function which creates and returns a pointer to a new fcntl object (and hence
+        *                   causes a new %journal file to be created).
+        * \param num_jfiles The number of files by which to increase (defaults to 1).
+        */
+        void append(jcntl* const jcp,
+                    new_obj_fn_ptr fp,
+                    const u_int16_t num_jfiles = 1);
+
+    };
+
+} // namespace journal
+} // namespace mrg
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_LPMGR_H

Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/lpmgr.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/lpmgr.h
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Rev URL

Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp Wed Jul 10 18:20:19 2013
@@ -0,0 +1,215 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file pmgr.cpp
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * File containing code for class mrg::journal::pmgr (page manager). See
+ * comments in file pmgr.h for details.
+ *
+ * \author Kim van der Riet
+ */
+
+#include "qpid/legacystore/jrnl/pmgr.h"
+
+#include <cerrno>
+#include <cstdlib>
+#include <cstring>
+#include "qpid/legacystore/jrnl/jcfg.h"
+#include "qpid/legacystore/jrnl/jcntl.h"
+#include "qpid/legacystore/jrnl/jerrno.h"
+#include <sstream>
+
+
+namespace mrg
+{
+namespace journal
+{
+
+// Convenience constructor: builds a page control block for page number `index` in the UNUSED state,
+// with all counters zeroed and all pointers null. Every data member is initialized here (in
+// declaration order), including _frid, so that no field is ever read uninitialized.
+pmgr::page_cb::page_cb(u_int16_t index):
+        _index(index),
+        _state(UNUSED),
+        _frid(0),
+        _wdblks(0),
+        _rdblks(0),
+        _pdtokl(0),
+        _wfh(0),
+        _rfh(0),
+        _pbuff(0)
+{}
+
+// Return the name of this page control block's current state as a string literal, or "<unknown>"
+// when _state holds a value that is not one of the page_state enumerators.
+const char*
+pmgr::page_cb::state_str() const
+{
+    if (_state == UNUSED)
+        return "UNUSED";
+    if (_state == IN_USE)
+        return "IN_USE";
+    if (_state == AIO_PENDING)
+        return "AIO_PENDING";
+    if (_state == AIO_COMPLETE)
+        return "AIO_COMPLETE";
+    return "<unknown>";
+}
+
+const u_int32_t pmgr::_sblksize = JRNL_SBLK_SIZE * JRNL_DBLK_SIZE; // Softblock size; presumably (dblks per sblk) x (bytes per dblk) - confirm in jcfg.h
+
+// Construct a page manager bound to journal controller jc and to the enqueue and transaction maps.
+// Only bookkeeping is set up here: sizes, counters and every array pointer start at zero/null, and
+// the page cache itself is allocated later by initialize().
+pmgr::pmgr(jcntl* jc, enq_map& emap, txn_map& tmap)
+    // Cache geometry (set by initialize())
+    : _cache_pgsize_sblks(0)
+    , _cache_num_pages(0)
+    // Collaborators supplied by the caller
+    , _jc(jc)
+    , _emap(emap)
+    , _tmap(tmap)
+    // Page memory and per-page arrays (null until allocated)
+    , _page_base_ptr(0)
+    , _page_ptr_arr(0)
+    , _page_cb_arr(0)
+    , _aio_cb_arr(0)
+    , _aio_event_arr(0)
+    , _ioctx(0)
+    // Paging and AIO counters
+    , _pg_index(0)
+    , _pg_cntr(0)
+    , _pg_offset_dblks(0)
+    , _aio_evt_rem(0)
+    , _cbp(0)
+    // Scratch records used for encoding/decoding
+    , _enq_rec()
+    , _deq_rec()
+    , _txn_rec()
+{
+}
+
+// Destructor: releases whatever this class allocated. The call names pmgr::clean() explicitly, so
+// it is this class's implementation that runs rather than a virtual dispatch.
+pmgr::~pmgr()
+{
+    this->pmgr::clean();
+}
+
+// Set up the page cache: reset the paging counters, allocate one aligned block of page memory plus
+// the per-page bookkeeping arrays, and create the AIO context. Anything left over from an earlier
+// initialize() is released first.
+//
+//   cbp                 AIO callback object pointer; stored in _cbp.
+//   cache_pgsize_sblks  Size of each cache page in softblocks.
+//   cache_num_pages     Number of pages in the cache.
+//
+// Throws jexception(JERR__MALLOC) if the page memory cannot be allocated and jexception(JERR__AIO)
+// if the AIO context cannot be created. MALLOC_CHK is defined elsewhere; presumably it also throws
+// when handed a null pointer.
+void
+pmgr::initialize(aio_callback* const cbp, const u_int32_t cache_pgsize_sblks, const u_int16_t cache_num_pages)
+{
+    // As static use of this class keeps old values around, clean up first...
+    pmgr::clean();
+    _pg_index = 0;
+    _pg_cntr = 0;
+    _pg_offset_dblks = 0;
+    _aio_evt_rem = 0;
+    _cache_pgsize_sblks = cache_pgsize_sblks;
+    _cache_num_pages = cache_num_pages;
+    _cbp = cbp;
+
+    // 1. Allocate page memory (as a single block). The sizes are computed in std::size_t so that the
+    //    product is not truncated by 32-bit arithmetic before being widened.
+    const std::size_t pgsize = static_cast<std::size_t>(_cache_pgsize_sblks) * _sblksize;
+    const std::size_t cache_size = pgsize * _cache_num_pages;
+    // posix_memalign() reports failure through its return value; it does not set errno.
+    if (const int ret = ::posix_memalign(&_page_base_ptr, _sblksize, cache_size))
+    {
+        // Older POSIX revisions leave the pointer unspecified on failure; make sure clean() does
+        // not try to free it.
+        _page_base_ptr = 0;
+        clean();
+        std::ostringstream oss;
+        oss << "posix_memalign(): blksize=" << _sblksize << " size=" << cache_size;
+        oss << FORMAT_SYSERR(ret);
+        throw jexception(jerrno::JERR__MALLOC, oss.str(), "pmgr", "initialize");
+    }
+    // 2. Allocate array of page pointers
+    _page_ptr_arr = static_cast<void**>(std::malloc(_cache_num_pages * sizeof(void*)));
+    MALLOC_CHK(_page_ptr_arr, "_page_ptr_arr", "pmgr", "initialize");
+
+    // 3. Allocate and initialize page control block (page_cb) array. Zeroing it means every _pdtokl
+    //    is null until step 5 runs, so clean() can safely delete them at any point.
+    _page_cb_arr = static_cast<page_cb*>(std::malloc(_cache_num_pages * sizeof(page_cb)));
+    MALLOC_CHK(_page_cb_arr, "_page_cb_arr", "pmgr", "initialize");
+    std::memset(_page_cb_arr, 0, _cache_num_pages * sizeof(page_cb));
+
+    // 4. Allocate IO control block (iocb) array
+    _aio_cb_arr = static_cast<aio_cb*>(std::malloc(_cache_num_pages * sizeof(aio_cb)));
+    MALLOC_CHK(_aio_cb_arr, "_aio_cb_arr", "pmgr", "initialize");
+
+    // 5. Set page pointers in _page_ptr_arr, _page_cb_arr and iocbs to pages within page block
+    char* const page_base = static_cast<char*>(_page_base_ptr);
+    for (u_int16_t i=0; i<_cache_num_pages; i++)
+    {
+        _page_ptr_arr[i] = static_cast<void*>(page_base + pgsize * i);
+        _page_cb_arr[i]._index = i;
+        _page_cb_arr[i]._state = UNUSED;
+        _page_cb_arr[i]._pbuff = _page_ptr_arr[i];
+        _page_cb_arr[i]._pdtokl = new std::deque<data_tok*>; // a new deque is already empty
+        _aio_cb_arr[i].data = static_cast<void*>(&_page_cb_arr[i]);
+    }
+
+    // 6. Allocate io_event array, max one event per cache page plus one for each file
+    const u_int16_t max_aio_evts = _cache_num_pages + _jc->num_jfiles();
+    _aio_event_arr = static_cast<aio_event*>(std::malloc(max_aio_evts * sizeof(aio_event)));
+    MALLOC_CHK(_aio_event_arr, "_aio_event_arr", "pmgr", "initialize");
+
+    // 7. Initialize AIO context; a non-zero return is a negated error number
+    if (int ret = aio::queue_init(max_aio_evts, &_ioctx))
+    {
+        std::ostringstream oss;
+        oss << "io_queue_init() failed: " << FORMAT_SYSERR(-ret);
+        throw jexception(jerrno::JERR__AIO, oss.str(), "pmgr", "initialize");
+    }
+}
+
+// Release everything initialize() set up: the AIO context, the page memory block, the per-page
+// token lists and all bookkeeping arrays. Every handle and pointer is reset after release, so the
+// function can be called repeatedly (initialize() calls it on entry and the destructor calls it
+// again) and on a partially initialized object.
+void
+pmgr::clean()
+{
+    // Release the AIO context once only; clearing the handle stops a later clean() from releasing
+    // an already-released context.
+    if (_ioctx)
+    {
+        aio::queue_release(_ioctx);
+        _ioctx = 0;
+    }
+
+    std::free(_page_base_ptr);
+    _page_base_ptr = 0;
+
+    // The token lists are owned by the page control blocks; delete them before freeing the array.
+    if (_page_cb_arr)
+    {
+        for (u_int16_t i=0; i<_cache_num_pages; i++)
+            delete _page_cb_arr[i]._pdtokl;
+        std::free(_page_cb_arr);
+        _page_cb_arr = 0;
+    }
+
+    // Freed unconditionally: initialize() allocates this array before _page_cb_arr, so it may
+    // exist even when _page_cb_arr does not.
+    std::free(_page_ptr_arr);
+    _page_ptr_arr = 0;
+
+    std::free(_aio_cb_arr);
+    _aio_cb_arr = 0;
+
+    std::free(_aio_event_arr);
+    _aio_event_arr = 0;
+}
+
+// Return the name of page state ps as a string literal, or "<page_state unknown>" when ps is not
+// one of the page_state enumerators.
+const char*
+pmgr::page_state_str(page_state ps)
+{
+    const char* name = "<page_state unknown>";
+    switch (ps)
+    {
+        case UNUSED:       name = "UNUSED";       break;
+        case IN_USE:       name = "IN_USE";       break;
+        case AIO_PENDING:  name = "AIO_PENDING";  break;
+        case AIO_COMPLETE: name = "AIO_COMPLETE"; break;
+    }
+    return name;
+}
+
+} // namespace journal
+} // namespace mrg

Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h Wed Jul 10 18:20:19 2013
@@ -0,0 +1,142 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file pmgr.h
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * File containing code for class mrg::journal::pmgr (page manager). See
+ * class documentation for details.
+ *
+ * \author Kim van der Riet
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_PMGR_H
+#define QPID_LEGACYSTORE_JRNL_PMGR_H
+
+namespace mrg
+{
+namespace journal
+{
+    class pmgr;
+    class jcntl;
+}
+}
+
+#include <deque>
+#include "qpid/legacystore/jrnl/aio.h"
+#include "qpid/legacystore/jrnl/aio_callback.h"
+#include "qpid/legacystore/jrnl/data_tok.h"
+#include "qpid/legacystore/jrnl/deq_rec.h"
+#include "qpid/legacystore/jrnl/enq_map.h"
+#include "qpid/legacystore/jrnl/enq_rec.h"
+#include "qpid/legacystore/jrnl/fcntl.h"
+#include "qpid/legacystore/jrnl/txn_map.h"
+#include "qpid/legacystore/jrnl/txn_rec.h"
+
+namespace mrg
+{
+namespace journal
+{
+
+    /**
+    * \brief Abstract class for managing either read or write page cache of arbitrary page size and
+    *    number of pages.
+    */
+    class pmgr
+    {
+    public:
+        /**
+        * \brief Enumeration of possible states of a page within a page cache.
+        */
+        enum page_state
+        {
+            UNUSED,                     ///< A page is uninitialized, contains no data.
+            IN_USE,                     ///< Page is in use.
+            AIO_PENDING,                ///< An AIO request outstanding.
+            AIO_COMPLETE                ///< An AIO request is complete.
+        };
+
+    protected:
+        /**
+        * \brief Page control block, carries control and state information for each page in the
+        *     cache.
+        */
+        struct page_cb
+        {
+            u_int16_t _index;           ///< Index of this page
+            page_state _state;          ///< Status of page
+            u_int64_t _frid;            ///< First rid in page (used for fhdr init)
+            u_int32_t _wdblks;          ///< Total number of dblks in page so far
+            u_int32_t _rdblks;          ///< Total number of dblks in page
+            std::deque<data_tok*>* _pdtokl; ///< Page message tokens list
+            fcntl* _wfh;                ///< File handle for incrementing write compl counts
+            fcntl* _rfh;                ///< File handle for incrementing read compl counts
+            void* _pbuff;               ///< Page buffer
+
+            page_cb(u_int16_t index);   ///< Convenience constructor
+            const char* state_str() const; ///< Return state as string for this pcb
+        };
+
+        static const u_int32_t _sblksize; ///< Disk softblock size
+        u_int32_t _cache_pgsize_sblks;  ///< Size of each cache page in softblocks
+        u_int16_t _cache_num_pages;     ///< Number of pages in the page cache
+        jcntl* _jc;                     ///< Pointer to journal controller
+        enq_map& _emap;                 ///< Ref to enqueue map
+        txn_map& _tmap;                 ///< Ref to transaction map
+        void* _page_base_ptr;           ///< Base pointer to page memory
+        void** _page_ptr_arr;           ///< Array of pointers to pages in page memory
+        page_cb* _page_cb_arr;          ///< Array of page_cb structs
+        aio_cb* _aio_cb_arr;            ///< Array of iocb structs
+        aio_event* _aio_event_arr;      ///< Array of io_events
+        io_context_t _ioctx;            ///< AIO context for read/write operations
+        u_int16_t _pg_index;            ///< Index of current page being used
+        u_int32_t _pg_cntr;             ///< Page counter; determines if file rotation req'd
+        u_int32_t _pg_offset_dblks;     ///< Page offset (used so far) in data blocks
+        u_int32_t _aio_evt_rem;         ///< Remaining AIO events
+        aio_callback* _cbp;             ///< Pointer to callback object
+
+        enq_rec _enq_rec;               ///< Enqueue record used for encoding/decoding
+        deq_rec _deq_rec;               ///< Dequeue record used for encoding/decoding
+        txn_rec _txn_rec;               ///< Transaction record used for encoding/decoding
+
+    public:
+        pmgr(jcntl* jc, enq_map& emap, txn_map& tmap);
+        virtual ~pmgr();
+
+        virtual int32_t get_events(page_state state, timespec* const timeout, bool flush = false) = 0;
+        inline u_int32_t get_aio_evt_rem() const { return _aio_evt_rem; }
+        static const char* page_state_str(page_state ps);
+        inline u_int32_t cache_pgsize_sblks() const { return _cache_pgsize_sblks; }
+        inline u_int16_t cache_num_pages() const { return _cache_num_pages; }
+
+    protected:
+        virtual void initialize(aio_callback* const cbp, const u_int32_t cache_pgsize_sblks,
+                const u_int16_t cache_num_pages);
+        virtual void rotate_page() = 0;
+        virtual void clean();
+    };
+
+} // namespace journal
+} // namespace mrg
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_PMGR_H

Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Rev URL

Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rcvdat.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rcvdat.h?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rcvdat.h (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rcvdat.h Wed Jul 10 18:20:19 2013
@@ -0,0 +1,181 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file rcvdat.h
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * Contains structure for recovery status and offset data.
+ *
+ * \author Kim van der Riet
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_RCVDAT_H
+#define QPID_LEGACYSTORE_JRNL_RCVDAT_H
+
+#include <cstddef>
+#include <iomanip>
+#include <map>
+#include "qpid/legacystore/jrnl/jcfg.h"
+#include <sstream>
+#include <sys/types.h>
+#include <vector>
+
+namespace mrg
+{
+namespace journal
+{
+
+        /**
+        * \brief Plain data holder for the status and offset values collected while the
+        *     journal files are analyzed for recovery, with helpers to render them as text.
+        */
+        struct rcvdat
+        {
+            u_int16_t _njf;     ///< Number of journal files
+            bool _ae;           ///< Auto-expand mode
+            u_int16_t _aemjf;   ///< Auto-expand mode max journal files
+            bool _owi;          ///< Overwrite indicator
+            bool _frot;         ///< First rotation flag
+            bool _jempty;       ///< Journal data files empty
+            u_int16_t _ffid;    ///< First file id
+            std::size_t _fro;   ///< First record offset in ffid
+            u_int16_t _lfid;    ///< Last file id
+            std::size_t _eo;    ///< End offset (first byte past last record)
+            u_int64_t _h_rid;   ///< Highest rid found
+            bool _lffull;       ///< Last file is full
+            bool _jfull;        ///< Journal is full
+            std::vector<u_int16_t> _fid_list; ///< Fid-lid mapping - list of fids in order of lid
+            std::vector<u_int32_t> _enq_cnt_list; ///< Number enqueued records found for each file
+
+            /**
+            * \brief Default constructor. All counters and offsets are 0, all flags are false
+            *     except _jempty (true), and both lists are empty.
+            */
+            rcvdat():
+                    _njf(0),
+                    _ae(false),
+                    _aemjf(0),
+                    _owi(false),
+                    _frot(false),
+                    _jempty(true),
+                    _ffid(0),
+                    _fro(0),
+                    _lfid(0),
+                    _eo(0),
+                    _h_rid(0),
+                    _lffull(false),
+                    _jfull(false),
+                    _fid_list(),
+                    _enq_cnt_list()
+            {}
+
+            /**
+            * \brief Return the structure to its initial state ahead of a new analysis.
+            * \param num_jfiles Number of journal files; _enq_cnt_list is resized to this many
+            *     zeroed entries.
+            * \param auto_expand Auto-expand mode flag.
+            * \param ae_max_jfiles Auto-expand mode max journal files.
+            */
+            void reset(const u_int16_t num_jfiles, const bool auto_expand, const u_int16_t ae_max_jfiles)
+            {
+                _njf = num_jfiles;
+                _ae = auto_expand;
+                _aemjf = ae_max_jfiles;
+                _owi = false;
+                _frot = false;
+                _jempty = true;
+                _ffid = 0;
+                _fro = 0;
+                _lfid = 0;
+                _eo = 0;
+                _h_rid = 0;
+                _lffull = false;
+                _jfull = false;
+                _fid_list.clear();
+                _enq_cnt_list.clear();
+                _enq_cnt_list.resize(num_jfiles, 0);
+            }
+
+            /**
+            * \brief Find first fid with enqueued records.
+            *
+            * Starts at _ffid and steps forward, wrapping to 0 on reaching _njf, until a file
+            * with a non-zero enqueue count is found or _lfid is reached.
+            * NOTE(review): assumes _ffid and _lfid are valid indices into _enq_cnt_list; no
+            * bounds checking is performed here.
+            * \return The fid at which the search stopped.
+            */
+            u_int16_t ffid() const
+            {
+                u_int16_t index = _ffid;
+                while (index != _lfid && _enq_cnt_list[index] == 0)
+                {
+                    if (++index >= _njf)
+                        index = 0;
+                }
+                return index;
+            }
+
+            /**
+            * \brief Render all fields as a multi-line, human-readable report.
+            * \param jid Journal id, echoed in the first line of the report.
+            * \return The formatted report, one field per line.
+            */
+            std::string to_string(const std::string& jid) const
+            {
+                std::ostringstream oss;
+                oss << "Recover file analysis (jid=\"" << jid << "\"):" << std::endl;
+                oss << "  Number of journal files (_njf) = " << _njf << std::endl;
+                oss << "  Auto-expand mode (_ae) = " << (_ae ? "TRUE" : "FALSE") << std::endl;
+                if (_ae) oss << "  Auto-expand mode max journal files (_aemjf) = " << _aemjf << std::endl;
+                oss << "  Overwrite indicator (_owi) = " << (_owi ? "TRUE" : "FALSE") << std::endl;
+                oss << "  First rotation (_frot) = " << (_frot ? "TRUE" : "FALSE") << std::endl;
+                oss << "  Journal empty (_jempty) = " << (_jempty ? "TRUE" : "FALSE") << std::endl;
+                oss << "  First (earliest) fid (_ffid) = " << _ffid << std::endl;
+                oss << "  First record offset in first fid (_fro) = 0x" << std::hex << _fro <<
+                        std::dec << " (" << (_fro/JRNL_DBLK_SIZE) << " dblks)" << std::endl;
+                oss << "  Last (most recent) fid (_lfid) = " << _lfid << std::endl;
+                oss << "  End offset (_eo) = 0x" << std::hex << _eo << std::dec << " ("  <<
+                        (_eo/JRNL_DBLK_SIZE) << " dblks)" << std::endl;
+                oss << "  Highest rid (_h_rid) = 0x" << std::hex << _h_rid << std::dec << std::endl;
+                oss << "  Last file full (_lffull) = " << (_lffull ? "TRUE" : "FALSE") << std::endl;
+                oss << "  Journal full (_jfull) = " << (_jfull ? "TRUE" : "FALSE") << std::endl;
+                oss << "  Normalized fid list (_fid_list) = [";
+                for (std::vector<u_int16_t>::const_iterator i = _fid_list.begin(); i != _fid_list.end(); ++i)
+                {
+                    if (i != _fid_list.begin()) oss << ", ";
+                    oss << *i;
+                }
+                oss << "]" << std::endl;
+                oss << "  Enqueued records (txn & non-txn):" << std::endl;
+                for (unsigned i=0; i<_enq_cnt_list.size(); i++)
+                   oss << "    File " << std::setw(2) << i << ": " << _enq_cnt_list[i] <<
+                            std::endl;
+                return oss.str();
+            }
+
+            /**
+            * \brief Render all fields as a compact single-line summary suitable for a log entry.
+            * \param jid Journal id, echoed at the start of the line.
+            * \return The formatted summary.
+            */
+            std::string to_log(const std::string& jid) const
+            {
+                std::ostringstream oss;
+                oss << "Recover file analysis (jid=\"" << jid << "\"):";
+                oss << " njf=" << _njf;
+                // Each label must report its own flag: ae <- _ae, owi <- _owi.
+                oss << " ae=" << (_ae ? "T" : "F");
+                oss << " aemjf=" << _aemjf;
+                oss << " owi=" << (_owi ? "T" : "F");
+                oss << " frot=" << (_frot ? "T" : "F");
+                oss << " jempty=" << (_jempty ? "T" : "F");
+                oss << " ffid=" << _ffid;
+                oss << " fro=0x" << std::hex << _fro << std::dec << " (" <<
+                        (_fro/JRNL_DBLK_SIZE) << " dblks)";
+                oss << " lfid=" << _lfid;
+                oss << " eo=0x" << std::hex << _eo << std::dec << " ("  <<
+                        (_eo/JRNL_DBLK_SIZE) << " dblks)";
+                oss << " h_rid=0x" << std::hex << _h_rid << std::dec;
+                oss << " lffull=" << (_lffull ? "T" : "F");
+                oss << " jfull=" << (_jfull ? "T" : "F");
+                oss << " Enqueued records (txn & non-txn): [ ";
+                for (unsigned i=0; i<_enq_cnt_list.size(); i++)
+                {
+                    if (i) oss << " ";
+                    oss << "fid_" << std::setw(2) << std::setfill('0') << i << "=" << _enq_cnt_list[i];
+                }
+                oss << " ]";
+                return oss.str();
+            }
+        };
+} // namespace journal
+} // namespace mrg
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_RCVDAT_H

Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rcvdat.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rcvdat.h
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Rev URL

Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rec_hdr.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rec_hdr.h?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rec_hdr.h (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rec_hdr.h Wed Jul 10 18:20:19 2013
@@ -0,0 +1,143 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file rec_hdr.h
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * File containing code for class mrg::journal::rec_hdr (record header),
+ * which is a common initial header used for all journal record structures
+ * except the record tail (rec_tail).
+ *
+ * \author Kim van der Riet
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_REC_HDR_H
+#define QPID_LEGACYSTORE_JRNL_REC_HDR_H
+
+#include <cstddef>
+#include "qpid/legacystore/jrnl/jcfg.h"
+#include <sys/types.h>
+
+namespace mrg
+{
+namespace journal
+{
+
+#pragma pack(1)
+
+    /**
+    * \brief Struct for data common to the head of all journal files and records.
+    * This includes identification for the file type, the encoding version, endian
+    * indicator and a record ID.
+    *
+    * File header info in binary format (16 bytes):
+    * <pre>
+    *   0                           7
+    * +---+---+---+---+---+---+---+---+
+    * |     magic     | v | e | flags |
+    * +---+---+---+---+---+---+---+---+
+    * |              rid              |
+    * +---+---+---+---+---+---+---+---+
+    * v = file version (If the format or encoding of this file changes, then this
+    *     number should be incremented)
+    * e = endian flag, false (0x00) for little endian, true (0x01) for big endian
+    * </pre>
+    *
+    * Note that journal files should be transferable between 32- and 64-bit
+    * hardware of the same endianness, but not between hardware of opposite
+    * endianness without some sort of binary conversion utility. Thus buffering
+    * will be needed for types that change size between 32- and 64-bit compiles.
+    */
+    struct rec_hdr
+    {
+        u_int32_t _magic;       ///< Magic number identifying the file type
+        u_int8_t _version;      ///< Version of the file encoding
+        u_int8_t _eflag;        ///< Endianness flag
+        u_int16_t _uflag;       ///< Flags defined by the user
+        u_int64_t _rid;         ///< Record ID (rotating 64-bit counter)
+
+        // Flags common to all headers
+        static const u_int16_t HDR_OVERWRITE_INDICATOR_MASK = 0x1;
+
+        /**
+        * \brief Default constructor; every field starts at 0.
+        */
+        rec_hdr():
+                _magic(0),
+                _version(0),
+                _eflag(0),
+                _uflag(0),
+                _rid(0)
+        {}
+
+        /**
+        * \brief Construct a header from explicit values.
+        *
+        * The endian flag is selected at compile time (JRNL_BIG_ENDIAN), and the overwrite
+        * indicator bit of _uflag is set when owi is true.
+        */
+        rec_hdr(const u_int32_t magic, const u_int8_t version, const u_int64_t rid, const bool owi):
+                _magic(magic),
+                _version(version),
+#if defined(JRNL_BIG_ENDIAN)
+                _eflag(RHM_BENDIAN_FLAG),
+#else
+                _eflag(RHM_LENDIAN_FLAG),
+#endif
+                _uflag(owi ? HDR_OVERWRITE_INDICATOR_MASK : 0),
+                _rid(rid)
+        {}
+
+        /**
+        * \brief Copy every header field from h into this header.
+        */
+        void hdr_copy(const rec_hdr& h)
+        {
+            // Member-wise assignment copies exactly the five header fields.
+            *this = h;
+        }
+
+        /**
+        * \brief Set every field back to 0.
+        */
+        void reset()
+        {
+            // A default-constructed header has all fields zeroed.
+            *this = rec_hdr();
+        }
+
+        /**
+        * \brief Returns true when the overwrite indicator bit is set in _uflag.
+        */
+        bool get_owi() const
+        {
+            return (_uflag & HDR_OVERWRITE_INDICATOR_MASK) != 0;
+        }
+
+        /**
+        * \brief Set or clear the overwrite indicator bit in _uflag, leaving other bits alone.
+        */
+        void set_owi(const bool owi)
+        {
+            if (owi)
+                _uflag |= HDR_OVERWRITE_INDICATOR_MASK;
+            else
+                _uflag &= ~HDR_OVERWRITE_INDICATOR_MASK;
+        }
+
+        /**
+        * \brief Size of this header structure in bytes.
+        */
+        static std::size_t size()
+        {
+            return sizeof(rec_hdr);
+        }
+    }; // struct rec_hdr
+
+#pragma pack()
+
+} // namespace journal
+} // namespace mrg
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_REC_HDR_H

Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rec_hdr.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rec_hdr.h
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Rev URL

Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rec_tail.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rec_tail.h?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rec_tail.h (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rec_tail.h Wed Jul 10 18:20:19 2013
@@ -0,0 +1,98 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file rec_tail.h
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * File containing code for class mrg::journal::rec_tail (record tail), used to
+ * finalize a persistent record. The presence of a valid tail at the expected
+ * position in the journal file indicates that the record write was completed.
+ *
+ * \author Kim van der Riet
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_REC_TAIL_H
+#define QPID_LEGACYSTORE_JRNL_REC_TAIL_H
+
+#include <cstddef>
+#include "qpid/legacystore/jrnl/jcfg.h"
+
+namespace mrg
+{
+namespace journal
+{
+
+#pragma pack(1)
+
+    /**
+    * \brief Struct for data common to the tail of all records. The magic number
+    * used here is the binary inverse (1's complement) of the magic used in the
+    * record header; this minimizes possible confusion with other headers that may
+    * be present during recovery. The tail is used with all records that have either
+    * XIDs or data - ie any size-variable content. Currently the only records that
+    * do NOT use the tail are non-transactional dequeues and filler records.
+    *
+    * Record tail info in binary format (12 bytes):
+    * <pre>
+    *   0                           7
+    * +---+---+---+---+---+---+---+---+
+    * |   ~(magic)    |      rid      |
+    * +---+---+---+---+---+---+---+---+
+    * |  rid (con't)  |
+    * +---+---+---+---+
+    * </pre>
+    */
+    struct rec_tail
+    {
+        u_int32_t _xmagic;      ///< 1's complement (binary inverse) of the header magic number
+        u_int64_t _rid;         ///< ID (rotating 64-bit counter)
+
+        /**
+        * \brief Default constructor: _xmagic is set to 0xffffffff and _rid to 0.
+        */
+        rec_tail():
+                _xmagic(0xffffffff),
+                _rid(0)
+        {}
+
+        /**
+        * \brief Construct the tail matching an existing record header: _xmagic is the
+        *     bitwise inverse of the header's magic and _rid is copied from the header.
+        */
+        rec_tail(const rec_hdr& h):
+                _xmagic(~h._magic),
+                _rid(h._rid)
+        {}
+
+        /**
+        * \brief Construct a tail from an explicit (already inverted) magic and record id.
+        */
+        rec_tail(const u_int32_t xmagic, const u_int64_t rid):
+                _xmagic(xmagic),
+                _rid(rid)
+        {}
+
+        /**
+        * \brief Size of this tail structure in bytes.
+        */
+        static std::size_t size()
+        {
+            return sizeof(rec_tail);
+        }
+    };
+
+#pragma pack()
+
+} // namespace journal
+} // namespace mrg
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_REC_TAIL_H

Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rec_tail.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rec_tail.h
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Rev URL

Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rfc.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rfc.cpp?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rfc.cpp (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rfc.cpp Wed Jul 10 18:20:19 2013
@@ -0,0 +1,82 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file rfc.cpp
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * File containing code for class mrg::journal::rfc (rotating
+ * file controller). See comments in file rfc.h for details.
+ *
+ * \author Kim van der Riet
+ */
+
+#include "qpid/legacystore/jrnl/rfc.h"
+
+#include <cassert>
+
+namespace mrg
+{
+namespace journal
+{
+
+// Stores the lpmgr pointer; the file index starts at 0 and no file controller is selected.
+rfc::rfc(const lpmgr* lpmp):
+        _lpmp(lpmp),
+        _fc_index(0),
+        _curr_fc(0)
+{}
+
+// Destructor; performs no cleanup of its own.
+rfc::~rfc()
+{
+}
+
+// Drops the current file selection by delegating to unset_findex().
+void
+rfc::finalize()
+{
+    this->unset_findex();
+}
+
+// Records fc_index, looks up the matching file controller in the lpmgr, makes it the
+// current one and calls rd_reset() on it.
+void
+rfc::set_findex(const u_int16_t fc_index)
+{
+    _fc_index = fc_index;
+    fcntl* const selected = _lpmp->get_fcntlp(fc_index);
+    _curr_fc = selected;
+    selected->rd_reset();
+}
+
+// Clears the current file controller pointer and resets the file index to 0.
+void
+rfc::unset_findex()
+{
+    _curr_fc = 0;
+    _fc_index = 0;
+}
+
+// Debug aid: describes the controller state as "Uninitialized" (lpmgr not initialized),
+// "Inactive" (no current file controller) or "Active".
+std::string
+rfc::status_str() const
+{
+    const char* state = "state: Active";
+    if (!_lpmp->is_init())
+        state = "state: Uninitialized";
+    else if (_curr_fc == 0)
+        state = "state: Inactive";
+    return state;
+}
+
+} // namespace journal
+} // namespace mrg

Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rfc.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rfc.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rfc.h?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rfc.h (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rfc.h Wed Jul 10 18:20:19 2013
@@ -0,0 +1,193 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file rfc.h
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * File containing code for class mrg::journal::rfc (rotating
+ * file controller). See class documentation for details.
+ *
+ * \author Kim van der Riet
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_RFC_H
+#define QPID_LEGACYSTORE_JRNL_RFC_H
+
+namespace mrg
+{
+namespace journal
+{
+class rfc;
+}
+}
+
+#include "qpid/legacystore/jrnl/lpmgr.h"
+#include "qpid/legacystore/jrnl/enums.h"
+
+namespace mrg
+{
+namespace journal
+{
+
+    /**
+    * \class rfc
+    * \brief Rotating File Controller (rfc) - Class to handle the management of an array of file controllers (fcntl)
+    *     objects for use in a circular disk buffer (journal). Each fcntl object corresponds to a file in the journal.
+    *
+    * The following states exist in this class:
+    *
+    * <pre>
+    *                                                                   is_init()  is_active()
+    *                  +===+                    _lpmp.is_init() == false
+    *      +---------->|   |     Uninitialized: _curr_fc == 0               F           F
+    *      |       +-->+===+ --+
+    *      |       |           |
+    *      |       |           |
+    *      |   finalize()   initialize()
+    *      |       |           |
+    *      |       |           |
+    *      |       +-- +===+<--+                _lpmp.is_init() == true
+    *  finalize()      |   |     Inactive:      _curr_fc == 0               T           F
+    *      |       +-->+===+ --+
+    *      |       |           |
+    *      |       |           |
+    *      | unset_findex() set_findex()
+    *      |       |           |
+    *      |       |           |
+    *      |       +-- +===+<--+                _lpmp.is_init() == true
+    *      +---------- |   |     Active:        _curr_fc != 0               T           T
+    *                  +===+
+    * </pre>
+    *
+    * The Uninitialized state is where the class starts after construction. Once the number of files is known and
+    * the array of file controllers allocated, then initialize() is called to set these, causing the state to move
+    * to Inactive.
+    *
+    * The Inactive state has the file controllers allocated and pointing to their respective journal files, but no
+    * current file controller has been selected. The pointer to the current file controller _curr_fc is null. Once the
+    * index of the active file is known, then calling set_findex() will set the index and internal pointer
+    * to the currently active file controller. This moves the state to Active.
+    *
+    * Note TODO: Comment on sync issues between change in num files in _lpmp and _fc_index/_curr_fc.
+    */
+    class rfc
+    {
+    protected:
+        const lpmgr* _lpmp;     ///< jcntl's lpmgr instance, which holds the lfid/pfid map and fcntl objects
+        u_int16_t _fc_index;    ///< Index of the current file controller
+        fcntl*    _curr_fc;     ///< Current file controller; null when none is selected
+
+    public:
+        rfc(const lpmgr* lpmp);
+        virtual ~rfc();
+
+        /**
+        * \brief Hook for the transition from state Uninitialized to Inactive. The implementation
+        *     provided here is empty.
+        */
+        virtual void initialize() {}
+
+        /**
+        * \brief Finalize the controller, usually called when the journal is stopped. After this
+        *     call, initialize() must be called before the instance is reused.
+        */
+        virtual void finalize();
+
+        /**
+        * \brief Initialization check; reports the value of _lpmp->is_init().
+        */
+        virtual bool is_init() const
+        {
+            return _lpmp->is_init();
+        }
+
+        /**
+        * \brief Active check: true only when _lpmp->is_init() is true and _curr_fc is not null.
+        */
+        virtual bool is_active() const
+        {
+            return _lpmp->is_init() && _curr_fc != 0;
+        }
+
+        /**
+        * \brief Select the file with the given index as the current file controller, moving to
+        *     state Active.
+        */
+        virtual void set_findex(const u_int16_t fc_index);
+
+        /**
+        * \brief Clear the current file index and file controller pointer, moving to state
+        *     Inactive.
+        */
+        virtual void unset_findex();
+
+        /**
+        * \brief Advance the active file controller to the next file in the rotating file group.
+        * \exception jerrno::JERR__NINIT if called before calling initialize().
+        */
+        virtual iores rotate() = 0;
+
+        /**
+        * \brief Index of the currently active file within the rotating file group.
+        */
+        u_int16_t index() const
+        {
+            return _fc_index;
+        }
+
+        /**
+        * \brief Currently active journal file controller within the rotating file group.
+        */
+        fcntl* file_controller() const
+        {
+            return _curr_fc;
+        }
+
+        /**
+        * \brief Physical file id (pfid) of the currently active file controller.
+        */
+        u_int16_t pfid() const
+        {
+            return _curr_fc->pfid();
+        }
+
+        // Shortcuts that forward to a file controller. The overloads without a fid use the
+        // current file controller; those taking a fid look the controller up through _lpmp.
+        // Note: Do not call when not in active state
+
+        u_int32_t enqcnt() const
+        {
+            return _curr_fc->enqcnt();
+        }
+        u_int32_t incr_enqcnt()
+        {
+            return _curr_fc->incr_enqcnt();
+        }
+        u_int32_t incr_enqcnt(const u_int16_t fid)
+        {
+            return _lpmp->get_fcntlp(fid)->incr_enqcnt();
+        }
+        u_int32_t add_enqcnt(const u_int32_t a)
+        {
+            return _curr_fc->add_enqcnt(a);
+        }
+        u_int32_t add_enqcnt(const u_int16_t fid, const u_int32_t a)
+        {
+            return _lpmp->get_fcntlp(fid)->add_enqcnt(a);
+        }
+        u_int32_t decr_enqcnt(const u_int16_t fid)
+        {
+            return _lpmp->get_fcntlp(fid)->decr_enqcnt();
+        }
+        u_int32_t subtr_enqcnt(const u_int16_t fid, const u_int32_t s)
+        {
+            return _lpmp->get_fcntlp(fid)->subtr_enqcnt(s);
+        }
+
+        // Submitted-count accessors, supplied by derived classes
+        virtual u_int32_t subm_cnt_dblks() const = 0;
+        virtual std::size_t subm_offs() const = 0;
+        virtual u_int32_t add_subm_cnt_dblks(u_int32_t a) = 0;
+
+        // Completed-count accessors, supplied by derived classes
+        virtual u_int32_t cmpl_cnt_dblks() const = 0;
+        virtual std::size_t cmpl_offs() const = 0;
+        virtual u_int32_t add_cmpl_cnt_dblks(u_int32_t a) = 0;
+
+        // File status queries, supplied by derived classes
+        virtual bool is_void() const = 0;
+        virtual bool is_empty() const = 0;
+        virtual u_int32_t remaining_dblks() const = 0;
+        virtual bool is_full() const = 0;
+        virtual bool is_compl() const = 0;
+        virtual u_int32_t aio_outstanding_dblks() const = 0;
+        virtual bool file_rotate() const = 0;
+
+        // Debug aid: textual description of the controller state
+        virtual std::string status_str() const;
+    }; // class rfc
+
+} // namespace journal
+} // namespace mrg
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_RFC_H

Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rfc.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rfc.h
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Rev URL

Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp Wed Jul 10 18:20:19 2013
@@ -0,0 +1,698 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file rmgr.cpp
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * File containing code for class mrg::journal::rmgr (read manager). See
+ * comments in file rmgr.h for details.
+ *
+ * \author Kim van der Riet
+ */
+
+#include "qpid/legacystore/jrnl/rmgr.h"
+
+#include <cassert>
+#include <cerrno>
+#include <cstdlib>
+#include "qpid/legacystore/jrnl/jcntl.h"
+#include "qpid/legacystore/jrnl/jerrno.h"
+#include <sstream>
+
+namespace mrg
+{
+namespace journal
+{
+
+/**
+ * \brief Construct a read manager.
+ *
+ * Passes \a jc, \a emap and \a tmap on to the pmgr base class and stores a reference to the read
+ * rotating file controller. The file-header buffer and iocb pointers start out null and no
+ * file-header read is outstanding; they are allocated later by initialize().
+ */
+rmgr::rmgr(jcntl* jc, enq_map& emap, txn_map& tmap, rrfc& rrfc):
+        pmgr(jc, emap, tmap),
+        _rrfc(rrfc),
+        _hdr(),
+        _fhdr_buffer(0),
+        _fhdr_aio_cb_ptr(0),
+        _fhdr_rd_outstanding(false)
+{}
+
+/**
+ * \brief Destructor: releases the file-header buffer and iocb via rmgr::clean().
+ */
+rmgr::~rmgr()
+{
+    // Explicitly qualified so that this class's clean() is the one invoked
+    rmgr::clean();
+}
+
+/**
+ * \brief Initialize the read page cache and the resources used for file-header reads.
+ *
+ * Initializes the pmgr base with the read-manager page size and page count, releases any
+ * previously allocated file-header resources, then allocates an _sblksize-aligned buffer of
+ * _sblksize bytes and a zeroed AIO control block for file-header reads.
+ *
+ * \param cbp AIO callback object, passed on to pmgr::initialize().
+ * \throws jexception (jerrno::JERR__MALLOC) if the aligned buffer cannot be allocated.
+ */
+void
+rmgr::initialize(aio_callback* const cbp)
+{
+    pmgr::initialize(cbp, JRNL_RMGR_PAGE_SIZE, JRNL_RMGR_PAGES);
+    clean();
+    // Allocate memory for reading file header.
+    // posix_memalign() returns the error number directly and does not set errno, so the
+    // returned value (not errno) is what must be reported.
+    const int alloc_err = ::posix_memalign(&_fhdr_buffer, _sblksize, _sblksize);
+    if (alloc_err)
+    {
+        _fhdr_buffer = 0; // pointer value is unspecified after a failure; keep clean() safe
+        std::ostringstream oss;
+        oss << "posix_memalign(): blksize=" << _sblksize << " size=" << _sblksize;
+        oss << FORMAT_SYSERR(alloc_err);
+        throw jexception(jerrno::JERR__MALLOC, oss.str(), "rmgr", "initialize");
+    }
+    _fhdr_aio_cb_ptr = new aio_cb;
+    // Zero the whole control block, not just the first pointer-sized bytes of it
+    std::memset(_fhdr_aio_cb_ptr, 0, sizeof(*_fhdr_aio_cb_ptr));
+}
+
+/**
+ * \brief Release the file-header read buffer and its AIO control block.
+ *
+ * Both pointers are reset to null afterwards, so calling this repeatedly is harmless.
+ */
+void
+rmgr::clean()
+{
+    // Buffer came from posix_memalign(), hence free(); free(0) is a no-op
+    std::free(_fhdr_buffer);
+    _fhdr_buffer = 0;
+
+    // Deleting a null pointer is a no-op, so no guard is required
+    delete _fhdr_aio_cb_ptr;
+    _fhdr_aio_cb_ptr = 0;
+}
+
+/**
+ * \brief Read the next enqueued record from the read page cache.
+ *
+ * After pre_read_check() succeeds, the call either resumes a partially skipped record (token
+ * state SKIP_PART), resumes a partially read record (READ_PART), or scans record headers from the
+ * current page position. Enqueue records that are still enqueued are decoded through read_enq();
+ * dequeued enqueue records, dequeue records and transaction records are skipped; filler records
+ * are stepped over.
+ *
+ * \param datapp   Receives the pointer to the record data (set to null when nothing is read).
+ * \param dsize    Receives the data size (0 when nothing is read).
+ * \param xidpp    Receives the pointer to the record XID (set to null when nothing is read).
+ * \param xidsize  Receives the XID size (0 when nothing is read).
+ * \param transient Receives the record's transient flag when a record is read.
+ * \param external  Receives the record's external flag when a record is read.
+ * \param dtokp    Data token tracking read state; dereferenced unconditionally once
+ *                 pre_read_check() has succeeded.
+ * \param ignore_pending_txns If true, transactionally pending records do not block the read.
+ * \return RHM_IORES_SUCCESS when a record was read; otherwise the result of pre_read_check(),
+ *         skip() or read_enq(), or RHM_IORES_PAGE_AIOWAIT, RHM_IORES_TXPENDING or
+ *         RHM_IORES_EMPTY as produced below.
+ * \throws jexception (jerrno::JERR_RMGR_RIDMISMATCH) if the token already carries a RID that
+ *         differs from the RID of the record found.
+ */
+iores
+rmgr::read(void** const datapp, std::size_t& dsize, void** const xidpp, std::size_t& xidsize,
+        bool& transient, bool& external, data_tok* dtokp,  bool ignore_pending_txns)
+{
+    iores res = pre_read_check(dtokp);
+    if (res != RHM_IORES_SUCCESS)
+    {
+        set_params_null(datapp, dsize, xidpp, xidsize);
+        return res;
+    }
+
+    // Resume a skip that previously ran out of completed pages
+    if (dtokp->rstate() == data_tok::SKIP_PART)
+    {
+        if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
+        {
+            aio_cycle();   // check if rd AIOs returned; initiate new reads if possible
+            return RHM_IORES_PAGE_AIOWAIT;
+        }
+        const iores res = skip(dtokp);
+        if (res != RHM_IORES_SUCCESS)
+        {
+            set_params_null(datapp, dsize, xidpp, xidsize);
+            return res;
+        }
+    }
+    // Resume a record read that previously ran out of completed pages
+    if (dtokp->rstate() == data_tok::READ_PART)
+    {
+        assert(dtokp->rid() == _hdr._rid);
+        void* rptr = (void*)((char*)_page_ptr_arr[_pg_index] + (_pg_offset_dblks * JRNL_DBLK_SIZE));
+        const iores res = read_enq(_hdr, rptr, dtokp);
+        dsize = _enq_rec.get_data(datapp);
+        xidsize = _enq_rec.get_xid(xidpp);
+        transient = _enq_rec.is_transient();
+        external = _enq_rec.is_external();
+        return res;
+    }
+
+    set_params_null(datapp, dsize, xidpp, xidsize);
+    _hdr.reset();
+    // Read header, determine next record type
+    while (true)
+    {
+        // Same "nothing left to read" test as in pre_read_check(); it is repeated on every
+        // iteration because skipping records can exhaust the current page
+        if(dblks_rem() == 0 && _rrfc.is_compl() && !_rrfc.is_wr_aio_outstanding())
+        {
+            aio_cycle();   // check if rd AIOs returned; initiate new reads if possible
+            if(dblks_rem() == 0 && _rrfc.is_compl() && !_rrfc.is_wr_aio_outstanding())
+            {
+                if (_jc->unflushed_dblks() > 0)
+                    _jc->flush();
+                else if (!_aio_evt_rem)
+                    return RHM_IORES_EMPTY;
+            }
+        }
+        if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
+        {
+            aio_cycle();
+            return RHM_IORES_PAGE_AIOWAIT;
+        }
+        // Copy the generic record header found at the current position in the current page
+        void* rptr = (void*)((char*)_page_ptr_arr[_pg_index] + (_pg_offset_dblks * JRNL_DBLK_SIZE));
+        std::memcpy(&_hdr, rptr, sizeof(rec_hdr));
+        switch (_hdr._magic)
+        {
+            case RHM_JDAT_ENQ_MAGIC:
+            {
+                _enq_rec.reset(); // sets enqueue rec size
+                // Check if RID of this rec is still enqueued, if so read it, else skip
+                bool is_enq = false;
+                int16_t fid = _emap.get_pfid(_hdr._rid);
+                // A value below EMAP_OK means the lookup did not yield a usable file id
+                if (fid < enq_map::EMAP_OK)
+                {
+                    bool enforce_txns = !_jc->is_read_only() && !ignore_pending_txns;
+                    // Block read for transactionally locked record (only when not recovering)
+                    if (fid == enq_map::EMAP_LOCKED && enforce_txns)
+                        return RHM_IORES_TXPENDING;
+
+                    // (Recover mode only) Ok, not in emap - now search tmap, if present then read
+                    is_enq = _tmap.is_enq(_hdr._rid);
+                    if (enforce_txns && is_enq)
+                        return RHM_IORES_TXPENDING;
+                }
+                else
+                    is_enq = true;
+
+                if (is_enq) // ok, this record is enqueued, check it, then read it...
+                {
+                    // A non-zero token RID must match the record; a zero RID adopts the record's
+                    if (dtokp->rid())
+                    {
+                        if (_hdr._rid != dtokp->rid())
+                        {
+                            std::ostringstream oss;
+                            oss << std::hex << "rid=0x" << _hdr._rid << "; dtok_rid=0x" << dtokp->rid()
+                                << "; dtok_id=0x" << dtokp->id();
+                            throw jexception(jerrno::JERR_RMGR_RIDMISMATCH, oss.str(), "rmgr", "read");
+                        }
+                    }
+                    else
+                        dtokp->set_rid(_hdr._rid);
+
+// TODO: Add member _fid to pmgr::page_cb which indicates the fid from which this page was
+// populated. When this value is set in wmgr::flush() somewhere, then uncomment the following
+// check:
+//                     if (fid != _page_cb_arr[_pg_index]._fid)
+//                     {
+//                         std::ostringstream oss;
+//                         oss << std::hex << std::setfill('0');
+//                         oss << "rid=0x" << std::setw(16) << _hdr._rid;
+//                         oss << "; emap_fid=0x" << std::setw(4) << fid;
+//                         oss << "; current_fid=" << _rrfc.fid();
+//                         throw jexception(jerrno::JERR_RMGR_FIDMISMATCH, oss.str(), "rmgr",
+//                              "read");
+//                     }
+
+                    const iores res = read_enq(_hdr, rptr, dtokp);
+                    dsize = _enq_rec.get_data(datapp);
+                    xidsize = _enq_rec.get_xid(xidpp);
+                    transient = _enq_rec.is_transient();
+                    external = _enq_rec.is_external();
+                    return res;
+                }
+                else // skip this record, it is already dequeued
+                    consume_xid_rec(_hdr, rptr, dtokp);
+                break;
+            }
+            // Dequeue and transaction abort/commit records carry no readable data: skip them
+            case RHM_JDAT_DEQ_MAGIC:
+                consume_xid_rec(_hdr, rptr, dtokp);
+                break;
+            case RHM_JDAT_TXA_MAGIC:
+                consume_xid_rec(_hdr, rptr, dtokp);
+                break;
+            case RHM_JDAT_TXC_MAGIC:
+                consume_xid_rec(_hdr, rptr, dtokp);
+                break;
+            case RHM_JDAT_EMPTY_MAGIC:
+                consume_filler();
+                break;
+            default:
+                // Unrecognised magic: treated as no further records
+                return RHM_IORES_EMPTY;
+        }
+    }
+}
+
+/**
+ * \brief Collect completed AIO read events and update page / file-header state accordingly.
+ *
+ * For a completed page read (the iocb carries a page control block), the number of dblks read is
+ * recorded, the owning file controller's completed-read count is advanced and the page is set to
+ * \a state. For a completed file-header read (no page control block), the header is copied into
+ * _fhdr, the page counter, page index and page offset are positioned from the header's _fro
+ * value, and the read file controller is marked valid.
+ *
+ * \param state   Page state assigned to every page whose read has completed.
+ * \param timeout Timeout passed through to aio::getevents(); may be null.
+ * \param flush   If true, all outstanding events are requested as the minimum event count;
+ *                otherwise a minimum of one is requested.
+ * \return 0 if no events are outstanding or the wait was interrupted by a signal;
+ *         jerrno::AIO_TIMEOUT if no events were returned and \a timeout is non-null;
+ *         otherwise the number of events processed.
+ * \throws jexception (jerrno::JERR__AIO) if aio::getevents() or an individual read failed;
+ *         (jerrno::JERR__UNDERFLOW) if more events are returned than were outstanding.
+ */
+int32_t
+rmgr::get_events(page_state state, timespec* const timeout, bool flush)
+{
+    if (_aio_evt_rem == 0) // no events to get
+        return 0;
+
+    int32_t ret;
+    if ((ret = aio::getevents(_ioctx, flush ? _aio_evt_rem : 1, _aio_evt_rem/*_cache_num_pages + _jc->num_jfiles()*/, _aio_event_arr, timeout)) < 0)
+    {
+        if (ret == -EINTR) // Interrupted by signal
+            return 0;
+        std::ostringstream oss;
+        oss << "io_getevents() failed: " << std::strerror(-ret) << " (" << ret << ")";
+        throw jexception(jerrno::JERR__AIO, oss.str(), "rmgr", "get_events");
+    }
+    if (ret == 0 && timeout)
+        return jerrno::AIO_TIMEOUT;
+
+    // Indices of the pages whose reads completed in this call; handed to the callback below
+    std::vector<u_int16_t> pil;
+    pil.reserve(ret);
+    for (int i=0; i<ret; i++) // Index of returned AIOs
+    {
+        if (_aio_evt_rem == 0)
+        {
+            std::ostringstream oss;
+            oss << "_aio_evt_rem; evt " << (i + 1) << " of " << ret;
+            throw jexception(jerrno::JERR__UNDERFLOW, oss.str(), "rmgr", "get_events");
+        }
+        _aio_evt_rem--;
+        aio_cb* aiocbp = _aio_event_arr[i].obj; // This I/O control block (iocb)
+        page_cb* pcbp = (page_cb*)(aiocbp->data); // This page control block (pcb)
+        long aioret = (long)_aio_event_arr[i].res;
+        if (aioret < 0)
+        {
+            std::ostringstream oss;
+            oss << "AIO read operation failed: " << std::strerror(-aioret) << " (" << aioret << ")";
+            // File header reads have no pcb, so the page index is only available for page reads
+            if (pcbp)
+                oss << " [pg=" << pcbp->_index;
+            else
+                oss << " [pg=fhdr";
+            oss << " buf=" << aiocbp->u.c.buf;
+            oss << " rsize=0x" << std::hex << aiocbp->u.c.nbytes;
+            oss << " offset=0x" << aiocbp->u.c.offset << std::dec;
+            oss << " fh=" << aiocbp->aio_fildes << "]";
+            throw jexception(jerrno::JERR__AIO, oss.str(), "rmgr", "get_events");
+        }
+
+        if (pcbp) // Page reads have pcb
+        {
+            if (pcbp->_rfh->rd_subm_cnt_dblks() >= JRNL_SBLK_SIZE) // Detects if write reset of this fcntl obj has occurred.
+            {
+                // Increment the completed read offset
+                // NOTE: We cannot use _rrfc here, as it may have rotated since submitting count.
+                // Use stored pointer to fcntl in the pcb instead.
+                pcbp->_rdblks = aiocbp->u.c.nbytes / JRNL_DBLK_SIZE;
+                pcbp->_rfh->add_rd_cmpl_cnt_dblks(pcbp->_rdblks);
+                pcbp->_state = state;
+                // reserve() does not change size(), so the index must be appended, not assigned
+                pil.push_back(pcbp->_index);
+            }
+        }
+        else // File header reads have no pcb
+        {
+            std::memcpy(&_fhdr, _fhdr_buffer, sizeof(file_hdr));
+            _rrfc.add_cmpl_cnt_dblks(JRNL_SBLK_SIZE);
+
+            // Convert the header's _fro byte offset into dblks past the file header sblk
+            u_int32_t fro_dblks = (_fhdr._fro / JRNL_DBLK_SIZE) - JRNL_SBLK_SIZE;
+            // Check fro_dblks does not exceed the write pointers which can happen in some corrupted journal recoveries
+            if (fro_dblks > _jc->wr_subm_cnt_dblks(_fhdr._pfid) - JRNL_SBLK_SIZE)
+                fro_dblks = _jc->wr_subm_cnt_dblks(_fhdr._pfid) - JRNL_SBLK_SIZE;
+            // Position the page counter, page index and in-page offset at that record
+            _pg_cntr = fro_dblks / (JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE);
+            u_int32_t tot_pg_offs_dblks = _pg_cntr * JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE;
+            _pg_index = _pg_cntr % JRNL_RMGR_PAGES;
+            _pg_offset_dblks = fro_dblks - tot_pg_offs_dblks;
+            // Account for the whole pages preceding that position as already submitted/completed
+            _rrfc.add_subm_cnt_dblks(tot_pg_offs_dblks);
+            _rrfc.add_cmpl_cnt_dblks(tot_pg_offs_dblks);
+
+            _fhdr_rd_outstanding = false;
+            _rrfc.set_valid();
+        }
+    }
+
+    // Perform AIO return callback
+    if (_cbp && ret)
+        _cbp->rd_aio_cb(pil);
+    return ret;
+}
+
+/**
+ * \brief Notification hook for the end of recovery; the read manager has nothing to do here.
+ */
+void
+rmgr::recover_complete()
+{}
+
+/**
+ * \brief Mark the read rotating file controller invalid, unless it already is.
+ */
+void
+rmgr::invalidate()
+{
+    if (!_rrfc.is_valid())
+        return; // already invalid, nothing to change
+    _rrfc.set_invalid();
+}
+
+/**
+ * \brief Drain all outstanding AIO reads and reset the read cache to its initial position.
+ *
+ * Repeatedly collects events until none remain outstanding, then marks every cache page UNUSED,
+ * unsets the read file controller's file index and rewinds the page index and page offset.
+ *
+ * \param timeout Timeout applied to each wait for events.
+ * \throws jexception (jerrno::JERR__TIMEOUT) if a wait times out with reads still outstanding.
+ */
+void
+rmgr::flush(timespec* timeout)
+{
+    // Wait for any outstanding AIO read operations to complete before synchronizing
+    while (_aio_evt_rem)
+    {
+        if (get_events(AIO_COMPLETE, timeout) == jerrno::AIO_TIMEOUT) // timed out, nothing returned
+        {
+            // Report the function that actually throws, so the exception can be traced
+            throw jexception(jerrno::JERR__TIMEOUT,
+                            "Timed out waiting for outstanding read aio to return", "rmgr", "flush");
+        }
+    }
+
+    // Reset all read states and pointers
+    for (int i=0; i<_cache_num_pages; i++)
+        _page_cb_arr[i]._state = UNUSED;
+    _rrfc.unset_findex();
+    _pg_index = 0;
+    _pg_offset_dblks = 0;
+}
+
+/**
+ * \brief Collect AIO events until the read file controller becomes valid or a wait times out.
+ *
+ * \param timeout          Timeout applied to each wait for events.
+ * \param throw_on_timeout If true, a timeout raises an exception instead of ending the wait.
+ * \return The validity of the read file controller when the wait ends.
+ * \throws jexception (jerrno::JERR__TIMEOUT) on timeout when \a throw_on_timeout is set.
+ */
+bool
+rmgr::wait_for_validity(timespec* timeout, const bool throw_on_timeout)
+{
+    while (!_rrfc.is_valid())
+    {
+        if (get_events(AIO_COMPLETE, timeout) != jerrno::AIO_TIMEOUT)
+            continue; // something returned (or nothing was pending); re-test validity
+
+        if (throw_on_timeout)
+            throw jexception(jerrno::JERR__TIMEOUT, "Timed out waiting for read validity", "rmgr", "wait_for_validity");
+        break; // timed out quietly; report current validity
+    }
+    return _rrfc.is_valid();
+}
+
+/**
+ * \brief Checks performed before every read attempt.
+ *
+ * Collects any outstanding AIO events, then verifies in turn that the read file controller is
+ * valid, that no file-header read is outstanding, that there is something left to read, and -
+ * when a token is supplied - that the token is readable.
+ *
+ * \param dtokp Data token to validate; may be null, in which case the token check is omitted.
+ * \return RHM_IORES_SUCCESS if the read may proceed; RHM_IORES_RCINVALID, RHM_IORES_PAGE_AIOWAIT
+ *         or RHM_IORES_EMPTY otherwise.
+ * \throws jexception (jerrno::JERR_RMGR_ENQSTATE) if the token is not readable.
+ */
+iores
+rmgr::pre_read_check(data_tok* dtokp)
+{
+    if (_aio_evt_rem != 0)
+        get_events(AIO_COMPLETE, 0);
+
+    if (!_rrfc.is_valid())
+        return RHM_IORES_RCINVALID;
+
+    // block reads until outstanding file header read completes as fro is needed to read
+    if (_fhdr_rd_outstanding)
+        return RHM_IORES_PAGE_AIOWAIT;
+
+    // "Drained": current page used up, file read complete, no write AIO still in flight
+    bool drained = dblks_rem() == 0 && _rrfc.is_compl() && !_rrfc.is_wr_aio_outstanding();
+    if (drained)
+    {
+        aio_cycle();   // check if any AIOs have returned
+        // Re-evaluate, as the AIO cycle may have made more data available
+        drained = dblks_rem() == 0 && _rrfc.is_compl() && !_rrfc.is_wr_aio_outstanding();
+        if (drained)
+        {
+            if (_jc->unflushed_dblks() > 0)
+                _jc->flush();
+            else if (_aio_evt_rem == 0)
+                return RHM_IORES_EMPTY;
+        }
+    }
+
+    // Check write state of this token is ENQ - required for read
+    if (dtokp && !dtokp->is_readable())
+    {
+        std::ostringstream msg;
+        msg << std::hex << std::setfill('0');
+        msg << "dtok_id=0x" << std::setw(8) << dtokp->id();
+        msg << "; dtok_rid=0x" << std::setw(16) << dtokp->rid();
+        msg << "; dtok_wstate=" << dtokp->wstate_str();
+        throw jexception(jerrno::JERR_RMGR_ENQSTATE, msg.str(), "rmgr", "pre_read_check");
+    }
+
+    return RHM_IORES_SUCCESS;
+}
+
+/**
+ * \brief Decode an enqueue record, continuing across page boundaries where necessary.
+ *
+ * \param h     Record header of the record being decoded.
+ * \param rptr  Pointer to the position in the current page at which decoding starts.
+ * \param dtokp Data token; its dblocks-read count, read state and data size are updated.
+ * \return RHM_IORES_SUCCESS once the whole record has been decoded; RHM_IORES_PAGE_AIOWAIT if
+ *         the current or a following page has not completed its AIO read, in which case a
+ *         partially decoded record leaves the token in state READ_PART.
+ */
+iores
+rmgr::read_enq(rec_hdr& h, void* rptr, data_tok* dtokp)
+{
+    if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
+    {
+        aio_cycle();   // check if any AIOs have returned
+        return RHM_IORES_PAGE_AIOWAIT;
+    }
+
+    // Read data from this page, first block will have header and data size.
+    u_int32_t dblks_rd = _enq_rec.decode(h, rptr, dtokp->dblocks_read(), dblks_rem());
+    dtokp->incr_dblocks_read(dblks_rd);
+
+    _pg_offset_dblks += dblks_rd;
+
+    // If data still incomplete, move to next page and decode again
+    while (dtokp->dblocks_read() < _enq_rec.rec_size_dblks())
+    {
+        rotate_page();
+        if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
+        {
+            // Remember progress in the token so that a later read() call can resume here
+            dtokp->set_rstate(data_tok::READ_PART);
+            dtokp->set_dsize(_enq_rec.data_size());
+            return RHM_IORES_PAGE_AIOWAIT;
+        }
+
+        // Continuation data starts at the beginning of the new page
+        rptr = (void*)((char*)_page_ptr_arr[_pg_index]);
+        dblks_rd = _enq_rec.decode(h, rptr, dtokp->dblocks_read(), dblks_rem());
+        dtokp->incr_dblocks_read(dblks_rd);
+        _pg_offset_dblks += dblks_rd;
+    }
+
+    // If we have finished with this page, rotate it
+    if (dblks_rem() == 0)
+        rotate_page();
+
+    // Set the record size in dtokp
+    dtokp->set_rstate(data_tok::READ);
+    dtokp->set_dsize(_enq_rec.data_size());
+    return RHM_IORES_SUCCESS;
+}
+
+/**
+ * \brief Skip over an enqueue, dequeue or transaction record without returning its content.
+ *
+ * The record's type-specific header is copied from \a rptr to work out the total record size in
+ * bytes (header, XID, data where stored in the journal, and tail). That size is stored in the
+ * token, the token's dblocks-read count is reset and skip() is invoked. The result of skip() is
+ * not inspected here; an incomplete skip is recorded in the token's read state.
+ *
+ * \param h     Generic record header; its magic selects the record type.
+ * \param rptr  Pointer to the start of the record in the current page.
+ * \param dtokp Data token used to carry the skip state.
+ * \throws jexception (jerrno::JERR_RMGR_BADRECTYPE) if the magic is not a recognised record type.
+ */
+void
+rmgr::consume_xid_rec(rec_hdr& h, void* rptr, data_tok* dtokp)
+{
+    if (h._magic == RHM_JDAT_ENQ_MAGIC)
+    {
+        enq_hdr ehdr;
+        std::memcpy(&ehdr, rptr, sizeof(enq_hdr));
+        // External records do not have their data counted in the record size
+        if (ehdr.is_external())
+            dtokp->set_dsize(ehdr._xidsize + sizeof(enq_hdr) + sizeof(rec_tail));
+        else
+            dtokp->set_dsize(ehdr._xidsize + ehdr._dsize + sizeof(enq_hdr) + sizeof(rec_tail));
+    }
+    else if (h._magic == RHM_JDAT_DEQ_MAGIC)
+    {
+        deq_hdr dhdr;
+        std::memcpy(&dhdr, rptr, sizeof(deq_hdr));
+        // A dequeue record without an XID is sized as the header alone (no tail is added)
+        if (dhdr._xidsize)
+            dtokp->set_dsize(dhdr._xidsize + sizeof(deq_hdr) + sizeof(rec_tail));
+        else
+            dtokp->set_dsize(sizeof(deq_hdr));
+    }
+    else if (h._magic == RHM_JDAT_TXA_MAGIC || h._magic == RHM_JDAT_TXC_MAGIC)
+    {
+        txn_hdr thdr;
+        std::memcpy(&thdr, rptr, sizeof(txn_hdr));
+        dtokp->set_dsize(thdr._xidsize + sizeof(txn_hdr) + sizeof(rec_tail));
+    }
+    else
+    {
+        std::ostringstream oss;
+        // The magic is a fixed-size field with no NUL terminator, so write exactly its bytes
+        // rather than streaming it as a C string (which would read past the field)
+        oss << "Record type found = \"";
+        oss.write(reinterpret_cast<const char*>(&h._magic), sizeof(h._magic));
+        oss << "\"";
+        throw jexception(jerrno::JERR_RMGR_BADRECTYPE, oss.str(), "rmgr", "consume_xid_rec");
+    }
+    dtokp->set_dblocks_read(0);
+    skip(dtokp);
+}
+
+/**
+ * \brief Step over a filler record, rotating to the next page if the current one is used up.
+ */
+void
+rmgr::consume_filler()
+{
+    // Filler (Magic "RHMx") is one dblk by definition
+    ++_pg_offset_dblks;
+    if (dblks_rem() != 0)
+        return; // still data left in this page
+    rotate_page();
+}
+
+/**
+ * \brief Advance the read position past the record whose size is held in the token.
+ *
+ * The record size in dblks is derived from the token's dsize; the token's dblocks-read count
+ * says how much has been skipped already. Pages are consumed one at a time until the whole
+ * record has been passed.
+ *
+ * \param dtokp Data token holding the record size and skip progress.
+ * \return RHM_IORES_SUCCESS when the skip is complete (token reset to UNREAD with zero size and
+ *         zero dblocks read); RHM_IORES_PAGE_AIOWAIT if the next page has not completed its AIO
+ *         read (token left in state SKIP_PART).
+ */
+iores
+rmgr::skip(data_tok* dtokp)
+{
+    const u_int32_t rec_dblks = jrec::size_dblks(dtokp->dsize());
+    u_int32_t skipped_dblks = dtokp->dblocks_read();
+    for (;;)
+    {
+        // Skip whichever is smaller: what is left of the record or what is left of the page
+        const u_int32_t rec_left = rec_dblks - skipped_dblks;
+        const u_int32_t page_left = dblks_rem();
+        const u_int32_t step = rec_left > page_left ? page_left : rec_left;
+        if (step != 0)
+        {
+            dtokp->incr_dblocks_read(step);
+            _pg_offset_dblks += step;
+            skipped_dblks += step;
+        }
+
+        if (skipped_dblks >= rec_dblks)
+        {
+            // Skip complete, put state back to unread
+            dtokp->set_rstate(data_tok::UNREAD);
+            dtokp->set_dsize(0);
+            dtokp->set_dblocks_read(0);
+
+            // If we have finished with this page, rotate it
+            if (dblks_rem() == 0)
+                rotate_page();
+            return RHM_IORES_SUCCESS;
+        }
+
+        // Skip still incomplete: move on to the next page and go round again
+        if (dblks_rem() == 0)
+            rotate_page();
+        if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
+        {
+            dtokp->set_rstate(data_tok::SKIP_PART);
+            return RHM_IORES_PAGE_AIOWAIT;
+        }
+    }
+}
+
+/**
+ * \brief Drive the read-ahead cycle: start reads into free pages and collect completed ones.
+ *
+ * If a file-header read is outstanding nothing is done. If the read file controller is invalid,
+ * the cache is flushed, the earliest file is located and a file-header read is started.
+ * Otherwise the page cache is scanned from the current page onwards, reads are initiated for
+ * UNUSED pages (or for all pages if every page is AIO_COMPLETE), and events are collected if any
+ * page is AIO_PENDING.
+ *
+ * \return RHM_IORES_EMPTY if the file to be read has not yet been written to and no write AIO is
+ *         outstanding; otherwise RHM_IORES_SUCCESS or the result of init_aio_reads().
+ */
+iores
+rmgr::aio_cycle()
+{
+    // Perform validity checks
+    if (_fhdr_rd_outstanding) // read of file header still outstanding in aio
+        return RHM_IORES_SUCCESS;
+    if (!_rrfc.is_valid())
+    {
+        // Flush and reset all read states and pointers
+        flush(&jcntl::_aio_cmpl_timeout);
+
+        _jc->get_earliest_fid(); // determine initial file to read; calls _rrfc.set_findex() to set value
+        // If this file has not yet been written to, return RHM_IORES_EMPTY
+        if (_rrfc.is_void() && !_rrfc.is_wr_aio_outstanding())
+            return RHM_IORES_EMPTY;
+        init_file_header_read(); // send off AIO read request for file header
+        return RHM_IORES_SUCCESS;
+    }
+
+    int16_t first_uninit = -1;  // index of the first UNUSED page found, -1 if none
+    u_int16_t num_uninit = 0;   // number of UNUSED pages
+    u_int16_t num_compl = 0;    // number of AIO_COMPLETE pages
+    bool outstanding = false;   // true if any page is AIO_PENDING
+    // Index must start with current buffer and cycle around so that first
+    // uninitialized buffer is initialized first
+    for (u_int16_t i=_pg_index; i<_pg_index+_cache_num_pages; i++)
+    {
+        int16_t ci = i % _cache_num_pages;
+        switch (_page_cb_arr[ci]._state)
+        {
+            case UNUSED:
+                if (first_uninit < 0)
+                    first_uninit = ci;
+                num_uninit++;
+                break;
+            case IN_USE:
+                break;
+            case AIO_PENDING:
+                outstanding = true;
+                break;
+            case AIO_COMPLETE:
+                num_compl++;
+                break;
+            default:;
+        }
+    }
+    iores res = RHM_IORES_SUCCESS;
+    if (num_uninit)
+        res = init_aio_reads(first_uninit, num_uninit);
+    else if (num_compl == _cache_num_pages) // This condition exists after invalidation
+        res = init_aio_reads(0, _cache_num_pages);
+    // Pick up any reads that were already pending when the scan was made
+    if (outstanding)
+        get_events(AIO_COMPLETE, 0);
+    return res;
+}
+
+/**
+ * \brief Submit AIO read requests to fill up to \a num_uninit cache pages.
+ *
+ * Pages are taken in order starting at \a first_uninit, wrapping around the cache. Each request
+ * reads at most one page's worth of dblks from the current submit offset of the read file
+ * controller. Submission stops early if the file has not been written to or if there is nothing
+ * left to read in it.
+ *
+ * \param first_uninit Index of the first page to fill.
+ * \param num_uninit   Maximum number of pages to fill.
+ * \return Always RHM_IORES_SUCCESS.
+ * \throws jexception (jerrno::JERR__AIO) if an AIO submit fails.
+ */
+iores
+rmgr::init_aio_reads(const int16_t first_uninit, const u_int16_t num_uninit)
+{
+    for (int16_t i=0; i<num_uninit; i++)
+    {
+        if (_rrfc.is_void()) // Nothing to do; this file not yet written to
+            break;
+
+        // At the very start of a file, account for the first sblk on both the submitted and
+        // completed counts so that page reads start beyond it
+        if (_rrfc.subm_offs() == 0)
+        {
+            _rrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE);
+            _rrfc.add_cmpl_cnt_dblks(JRNL_SBLK_SIZE);
+        }
+
+        // TODO: Future perf improvement: Do a single AIO read for all available file
+        // space into all contiguous empty pages in one AIO operation.
+
+        u_int32_t file_rem_dblks = _rrfc.remaining_dblks();
+        file_rem_dblks -= file_rem_dblks % JRNL_SBLK_SIZE; // round down to closest sblk boundary
+        u_int32_t pg_size_dblks = JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE;
+        // Read a full page, or whatever remains in the file if that is less
+        u_int32_t rd_size = file_rem_dblks > pg_size_dblks ? pg_size_dblks : file_rem_dblks;
+        if (rd_size)
+        {
+            int16_t pi = (i + first_uninit) % _cache_num_pages;
+            // TODO: For perf, combine contiguous pages into single read
+            //   1 or 2 AIOs needed depending on whether read block folds
+            aio_cb* aiocbp = &_aio_cb_arr[pi];
+            aio::prep_pread_2(aiocbp, _rrfc.fh(), _page_ptr_arr[pi], rd_size * JRNL_DBLK_SIZE, _rrfc.subm_offs());
+            if (aio::submit(_ioctx, 1, &aiocbp) < 0)
+                throw jexception(jerrno::JERR__AIO, "rmgr", "init_aio_reads");
+            _rrfc.add_subm_cnt_dblks(rd_size);
+            _aio_evt_rem++;
+            _page_cb_arr[pi]._state = AIO_PENDING;
+            // Remember which file controller this read belongs to; get_events() uses it because
+            // _rrfc may have rotated by the time the read completes
+            _page_cb_arr[pi]._rfh = _rrfc.file_controller();
+        }
+        else // If there is nothing to read for this page, neither will there be for the others...
+            break;
+        if (_rrfc.file_rotate())
+            _rrfc.rotate();
+    }
+    return RHM_IORES_SUCCESS;
+}
+
+/**
+ * \brief Release the current cache page and move on to the next one.
+ *
+ * The current page is marked UNUSED with no dblks read, the page index is advanced (wrapping at
+ * the cache size), an AIO cycle is run so that the freed page can be refilled, and the in-page
+ * offset is reset to the start of the new page.
+ */
+void
+rmgr::rotate_page()
+{
+    _page_cb_arr[_pg_index]._rdblks = 0;
+    _page_cb_arr[_pg_index]._state = UNUSED;
+    // Only a fully consumed page advances the bookkeeping page counter
+    if (_pg_offset_dblks >= JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE)
+    {
+        _pg_offset_dblks = 0;
+        _pg_cntr++;
+    }
+    if (++_pg_index >= _cache_num_pages)
+        _pg_index = 0;
+    aio_cycle();
+    // NOTE(review): aio_cycle() can reach get_events(), whose file-header branch assigns
+    // _pg_index and _pg_offset_dblks; the unconditional reset below would then overwrite that
+    // offset - confirm this cannot occur on this path.
+    _pg_offset_dblks = 0;
+    // This counter is for bookkeeping only, page rotates are handled directly in init_aio_reads()
+    // FIXME: _pg_cntr should be sync'd with aio ops, not use of page as it is now...
+    // Need to move reset into if (_rrfc.file_rotate()) above.
+    if (_pg_cntr >= (_jc->jfsize_sblks() / JRNL_RMGR_PAGE_SIZE))
+        _pg_cntr = 0;
+}
+
+/**
+ * \brief Number of dblks in the current page that lie beyond the current read offset.
+ *
+ * \return The current page's read dblk count minus the current in-page offset.
+ */
+u_int32_t
+rmgr::dblks_rem() const
+{
+    const page_cb& cur_page = _page_cb_arr[_pg_index];
+    return cur_page._rdblks - _pg_offset_dblks;
+}
+
+/**
+ * \brief Clear the output parameters of read(): both pointers to null, both sizes to zero.
+ *
+ * \param datapp  Location of the data pointer to clear.
+ * \param dsize   Data size to zero.
+ * \param xidpp   Location of the XID pointer to clear.
+ * \param xidsize XID size to zero.
+ */
+void
+rmgr::set_params_null(void** const datapp, std::size_t& dsize, void** const xidpp, std::size_t& xidsize)
+{
+    // Data pointer and size
+    *datapp = 0;
+    dsize = 0;
+
+    // XID pointer and size
+    *xidpp = 0;
+    xidsize = 0;
+}
+
+/**
+ * \brief Submit an AIO read of the first _sblksize bytes of the current read file.
+ *
+ * On successful submission the outstanding event count and the read file controller's submitted
+ * count are increased, and the file-header-read-outstanding flag is raised.
+ *
+ * \throws jexception (jerrno::JERR__AIO) if the AIO submit fails.
+ */
+void
+rmgr::init_file_header_read()
+{
+    _jc->fhdr_wr_sync(_rrfc.index()); // wait if the file header write is outstanding
+
+    // Prepare a read of one sblk from offset 0 into the dedicated file header buffer
+    const int fhdr_fh = _rrfc.fh();
+    aio::prep_pread_2(_fhdr_aio_cb_ptr, fhdr_fh, _fhdr_buffer, _sblksize, 0);
+
+    const bool submit_failed = aio::submit(_ioctx, 1, &_fhdr_aio_cb_ptr) < 0;
+    if (submit_failed)
+        throw jexception(jerrno::JERR__AIO, "rmgr", "init_file_header_read");
+
+    // Book-keeping for the request now in flight
+    ++_aio_evt_rem;
+    _rrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE);
+    _fhdr_rd_outstanding = true;
+}
+
+/* TODO (sometime in the future)
+const iores
+rmgr::get(const u_int64_t& rid, const std::size_t& dsize, const std::size_t& dsize_avail,
+        const void** const data, bool auto_discard)
+{
+    return RHM_IORES_SUCCESS;
+}
+
+const iores
+rmgr::discard(data_tok* dtokp)
+{
+    return RHM_IORES_SUCCESS;
+}
+*/
+
+} // namespace journal
+} // namespace mrg

Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.h?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.h (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.h Wed Jul 10 18:20:19 2013
@@ -0,0 +1,114 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file rmgr.h
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * File containing code for class mrg::journal::rmgr (read manager). See
+ * class documentation for details.
+ *
+ * \author Kim van der Riet
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_RMGR_H
+#define QPID_LEGACYSTORE_JRNL_RMGR_H
+
+namespace mrg
+{
+namespace journal
+{
+class rmgr;
+}
+}
+
+#include <cstring>
+#include "qpid/legacystore/jrnl/enums.h"
+#include "qpid/legacystore/jrnl/file_hdr.h"
+#include "qpid/legacystore/jrnl/pmgr.h"
+#include "qpid/legacystore/jrnl/rec_hdr.h"
+#include "qpid/legacystore/jrnl/rrfc.h"
+
+namespace mrg
+{
+namespace journal
+{
+
+    /**
+    * \brief Class for managing a read page cache of arbitrary size and number of pages.
+    *
+    * The read page cache works on the principle of filling as many pages as possible in advance of
+    * reading the data. This ensures that delays caused by AIO operations are minimized.
+    */
+    class rmgr : public pmgr
+    {
+    private:
+        rrfc& _rrfc;                ///< Ref to read rotating file controller
+        rec_hdr _hdr;               ///< Header used to determine record type
+
+        void* _fhdr_buffer;         ///< Buffer used for fhdr reads
+        aio_cb* _fhdr_aio_cb_ptr;   ///< iocb pointer for fhdr reads
+        file_hdr _fhdr;             ///< file header instance for reading file headers
+        bool _fhdr_rd_outstanding;  ///< true if a fhdr read is outstanding
+
+    public:
+        rmgr(jcntl* jc, enq_map& emap, txn_map& tmap, rrfc& rrfc);
+        virtual ~rmgr();
+
+        // Keep the pmgr::initialize overloads visible alongside the one declared here
+        using pmgr::initialize;
+        /// Initialize the page cache and allocate the file header read buffer and iocb
+        void initialize(aio_callback* const cbp);
+        /// Read the next enqueued record; output params are nulled when nothing is read
+        iores read(void** const datapp, std::size_t& dsize, void** const xidpp,
+                std::size_t& xidsize, bool& transient, bool& external, data_tok* dtokp,
+                bool ignore_pending_txns);
+        /// Collect completed AIO read events; completed pages are set to \a state
+        int32_t get_events(page_state state, timespec* const timeout, bool flush = false);
+        void recover_complete();
+        /// Returns success if the read file controller is already valid, else runs an AIO cycle
+        inline iores synchronize() { if (_rrfc.is_valid()) return RHM_IORES_SUCCESS; return aio_cycle(); }
+        /// Mark the read file controller invalid (no-op if already invalid)
+        void invalidate();
+        /// Collect events until the read file controller is valid or a wait times out
+        bool wait_for_validity(timespec* const timeout, const bool throw_on_timeout = false);
+
+        /* TODO (if required)
+        const iores get(const u_int64_t& rid, const std::size_t& dsize, const std::size_t& dsize_avail,
+                const void** const data, bool auto_discard);
+        const iores discard(data_tok* dtok);
+        */
+
+    private:
+        void clean();                           ///< Free the fhdr buffer and iocb
+        void flush(timespec* timeout);          ///< Drain outstanding reads, reset page states
+        iores pre_read_check(data_tok* dtokp);  ///< Checks made before each read attempt
+        iores read_enq(rec_hdr& h, void* rptr, data_tok* dtokp); ///< Decode an enqueue record
+        void consume_xid_rec(rec_hdr& h, void* rptr, data_tok* dtokp); ///< Skip an enq/deq/txn record
+        void consume_filler();                  ///< Step over a one-dblk filler record
+        iores skip(data_tok* dtokp);            ///< Advance past the record sized in the token
+        iores aio_cycle();                      ///< Start reads into free pages, collect events
+        iores init_aio_reads(const int16_t first_uninit, const u_int16_t num_uninit);
+        void rotate_page();                     ///< Release current page, move to the next
+        u_int32_t dblks_rem() const;            ///< dblks left to read in the current page
+        void set_params_null(void** const datapp, std::size_t& dsize, void** const xidpp,
+                std::size_t& xidsize);
+        void init_file_header_read();           ///< Submit AIO read of the file header
+    };
+
+} // namespace journal
+} // namespace mrg
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_RMGR_H

Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.h
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Rev URL



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org