You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2013/07/10 20:20:20 UTC
svn commit: r1501895 [8/10] - in /qpid/branches/linearstore/qpid/cpp/src: ./
qpid/linearstore/ qpid/linearstore/jrnl/
Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/lpmgr.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/lpmgr.h?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/lpmgr.h (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/lpmgr.h Wed Jul 10 18:20:19 2013
@@ -0,0 +1,303 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file lpmgr.h
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * Class mrg::journal::lpmgr. See class documentation for details.
+ *
+ * \author Kim van der Riet
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_LPMGR_H
+#define QPID_LEGACYSTORE_JRNL_LPMGR_H
+
+namespace mrg
+{
+namespace journal
+{
+ class jcntl;
+ class lpmgr;
+}
+}
+
+#include "qpid/legacystore/jrnl/fcntl.h"
+#include <vector>
+
+namespace mrg
+{
+namespace journal
+{
+
+ /**
+ * \brief LFID-PFID manager. This class maps the logical file id (lfid) to the physical file id (pfid) so that files
+ * may be inserted into the file ring buffer in (nearly) arbitrary logical locations while the physical ids continue
+ * to be appended. NOTE: NOT THREAD SAFE.
+ *
+ * The entire functionality of the LFID-PFID manager is to maintain an array of pointers to fcntl objects which have
+ * a one-to-one relationship to the physical %journal files. The logical file id (lfid) is used as an index to the
+ * array to read the mapped physical file id (pfid). By altering the order of these pointers within the array, the
+ * mapping of logical to physical files may be altered. This can be used to allow for the logical insertion of
+ * %journal files into a ring buffer, even though the physical file ids must be appended to those that preceded them.
+ *
+ * Since the insert() operation uses after-lfid as its position parameter, it is not possible to insert before lfid
+ * 0 - i.e. It is only possible to insert after an existing lfid. Consequently, lfid 0 and pfid 0 are always
+ * coincident in a %journal. Note, however, that inserting before lfid 0 is logically equivilent to inserting after
+ * the last lfid.
+ *
+ * When one or more files are inserted after a particular lfid, the lfids of the following files are incremented. The
+ * pfids of the inserted files follow those of all existing files, thus leading to a lfid-pfid discreppancy (ie no
+ * longer a one-to-one mapping):
+ *
+ * Example: Before insertion, %journal file headers would look as follows:
+ * <pre>
+ * Logical view (sorted by lfid): Physical view (sorted by pfid):
+ * +---+---+---+---+---+---+ +---+---+---+---+---+---+
+ * pfid --> | 0 | 1 | 2 | 3 | 4 | 5 | pfid --> | 0 | 1 | 2 | 3 | 4 | 5 |
+ * lfid --> | 0 | 1 | 2 | 3 | 4 | 5 | lfid --> | 0 | 1 | 2 | 3 | 4 | 5 |
+ * +---+---+---+---+---+---+ +---+---+---+---+---+---+
+ * </pre>
+ *
+ * After insertion of 2 files after lid 2 (marked with *s):
+ * <pre>
+ * Logical view (sorted by lfid): Physical view (sorted by pfid):
+ * +---+---+---+---+---+---+---+---+ +---+---+---+---+---+---+---+---+
+ * pfid --> | 0 | 1 | 2 |*6*|*7*| 3 | 4 | 5 | pfid --> | 0 | 1 | 2 | 3 | 4 | 5 |*6*|*7*|
+ * lfid --> | 0 | 1 | 2 |*3*|*4*| 5 | 6 | 7 | lfid --> | 0 | 1 | 2 | 5 | 6 | 7 |*3*|*4*|
+ * +---+---+---+---+---+---+---+---+ +---+---+---+---+---+---+---+---+
+ * </pre>
+ *
+ * The insert() function updates the internal map immediately, but the physical files (which have both the pfid and
+ * lfid written into the file header) are only updated as they are overwritten in the normal course of enqueueing
+ * and dequeueing messages. If the %journal should fail after insertion but before the files following those inserted
+ * are overwritten, then duplicate lfids will be present (though no duplicate pfids are possible). The overwrite
+ * indicator (owi) flag and the pfid numbers may be used to resolve the ambiguity and determine the logically earlier
+ * lfid in this case.
+ *
+ * Example: Before insertion, the current active write file being lfid/pfid 2 as determined by the owi flag, %journal
+ * file headers would look as follows:
+ * <pre>
+ * Logical view (sorted by lfid): Physical view (sorted by pfid):
+ * +---+---+---+---+---+---+ +---+---+---+---+---+---+
+ * pfid --> | 0 | 1 | 2 | 3 | 4 | 5 | pfid --> | 0 | 1 | 2 | 3 | 4 | 5 |
+ * lfid --> | 0 | 1 | 2 | 3 | 4 | 5 | lfid --> | 0 | 1 | 2 | 3 | 4 | 5 |
+ * owi --> | t | t | t | f | f | f | owi --> | t | t | t | f | f | f |
+ * +---+---+---+---+---+---+ +---+---+---+---+---+---+
+ * </pre>
+ *
+ * After inserting 2 files after lfid 2 and then 3 (the newly inserted file) - marked with *s:
+ * <pre>
+ * Logical view (sorted by lfid): Physical view (sorted by pfid):
+ * +---+---+---+---+---+---+---+---+ +---+---+---+---+---+---+---+---+
+ * pfid --> | 0 | 1 | 2 |*6*|*7*| 3 | 4 | 5 | pfid --> | 0 | 1 | 2 | 3 | 4 | 5 |*3*|*4*|
+ * lfid --> | 0 | 1 | 2 |*3*|*4*| 3 | 4 | 5 | lfid --> | 0 | 1 | 2 | 3 | 4 | 5 |*3*|*4*|
+ * owi --> | t | t | t | t | t | f | f | f | owi --> | t | t | t | f | f | f | t | t |
+ * +---+---+---+---+---+---+---+---+ +---+---+---+---+---+---+---+---+
+ * </pre>
+ *
+ * If a broker failure occurs at this point, then there are two independent tests that may be made to resolve
+ * duplicate lfids during recovery in such cases:
+ * <ol>
+ * <li>The correct lfid has owi flag that matches that of pfid/lfid 0</li>
+ * <li>The most recently inserted (hence correct) lfid has pfids that are higher than the duplicate that was not
+ * overwritten</li>
+ * </ol>
+ *
+ * NOTE: NOT THREAD SAFE. Provide external thread protection if used in multi-threaded environments.
+ */
+ class lpmgr
+ {
+ public:
+ /**
+ * \brief Function pointer to function that will create a new fcntl object and return its pointer.
+ *
+ * \param jcp Pointer to jcntl instance from which journal file details will be obtained.
+ * \param lfid Logical file ID for new fcntl instance.
+ * \param pfid Physical file ID for file associated with new fcntl instance.
+ * \param rdp Pointer to rcvdat instance which conatins recovery information for new fcntl instance when
+ * recovering an existing file, or null if a new file is to be created.
+ */
+ typedef fcntl* (new_obj_fn_ptr)(jcntl* const jcp,
+ const u_int16_t lfid,
+ const u_int16_t pfid,
+ const rcvdat* const rdp);
+
+ private:
+ bool _ae; ///< Auto-expand mode
+ u_int16_t _ae_max_jfiles; ///< Max file count for auto-expansion; 0 = no limit
+ std::vector<fcntl*> _fcntl_arr; ///< Array of pointers to fcntl objects
+
+ public:
+ lpmgr();
+ virtual ~lpmgr();
+
+ /**
+ * \brief Initialize from scratch for a known number of %journal files. All lfid values are identical to pfid
+ * values (which is normal before any inserts have occurred).
+ *
+ * \param num_jfiles Number of files to be created, and consequently the number of fcntl objects in array
+ * _fcntl_arr.
+ * \param ae If true, allows auto-expansion; if false, disables auto-expansion.
+ * \param ae_max_jfiles The maximum number of files allowed for auto-expansion. Cannot be lower than the current
+ * number of files. However, a zero value disables the limit checks, and allows unlimited
+ * expansion.
+ * \param jcp Pointer to jcntl instance. This is used to find the file path and base filename so that
+ * new files may be created.
+ * \param fp Pointer to function which creates and returns a pointer to a new fcntl object (and hence
+ * causes a new %journal file to be created).
+ */
+ void initialize(const u_int16_t num_jfiles,
+ const bool ae,
+ const u_int16_t ae_max_jfiles,
+ jcntl* const jcp,
+ new_obj_fn_ptr fp);
+
+ /**
+ * \brief Initialize from a known lfid-pfid map pfid_list (within rcvdat param rd), which is usually obtained
+ * from a recover. The index of pfid_list is the logical file id (lfid); the value contained in the vector is
+ * the physical file id (pfid).
+ *
+ * \param rd Ref to rcvdat struct which contains recovery data and the pfid_list.
+ * \param jcp Pointer to jcntl instance. This is used to find the file path and base filename so that
+ * new files may be created.
+ * \param fp Pointer to function which creates and returns a pointer to a new fcntl object (and hence
+ * causes a new %journal file to be created).
+ */
+ void recover(const rcvdat& rd,
+ jcntl* const jcp,
+ new_obj_fn_ptr fp);
+
+ /**
+ * \brief Insert num_jfiles files after lfid index after_lfid. This causes all lfids after after_lfid to be
+ * increased by num_jfiles.
+ *
+ * Note that it is not possible to insert <i>before</i> lfid 0, and thus lfid 0 should always point to pfid 0.
+ * Inserting before lfid 0 is logically equivilent to inserting after the last lfid in a circular buffer.
+ *
+ * \param after_lfid Lid index after which to insert file(s).
+ * \param jcp Pointer to jcntl instance. This is used to find the file path and base filename so that
+ * new files may be created.
+ * \param fp Pointer to function which creates and returns a pointer to a new fcntl object (and hence
+ * causes a new %journal file to be created).
+ * \param num_jfiles The number of files by which to increase.
+ */
+ void insert(const u_int16_t after_lfid,
+ jcntl* const jcp,
+ new_obj_fn_ptr fp,
+ const u_int16_t num_jfiles = 1);
+
+ /**
+ * \brief Clears _fcntl_arr and deletes all fcntl instances.
+ */
+ void finalize();
+
+ /**
+ * \brief Returns true if initialized; false otherwise. After construction, will return false until initialize()
+ * is called; thereafter true until finalize() is called, whereupon it will return false again.
+ *
+ * \return True if initialized; false otherwise.
+ */
+ inline bool is_init() const { return _fcntl_arr.size() > 0; }
+
+ /**
+ * \brief Returns true if auto-expand mode is enabled; false if not.
+ *
+ * \return True if auto-expand mode is enabled; false if not.
+ */
+ inline bool is_ae() const { return _ae; }
+
+ /**
+ * \brief Sets the auto-expand mode to enabled if ae is true, to disabled otherwise. The value of _ae_max_jfiles
+ * must be valid to succeed (i.e. _ae_max_jfiles must be greater than the current number of files or be zero).
+ *
+ * \param ae If true will enable auto-expand mode; if false will disable it.
+ */
+ void set_ae(const bool ae);
+
+ /**
+ * \brief Returns the number of %journal files, including any that were appended or inserted since
+ * initialization.
+ *
+ * \return Number of %journal files if initialized; 0 otherwise.
+ */
+ inline u_int16_t num_jfiles() const { return static_cast<u_int16_t>(_fcntl_arr.size()); }
+
+ /**
+ * \brief Returns the maximum number of files allowed for auto-expansion.
+ *
+ * \return Maximum number of files allowed for auto-expansion. A zero value represents a disabled limit
+ * - i.e. unlimited expansion.
+ */
+ inline u_int16_t ae_max_jfiles() const { return _ae_max_jfiles; }
+
+ /**
+ * \brief Sets the maximum number of files allowed for auto-expansion. A zero value disables the limit.
+ *
+ * \param ae_max_jfiles The maximum number of files allowed for auto-expansion. Cannot be lower than the current
+ * number of files. However, a zero value disables the limit checks, and allows unlimited
+ * expansion.
+ */
+ void set_ae_max_jfiles(const u_int16_t ae_max_jfiles);
+
+ /**
+ * \brief Calculates the number of future files available for auto-expansion.
+ *
+ * \return The number of future files available for auto-expansion.
+ */
+ u_int16_t ae_jfiles_rem() const;
+
+ /**
+ * \brief Get a pointer to fcntl instance for a given lfid.
+ *
+ * \return Pointer to fcntl object corresponding to logical file id lfid, or 0 if lfid is out of range
+ * (greater than number of files in use).
+ */
+ inline fcntl* get_fcntlp(const u_int16_t lfid) const
+ { if (lfid >= _fcntl_arr.size()) return 0; return _fcntl_arr[lfid]; }
+
+ // Testing functions
+ void get_pfid_list(std::vector<u_int16_t>& pfid_list) const;
+ void get_lfid_list(std::vector<u_int16_t>& lfid_list) const;
+
+ protected:
+
+ /**
+ * \brief Append num_jfiles files to the end of the logical and file id sequence. This is similar to extending
+ * the from-scratch initialization.
+ *
+ * \param jcp Pointer to jcntl instance. This is used to find the file path and base filename so that
+ * new files may be created.
+ * \param fp Pointer to function which creates and returns a pointer to a new fcntl object (and hence
+ * causes a new %journal file to be created).
+ * \param num_jfiles The number of files by which to increase.
+ */
+ void append(jcntl* const jcp,
+ new_obj_fn_ptr fp,
+ const u_int16_t num_jfiles = 1);
+
+ };
+
+} // namespace journal
+} // namespace mrg
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_LPMGR_H
Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/lpmgr.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/lpmgr.h
------------------------------------------------------------------------------
svn:keywords = Author Date Id Rev URL
Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp Wed Jul 10 18:20:19 2013
@@ -0,0 +1,215 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file pmgr.cpp
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * File containing code for class mrg::journal::pmgr (page manager). See
+ * comments in file pmgr.h for details.
+ *
+ * \author Kim van der Riet
+ */
+
+#include "qpid/legacystore/jrnl/pmgr.h"
+
+#include <cerrno>
+#include <cstdlib>
+#include <cstring>
+#include "qpid/legacystore/jrnl/jcfg.h"
+#include "qpid/legacystore/jrnl/jcntl.h"
+#include "qpid/legacystore/jrnl/jerrno.h"
+#include <sstream>
+
+
+namespace mrg
+{
+namespace journal
+{
+
+pmgr::page_cb::page_cb(u_int16_t index):
+ _index(index),
+ _state(UNUSED),
+ _wdblks(0),
+ _rdblks(0),
+ _pdtokl(0),
+ _wfh(0),
+ _rfh(0),
+ _pbuff(0)
+{}
+
+const char*
+pmgr::page_cb::state_str() const
+{
+ switch(_state)
+ {
+ case UNUSED:
+ return "UNUSED";
+ case IN_USE:
+ return "IN_USE";
+ case AIO_PENDING:
+ return "AIO_PENDING";
+ case AIO_COMPLETE:
+ return "AIO_COMPLETE";
+ }
+ return "<unknown>";
+}
+
+const u_int32_t pmgr::_sblksize = JRNL_SBLK_SIZE * JRNL_DBLK_SIZE;
+
+pmgr::pmgr(jcntl* jc, enq_map& emap, txn_map& tmap):
+ _cache_pgsize_sblks(0),
+ _cache_num_pages(0),
+ _jc(jc),
+ _emap(emap),
+ _tmap(tmap),
+ _page_base_ptr(0),
+ _page_ptr_arr(0),
+ _page_cb_arr(0),
+ _aio_cb_arr(0),
+ _aio_event_arr(0),
+ _ioctx(0),
+ _pg_index(0),
+ _pg_cntr(0),
+ _pg_offset_dblks(0),
+ _aio_evt_rem(0),
+ _cbp(0),
+ _enq_rec(),
+ _deq_rec(),
+ _txn_rec()
+{}
+
+pmgr::~pmgr()
+{
+ pmgr::clean();
+}
+
+void
+pmgr::initialize(aio_callback* const cbp, const u_int32_t cache_pgsize_sblks, const u_int16_t cache_num_pages)
+{
+ // As static use of this class keeps old values around, clean up first...
+ pmgr::clean();
+ _pg_index = 0;
+ _pg_cntr = 0;
+ _pg_offset_dblks = 0;
+ _aio_evt_rem = 0;
+ _cache_pgsize_sblks = cache_pgsize_sblks;
+ _cache_num_pages = cache_num_pages;
+ _cbp = cbp;
+
+ // 1. Allocate page memory (as a single block)
+ std::size_t cache_pgsize = _cache_num_pages * _cache_pgsize_sblks * _sblksize;
+ if (::posix_memalign(&_page_base_ptr, _sblksize, cache_pgsize))
+ {
+ clean();
+ std::ostringstream oss;
+ oss << "posix_memalign(): blksize=" << _sblksize << " size=" << cache_pgsize;
+ oss << FORMAT_SYSERR(errno);
+ throw jexception(jerrno::JERR__MALLOC, oss.str(), "pmgr", "initialize");
+ }
+ // 2. Allocate array of page pointers
+ _page_ptr_arr = (void**)std::malloc(_cache_num_pages * sizeof(void*));
+ MALLOC_CHK(_page_ptr_arr, "_page_ptr_arr", "pmgr", "initialize");
+
+ // 3. Allocate and initilaize page control block (page_cb) array
+ _page_cb_arr = (page_cb*)std::malloc(_cache_num_pages * sizeof(page_cb));
+ MALLOC_CHK(_page_cb_arr, "_page_cb_arr", "pmgr", "initialize");
+ std::memset(_page_cb_arr, 0, _cache_num_pages * sizeof(page_cb));
+
+ // 5. Allocate IO control block (iocb) array
+ _aio_cb_arr = (aio_cb*)std::malloc(_cache_num_pages * sizeof(aio_cb));
+ MALLOC_CHK(_aio_cb_arr, "_aio_cb_arr", "pmgr", "initialize");
+
+ // 6. Set page pointers in _page_ptr_arr, _page_cb_arr and iocbs to pages within page block
+ for (u_int16_t i=0; i<_cache_num_pages; i++)
+ {
+ _page_ptr_arr[i] = (void*)((char*)_page_base_ptr + _cache_pgsize_sblks * _sblksize * i);
+ _page_cb_arr[i]._index = i;
+ _page_cb_arr[i]._state = UNUSED;
+ _page_cb_arr[i]._pbuff = _page_ptr_arr[i];
+ _page_cb_arr[i]._pdtokl = new std::deque<data_tok*>;
+ _page_cb_arr[i]._pdtokl->clear();
+ _aio_cb_arr[i].data = (void*)&_page_cb_arr[i];
+ }
+
+ // 7. Allocate io_event array, max one event per cache page plus one for each file
+ const u_int16_t max_aio_evts = _cache_num_pages + _jc->num_jfiles();
+ _aio_event_arr = (aio_event*)std::malloc(max_aio_evts * sizeof(aio_event));
+ MALLOC_CHK(_aio_event_arr, "_aio_event_arr", "pmgr", "initialize");
+
+ // 8. Initialize AIO context
+ if (int ret = aio::queue_init(max_aio_evts, &_ioctx))
+ {
+ std::ostringstream oss;
+ oss << "io_queue_init() failed: " << FORMAT_SYSERR(-ret);
+ throw jexception(jerrno::JERR__AIO, oss.str(), "pmgr", "initialize");
+ }
+}
+
+void
+pmgr::clean()
+{
+ // clean up allocated memory here
+
+ if (_ioctx)
+ aio::queue_release(_ioctx);
+
+ std::free(_page_base_ptr);
+ _page_base_ptr = 0;
+
+ if (_page_cb_arr)
+ {
+ for (int i=0; i<_cache_num_pages; i++)
+ delete _page_cb_arr[i]._pdtokl;
+ std::free(_page_ptr_arr);
+ _page_ptr_arr = 0;
+ }
+
+ std::free(_page_cb_arr);
+ _page_cb_arr = 0;
+
+ std::free(_aio_cb_arr);
+ _aio_cb_arr = 0;
+
+ std::free(_aio_event_arr);
+ _aio_event_arr = 0;
+}
+
+const char*
+pmgr::page_state_str(page_state ps)
+{
+ switch (ps)
+ {
+ case UNUSED:
+ return "UNUSED";
+ case IN_USE:
+ return "IN_USE";
+ case AIO_PENDING:
+ return "AIO_PENDING";
+ case AIO_COMPLETE:
+ return "AIO_COMPLETE";
+ }
+ return "<page_state unknown>";
+}
+
+} // namespace journal
+} // namespace mrg
Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h Wed Jul 10 18:20:19 2013
@@ -0,0 +1,142 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file pmgr.h
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * File containing code for class mrg::journal::pmgr (page manager). See
+ * class documentation for details.
+ *
+ * \author Kim van der Riet
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_PMGR_H
+#define QPID_LEGACYSTORE_JRNL_PMGR_H
+
+namespace mrg
+{
+namespace journal
+{
+ class pmgr;
+ class jcntl;
+}
+}
+
+#include <deque>
+#include "qpid/legacystore/jrnl/aio.h"
+#include "qpid/legacystore/jrnl/aio_callback.h"
+#include "qpid/legacystore/jrnl/data_tok.h"
+#include "qpid/legacystore/jrnl/deq_rec.h"
+#include "qpid/legacystore/jrnl/enq_map.h"
+#include "qpid/legacystore/jrnl/enq_rec.h"
+#include "qpid/legacystore/jrnl/fcntl.h"
+#include "qpid/legacystore/jrnl/txn_map.h"
+#include "qpid/legacystore/jrnl/txn_rec.h"
+
+namespace mrg
+{
+namespace journal
+{
+
+ /**
+ * \brief Abstract class for managing either read or write page cache of arbitrary size and
+ * number of cache_num_pages.
+ */
+ class pmgr
+ {
+ public:
+ /**
+ * \brief Enumeration of possible stats of a page within a page cache.
+ */
+ enum page_state
+ {
+ UNUSED, ///< A page is uninitialized, contains no data.
+ IN_USE, ///< Page is in use.
+ AIO_PENDING, ///< An AIO request outstanding.
+ AIO_COMPLETE ///< An AIO request is complete.
+ };
+
+ protected:
+ /**
+ * \brief Page control block, carries control and state information for each page in the
+ * cache.
+ */
+ struct page_cb
+ {
+ u_int16_t _index; ///< Index of this page
+ page_state _state; ///< Status of page
+ u_int64_t _frid; ///< First rid in page (used for fhdr init)
+ u_int32_t _wdblks; ///< Total number of dblks in page so far
+ u_int32_t _rdblks; ///< Total number of dblks in page
+ std::deque<data_tok*>* _pdtokl; ///< Page message tokens list
+ fcntl* _wfh; ///< File handle for incrementing write compl counts
+ fcntl* _rfh; ///< File handle for incrementing read compl counts
+ void* _pbuff; ///< Page buffer
+
+ page_cb(u_int16_t index); ///< Convenience constructor
+ const char* state_str() const; ///< Return state as string for this pcb
+ };
+
+ static const u_int32_t _sblksize; ///< Disk softblock size
+ u_int32_t _cache_pgsize_sblks; ///< Size of page cache cache_num_pages
+ u_int16_t _cache_num_pages; ///< Number of page cache cache_num_pages
+ jcntl* _jc; ///< Pointer to journal controller
+ enq_map& _emap; ///< Ref to enqueue map
+ txn_map& _tmap; ///< Ref to transaction map
+ void* _page_base_ptr; ///< Base pointer to page memory
+ void** _page_ptr_arr; ///< Array of pointers to cache_num_pages in page memory
+ page_cb* _page_cb_arr; ///< Array of page_cb structs
+ aio_cb* _aio_cb_arr; ///< Array of iocb structs
+ aio_event* _aio_event_arr; ///< Array of io_events
+ io_context_t _ioctx; ///< AIO context for read/write operations
+ u_int16_t _pg_index; ///< Index of current page being used
+ u_int32_t _pg_cntr; ///< Page counter; determines if file rotation req'd
+ u_int32_t _pg_offset_dblks; ///< Page offset (used so far) in data blocks
+ u_int32_t _aio_evt_rem; ///< Remaining AIO events
+ aio_callback* _cbp; ///< Pointer to callback object
+
+ enq_rec _enq_rec; ///< Enqueue record used for encoding/decoding
+ deq_rec _deq_rec; ///< Dequeue record used for encoding/decoding
+ txn_rec _txn_rec; ///< Transaction record used for encoding/decoding
+
+ public:
+ pmgr(jcntl* jc, enq_map& emap, txn_map& tmap);
+ virtual ~pmgr();
+
+ virtual int32_t get_events(page_state state, timespec* const timeout, bool flush = false) = 0;
+ inline u_int32_t get_aio_evt_rem() const { return _aio_evt_rem; }
+ static const char* page_state_str(page_state ps);
+ inline u_int32_t cache_pgsize_sblks() const { return _cache_pgsize_sblks; }
+ inline u_int16_t cache_num_pages() const { return _cache_num_pages; }
+
+ protected:
+ virtual void initialize(aio_callback* const cbp, const u_int32_t cache_pgsize_sblks,
+ const u_int16_t cache_num_pages);
+ virtual void rotate_page() = 0;
+ virtual void clean();
+ };
+
+} // namespace journal
+} // namespace mrg
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_PMGR_H
Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h
------------------------------------------------------------------------------
svn:keywords = Author Date Id Rev URL
Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rcvdat.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rcvdat.h?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rcvdat.h (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rcvdat.h Wed Jul 10 18:20:19 2013
@@ -0,0 +1,181 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file rcvdat.h
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * Contains structure for recovery status and offset data.
+ *
+ * \author Kim van der Riet
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_RCVDAT_H
+#define QPID_LEGACYSTORE_JRNL_RCVDAT_H
+
+#include <cstddef>
+#include <iomanip>
+#include <map>
+#include "qpid/legacystore/jrnl/jcfg.h"
+#include <sstream>
+#include <sys/types.h>
+#include <vector>
+
+namespace mrg
+{
+namespace journal
+{
+
+ struct rcvdat
+ {
+ u_int16_t _njf; ///< Number of journal files
+ bool _ae; ///< Auto-expand mode
+ u_int16_t _aemjf; ///< Auto-expand mode max journal files
+ bool _owi; ///< Overwrite indicator
+ bool _frot; ///< First rotation flag
+ bool _jempty; ///< Journal data files empty
+ u_int16_t _ffid; ///< First file id
+ std::size_t _fro; ///< First record offset in ffid
+ u_int16_t _lfid; ///< Last file id
+ std::size_t _eo; ///< End offset (first byte past last record)
+ u_int64_t _h_rid; ///< Highest rid found
+ bool _lffull; ///< Last file is full
+ bool _jfull; ///< Journal is full
+ std::vector<u_int16_t> _fid_list; ///< Fid-lid mapping - list of fids in order of lid
+ std::vector<u_int32_t> _enq_cnt_list; ///< Number enqueued records found for each file
+
+ rcvdat():
+ _njf(0),
+ _ae(false),
+ _aemjf(0),
+ _owi(false),
+ _frot(false),
+ _jempty(true),
+ _ffid(0),
+ _fro(0),
+ _lfid(0),
+ _eo(0),
+ _h_rid(0),
+ _lffull(false),
+ _jfull(false),
+ _fid_list(),
+ _enq_cnt_list()
+ {}
+
+ void reset(const u_int16_t num_jfiles, const bool auto_expand, const u_int16_t ae_max_jfiles)
+ {
+ _njf = num_jfiles;
+ _ae = auto_expand;
+ _aemjf = ae_max_jfiles;
+ _owi = false;
+ _frot = false;
+ _jempty = true;
+ _ffid = 0;
+ _fro = 0;
+ _lfid = 0;
+ _eo = 0;
+ _h_rid = 0;
+ _lffull = false;
+ _jfull = false;
+ _fid_list.clear();
+ _enq_cnt_list.clear();
+ _enq_cnt_list.resize(num_jfiles, 0);
+ }
+
+ // Find first fid with enqueued records
+ u_int16_t ffid()
+ {
+ u_int16_t index = _ffid;
+ while (index != _lfid && _enq_cnt_list[index] == 0)
+ {
+ if (++index >= _njf)
+ index = 0;
+ }
+ return index;
+ }
+
+ std::string to_string(const std::string& jid)
+ {
+ std::ostringstream oss;
+ oss << "Recover file analysis (jid=\"" << jid << "\"):" << std::endl;
+ oss << " Number of journal files (_njf) = " << _njf << std::endl;
+ oss << " Auto-expand mode (_ae) = " << (_ae ? "TRUE" : "FALSE") << std::endl;
+ if (_ae) oss << " Auto-expand mode max journal files (_aemjf) = " << _aemjf << std::endl;
+ oss << " Overwrite indicator (_owi) = " << (_owi ? "TRUE" : "FALSE") << std::endl;
+ oss << " First rotation (_frot) = " << (_frot ? "TRUE" : "FALSE") << std::endl;
+ oss << " Journal empty (_jempty) = " << (_jempty ? "TRUE" : "FALSE") << std::endl;
+ oss << " First (earliest) fid (_ffid) = " << _ffid << std::endl;
+ oss << " First record offset in first fid (_fro) = 0x" << std::hex << _fro <<
+ std::dec << " (" << (_fro/JRNL_DBLK_SIZE) << " dblks)" << std::endl;
+ oss << " Last (most recent) fid (_lfid) = " << _lfid << std::endl;
+ oss << " End offset (_eo) = 0x" << std::hex << _eo << std::dec << " (" <<
+ (_eo/JRNL_DBLK_SIZE) << " dblks)" << std::endl;
+ oss << " Highest rid (_h_rid) = 0x" << std::hex << _h_rid << std::dec << std::endl;
+ oss << " Last file full (_lffull) = " << (_lffull ? "TRUE" : "FALSE") << std::endl;
+ oss << " Journal full (_jfull) = " << (_jfull ? "TRUE" : "FALSE") << std::endl;
+ oss << " Normalized fid list (_fid_list) = [";
+ for (std::vector<u_int16_t>::const_iterator i = _fid_list.begin(); i < _fid_list.end(); i++)
+ {
+ if (i != _fid_list.begin()) oss << ", ";
+ oss << *i;
+ }
+ oss << "]" << std::endl;
+ oss << " Enqueued records (txn & non-txn):" << std::endl;
+ for (unsigned i=0; i<_enq_cnt_list.size(); i++)
+ oss << " File " << std::setw(2) << i << ": " << _enq_cnt_list[i] <<
+ std::endl;
+ return oss.str();
+ }
+
+ std::string to_log(const std::string& jid)
+ {
+ std::ostringstream oss;
+ oss << "Recover file analysis (jid=\"" << jid << "\"):";
+ oss << " njf=" << _njf;
+ oss << " ae=" << (_owi ? "T" : "F");
+ oss << " aemjf=" << _aemjf;
+ oss << " owi=" << (_ae ? "T" : "F");
+ oss << " frot=" << (_frot ? "T" : "F");
+ oss << " jempty=" << (_jempty ? "T" : "F");
+ oss << " ffid=" << _ffid;
+ oss << " fro=0x" << std::hex << _fro << std::dec << " (" <<
+ (_fro/JRNL_DBLK_SIZE) << " dblks)";
+ oss << " lfid=" << _lfid;
+ oss << " eo=0x" << std::hex << _eo << std::dec << " (" <<
+ (_eo/JRNL_DBLK_SIZE) << " dblks)";
+ oss << " h_rid=0x" << std::hex << _h_rid << std::dec;
+ oss << " lffull=" << (_lffull ? "T" : "F");
+ oss << " jfull=" << (_jfull ? "T" : "F");
+ oss << " Enqueued records (txn & non-txn): [ ";
+ for (unsigned i=0; i<_enq_cnt_list.size(); i++)
+ {
+ if (i) oss << " ";
+ oss << "fid_" << std::setw(2) << std::setfill('0') << i << "=" << _enq_cnt_list[i];
+ }
+ oss << " ]";
+ return oss.str();
+ }
+ };
+} // namespace journal
+} // namespace mrg
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_RCVDAT_H
Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rcvdat.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rcvdat.h
------------------------------------------------------------------------------
svn:keywords = Author Date Id Rev URL
Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rec_hdr.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rec_hdr.h?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rec_hdr.h (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rec_hdr.h Wed Jul 10 18:20:19 2013
@@ -0,0 +1,143 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file rec_hdr.h
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * File containing code for class mrg::journal::rec_hdr (record header),
+ * which is a common initial header used for all journal record structures
+ * except the record tail (rec_tail).
+ *
+ * \author Kim van der Riet
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_REC_HDR_H
+#define QPID_LEGACYSTORE_JRNL_REC_HDR_H
+
+#include <cstddef>
+#include "qpid/legacystore/jrnl/jcfg.h"
+#include <sys/types.h>
+
+namespace mrg
+{
+namespace journal
+{
+
+#pragma pack(1)
+
+ /**
+ * \brief Struct for data common to the head of all journal files and records.
+ * This includes identification for the file type, the encoding version, endian
+ * indicator and a record ID.
+ *
+ * File header info in binary format (16 bytes):
+ * <pre>
+ * 0 7
+ * +---+---+---+---+---+---+---+---+
+ * | magic | v | e | flags |
+ * +---+---+---+---+---+---+---+---+
+ * | rid |
+ * +---+---+---+---+---+---+---+---+
+ * v = file version (If the format or encoding of this file changes, then this
+ * number should be incremented)
+ * e = endian flag, false (0x00) for little endian, true (0x01) for big endian
+ * </pre>
+ *
+ * Note that journal files should be transferable between 32- and 64-bit
+ * hardware of the same endianness, but not between hardware of opposite
+ * entianness without some sort of binary conversion utility. Thus buffering
+ * will be needed for types that change size between 32- and 64-bit compiles.
+ */
+ struct rec_hdr
+ {
+ u_int32_t _magic; ///< File type identifier (magic number)
+ u_int8_t _version; ///< File encoding version
+ u_int8_t _eflag; ///< Flag for determining endianness
+ u_int16_t _uflag; ///< User-defined flags
+ u_int64_t _rid; ///< Record ID (rotating 64-bit counter)
+
+ // Global flags
+ static const u_int16_t HDR_OVERWRITE_INDICATOR_MASK = 0x1;
+
+ // Convenience constructors and methods
+ /**
+ * \brief Default constructor, which sets all values to 0.
+ */
+ inline rec_hdr(): _magic(0), _version(0), _eflag(0), _uflag(0), _rid(0) {}
+
+ /**
+ * \brief Convenience constructor which initializes values during construction.
+ */
+ inline rec_hdr(const u_int32_t magic, const u_int8_t version, const u_int64_t rid,
+ const bool owi): _magic(magic), _version(version),
+#if defined(JRNL_BIG_ENDIAN)
+ _eflag(RHM_BENDIAN_FLAG),
+#else
+ _eflag(RHM_LENDIAN_FLAG),
+#endif
+ _uflag(owi ? HDR_OVERWRITE_INDICATOR_MASK : 0), _rid(rid) {}
+
+ /**
+ * \brief Convenience copy method.
+ */
+ inline void hdr_copy(const rec_hdr& h)
+ {
+ _magic = h._magic;
+ _version = h._version;
+ _eflag = h._eflag;
+ _uflag = h._uflag;
+ _rid =h._rid;
+ }
+
+ /**
+ * \brief Resets all fields to 0
+ */
+ inline void reset()
+ {
+ _magic = 0;
+ _version = 0;
+ _eflag = 0;
+ _uflag = 0;
+ _rid = 0;
+ }
+
+ inline bool get_owi() const { return _uflag & HDR_OVERWRITE_INDICATOR_MASK; }
+
+ inline void set_owi(const bool owi)
+ {
+ _uflag = owi ? _uflag | HDR_OVERWRITE_INDICATOR_MASK :
+ _uflag & (~HDR_OVERWRITE_INDICATOR_MASK);
+ }
+
+ /**
+ * \brief Returns the size of the header in bytes.
+ */
+ inline static std::size_t size() { return sizeof(rec_hdr); }
+ }; // struct rec_hdr
+
+#pragma pack()
+
+} // namespace journal
+} // namespace mrg
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_REC_HDR_H
Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rec_hdr.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rec_hdr.h
------------------------------------------------------------------------------
svn:keywords = Author Date Id Rev URL
Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rec_tail.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rec_tail.h?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rec_tail.h (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rec_tail.h Wed Jul 10 18:20:19 2013
@@ -0,0 +1,98 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file rec_tail.h
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * File containing code for class mrg::journal::rec_tail (record tail), used to
+ * finalize a persistent record. The presence of a valid tail at the expected
+ * position in the journal file indicates that the record write was completed.
+ *
+ * \author Kim van der Riet
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_REC_TAIL_H
+#define QPID_LEGACYSTORE_JRNL_REC_TAIL_H
+
+#include <cstddef>
+#include "qpid/legacystore/jrnl/jcfg.h"
+
+namespace mrg
+{
+namespace journal
+{
+
+#pragma pack(1)
+
+ /**
+ * \brief Struct for data common to the tail of all records. The magic number
+ * used here is the binary inverse (1's complement) of the magic used in the
+ * record header; this minimizes possible confusion with other headers that may
+ * be present during recovery. The tail is used with all records that have either
+ * XIDs or data - ie any size-variable content. Currently the only records that
+ * do NOT use the tail are non-transactional dequeues and filler records.
+ *
+ * Record header info in binary format (12 bytes):
+ * <pre>
+ * 0 7
+ * +---+---+---+---+---+---+---+---+
+ * | ~(magic) | rid |
+ * +---+---+---+---+---+---+---+---+
+ * | rid (con't) |
+ * +---+---+---+---+
+ * </pre>
+ */
+ struct rec_tail
+ {
+ u_int32_t _xmagic; ///< Binary inverse (1's complement) of hdr magic number
+ u_int64_t _rid; ///< ID (rotating 64-bit counter)
+
+
+ /**
+ * \brief Default constructor, which sets all values to 0.
+ */
+ inline rec_tail(): _xmagic(0xffffffff), _rid(0) {}
+
+ /**
+ * \brief Convenience constructor which initializes values during construction from
+ * existing enq_hdr instance.
+ */
+ inline rec_tail(const rec_hdr& h): _xmagic(~h._magic), _rid(h._rid) {}
+
+ /**
+ * \brief Convenience constructor which initializes values during construction.
+ */
+ inline rec_tail(const u_int32_t xmagic, const u_int64_t rid): _xmagic(xmagic), _rid(rid) {}
+
+ /**
+ * \brief Returns the size of the header in bytes.
+ */
+ inline static std::size_t size() { return sizeof(rec_tail); }
+ };
+
+#pragma pack()
+
+} // namespace journal
+} // namespace mrg
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_REC_TAIL_H
Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rec_tail.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rec_tail.h
------------------------------------------------------------------------------
svn:keywords = Author Date Id Rev URL
Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rfc.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rfc.cpp?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rfc.cpp (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rfc.cpp Wed Jul 10 18:20:19 2013
@@ -0,0 +1,82 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file rfc.cpp
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * File containing code for class mrg::journal::rfc (rotating
+ * file controller). See comments in file rfc.h for details.
+ *
+ * \author Kim van der Riet
+ */
+
+#include "qpid/legacystore/jrnl/rfc.h"
+
+#include <cassert>
+
+namespace mrg
+{
+namespace journal
+{
+
+rfc::rfc(const lpmgr* lpmp): _lpmp(lpmp), _fc_index(0), _curr_fc(0)
+{}
+
+rfc::~rfc()
+{}
+
+void
+rfc::finalize()
+{
+ unset_findex();
+}
+
+void
+rfc::set_findex(const u_int16_t fc_index)
+{
+ _fc_index = fc_index;
+ _curr_fc = _lpmp->get_fcntlp(fc_index);
+ _curr_fc->rd_reset();
+}
+
+void
+rfc::unset_findex()
+{
+ _fc_index = 0;
+ _curr_fc = 0;
+}
+
+std::string
+rfc::status_str() const
+{
+ if (!_lpmp->is_init())
+ return "state: Uninitialized";
+ if (_curr_fc == 0)
+ return "state: Inactive";
+ std::ostringstream oss;
+ oss << "state: Active";
+ return oss.str();
+}
+
+} // namespace journal
+} // namespace mrg
Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rfc.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rfc.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rfc.h?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rfc.h (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rfc.h Wed Jul 10 18:20:19 2013
@@ -0,0 +1,193 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file rfc.h
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * File containing code for class mrg::journal::rfc (rotating
+ * file controller). See class documentation for details.
+ *
+ * \author Kim van der Riet
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_RFC_H
+#define QPID_LEGACYSTORE_JRNL_RFC_H
+
+namespace mrg
+{
+namespace journal
+{
+class rfc;
+}
+}
+
+#include "qpid/legacystore/jrnl/lpmgr.h"
+#include "qpid/legacystore/jrnl/enums.h"
+
+namespace mrg
+{
+namespace journal
+{
+
+ /**
+ * \class rfc
+ * \brief Rotating File Controller (rfc) - Class to handle the manangement of an array of file controllers (fcntl)
+ * objects for use in a circular disk buffer (journal). Each fcntl object corresponds to a file in the journal.
+ *
+ * The following states exist in this class:
+ *
+ * <pre>
+ * is_init() is_active()
+ * +===+ _lpmp.is_init() == false
+ * +---------->| | Uninitialized: _curr_fc == 0 F F
+ * | +-->+===+ --+
+ * | | |
+ * | | |
+ * | finalize() initialize()
+ * | | |
+ * | | |
+ * | +-- +===+<--+ _lpmp.is_init() == true
+ * finalize() | | Inactive: _curr_fc == 0 T F
+ * | +-->+===+ --+
+ * | | |
+ * | | |
+ * | unset_findex() set_findex()
+ * | | |
+ * | | |
+ * | +-- +===+<--+ _lpmp.is_init() == true
+ * +---------- | | Active: _curr_fc != 0 T T
+ * +===+
+ * </pre>
+ *
+ * The Uninitialized state is where the class starts after construction. Once the number of files is known and
+ * the array of file controllers allocated, then initialize() is called to set these, causing the state to move
+ * to Inactive.
+ *
+ * The Inactive state has the file controllers allocated and pointing to their respective journal files, but no
+ * current file controller has been selected. The pointer to the current file controller _curr_fc is null. Once the
+ * index of the active file is known, then calling set_findex() will set the index and internal pointer
+ * to the currently active file controller. This moves the state to Active.
+ *
+ * Note TODO: Comment on sync issues between change in num files in _lpmp and _fc_index/_curr_fc.
+ */
+ class rfc
+ {
+ protected:
+ const lpmgr* _lpmp; ///< Pointer to jcntl's lpmgr instance containing lfid/pfid map and fcntl objects
+ u_int16_t _fc_index; ///< Index of current file controller
+ fcntl* _curr_fc; ///< Pointer to current file controller
+
+ public:
+ rfc(const lpmgr* lpmp);
+ virtual ~rfc();
+
+ /**
+ * \brief Initialize the controller, moving from state Uninitialized to Inactive. The main function of
+ * initialize() is to set the number of files and the pointer to the fcntl array.
+ */
+ virtual inline void initialize() {}
+
+ /**
+ * \brief Reset the controller to Uninitialized state, usually called when the journal is stopped. Once called,
+ * initialize() must be called to reuse an instance.
+ */
+ virtual void finalize();
+
+ /**
+ * \brief Check initialization state: true = Not Uninitialized, ie Initialized or Active; false = Uninitialized.
+ */
+ virtual inline bool is_init() const { return _lpmp->is_init(); }
+
+ /**
+ * \brief Check active state: true = Initialized and _curr_fc not null; false otherwise.
+ */
+ virtual inline bool is_active() const { return _lpmp->is_init() && _curr_fc != 0; }
+
+ /**
+ * \brief Sets the current file index and active fcntl object. Moves to state Active.
+ */
+ virtual void set_findex(const u_int16_t fc_index);
+
+ /**
+ * \brief Nulls the current file index and active fcntl pointer, moves to state Inactive.
+ */
+ virtual void unset_findex();
+
+ /**
+ * \brief Rotate active file controller to next file in rotating file group.
+ * \exception jerrno::JERR__NINIT if called before calling initialize().
+ */
+ virtual iores rotate() = 0;
+
+ /**
+ * \brief Returns the index of the currently active file within the rotating file group.
+ */
+ inline u_int16_t index() const { return _fc_index; }
+
+ /**
+ * \brief Returns the currently active journal file controller within the rotating file group.
+ */
+ inline fcntl* file_controller() const { return _curr_fc; }
+
+ /**
+ * \brief Returns the currently active physical file id (pfid)
+ */
+ inline u_int16_t pfid() const { return _curr_fc->pfid(); }
+
+ // Convenience access methods to current file controller
+ // Note: Do not call when not in active state
+
+ inline u_int32_t enqcnt() const { return _curr_fc->enqcnt(); }
+ inline u_int32_t incr_enqcnt() { return _curr_fc->incr_enqcnt(); }
+ inline u_int32_t incr_enqcnt(const u_int16_t fid) { return _lpmp->get_fcntlp(fid)->incr_enqcnt(); }
+ inline u_int32_t add_enqcnt(const u_int32_t a) { return _curr_fc->add_enqcnt(a); }
+ inline u_int32_t add_enqcnt(const u_int16_t fid, const u_int32_t a)
+ { return _lpmp->get_fcntlp(fid)->add_enqcnt(a); }
+ inline u_int32_t decr_enqcnt(const u_int16_t fid) { return _lpmp->get_fcntlp(fid)->decr_enqcnt(); }
+ inline u_int32_t subtr_enqcnt(const u_int16_t fid, const u_int32_t s)
+ { return _lpmp->get_fcntlp(fid)->subtr_enqcnt(s); }
+
+ virtual inline u_int32_t subm_cnt_dblks() const = 0;
+ virtual inline std::size_t subm_offs() const = 0;
+ virtual inline u_int32_t add_subm_cnt_dblks(u_int32_t a) = 0;
+
+ virtual inline u_int32_t cmpl_cnt_dblks() const = 0;
+ virtual inline std::size_t cmpl_offs() const = 0;
+ virtual inline u_int32_t add_cmpl_cnt_dblks(u_int32_t a) = 0;
+
+ virtual inline bool is_void() const = 0;
+ virtual inline bool is_empty() const = 0;
+ virtual inline u_int32_t remaining_dblks() const = 0;
+ virtual inline bool is_full() const = 0;
+ virtual inline bool is_compl() const = 0;
+ virtual inline u_int32_t aio_outstanding_dblks() const = 0;
+ virtual inline bool file_rotate() const = 0;
+
+ // Debug aid
+ virtual std::string status_str() const;
+ }; // class rfc
+
+} // namespace journal
+} // namespace mrg
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_RFC_H
Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rfc.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rfc.h
------------------------------------------------------------------------------
svn:keywords = Author Date Id Rev URL
Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp Wed Jul 10 18:20:19 2013
@@ -0,0 +1,698 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file rmgr.cpp
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * File containing code for class mrg::journal::rmgr (read manager). See
+ * comments in file rmgr.h for details.
+ *
+ * \author Kim van der Riet
+ */
+
+#include "qpid/legacystore/jrnl/rmgr.h"
+
+#include <cassert>
+#include <cerrno>
+#include <cstdlib>
+#include "qpid/legacystore/jrnl/jcntl.h"
+#include "qpid/legacystore/jrnl/jerrno.h"
+#include <sstream>
+
+namespace mrg
+{
+namespace journal
+{
+
+rmgr::rmgr(jcntl* jc, enq_map& emap, txn_map& tmap, rrfc& rrfc):
+ pmgr(jc, emap, tmap),
+ _rrfc(rrfc),
+ _hdr(),
+ _fhdr_buffer(0),
+ _fhdr_aio_cb_ptr(0),
+ _fhdr_rd_outstanding(false)
+{}
+
+rmgr::~rmgr()
+{
+ rmgr::clean();
+}
+
+void
+rmgr::initialize(aio_callback* const cbp)
+{
+ pmgr::initialize(cbp, JRNL_RMGR_PAGE_SIZE, JRNL_RMGR_PAGES);
+ clean();
+ // Allocate memory for reading file header
+ if (::posix_memalign(&_fhdr_buffer, _sblksize, _sblksize))
+ {
+ std::ostringstream oss;
+ oss << "posix_memalign(): blksize=" << _sblksize << " size=" << _sblksize;
+ oss << FORMAT_SYSERR(errno);
+ throw jexception(jerrno::JERR__MALLOC, oss.str(), "rmgr", "initialize");
+ }
+ _fhdr_aio_cb_ptr = new aio_cb;
+ std::memset(_fhdr_aio_cb_ptr, 0, sizeof(aio_cb*));
+}
+
+void
+rmgr::clean()
+{
+ std::free(_fhdr_buffer);
+ _fhdr_buffer = 0;
+
+ if (_fhdr_aio_cb_ptr)
+ {
+ delete _fhdr_aio_cb_ptr;
+ _fhdr_aio_cb_ptr = 0;
+ }
+}
+
+iores
+rmgr::read(void** const datapp, std::size_t& dsize, void** const xidpp, std::size_t& xidsize,
+ bool& transient, bool& external, data_tok* dtokp, bool ignore_pending_txns)
+{
+ iores res = pre_read_check(dtokp);
+ if (res != RHM_IORES_SUCCESS)
+ {
+ set_params_null(datapp, dsize, xidpp, xidsize);
+ return res;
+ }
+
+ if (dtokp->rstate() == data_tok::SKIP_PART)
+ {
+ if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
+ {
+ aio_cycle(); // check if rd AIOs returned; initiate new reads if possible
+ return RHM_IORES_PAGE_AIOWAIT;
+ }
+ const iores res = skip(dtokp);
+ if (res != RHM_IORES_SUCCESS)
+ {
+ set_params_null(datapp, dsize, xidpp, xidsize);
+ return res;
+ }
+ }
+ if (dtokp->rstate() == data_tok::READ_PART)
+ {
+ assert(dtokp->rid() == _hdr._rid);
+ void* rptr = (void*)((char*)_page_ptr_arr[_pg_index] + (_pg_offset_dblks * JRNL_DBLK_SIZE));
+ const iores res = read_enq(_hdr, rptr, dtokp);
+ dsize = _enq_rec.get_data(datapp);
+ xidsize = _enq_rec.get_xid(xidpp);
+ transient = _enq_rec.is_transient();
+ external = _enq_rec.is_external();
+ return res;
+ }
+
+ set_params_null(datapp, dsize, xidpp, xidsize);
+ _hdr.reset();
+ // Read header, determine next record type
+ while (true)
+ {
+ if(dblks_rem() == 0 && _rrfc.is_compl() && !_rrfc.is_wr_aio_outstanding())
+ {
+ aio_cycle(); // check if rd AIOs returned; initiate new reads if possible
+ if(dblks_rem() == 0 && _rrfc.is_compl() && !_rrfc.is_wr_aio_outstanding())
+ {
+ if (_jc->unflushed_dblks() > 0)
+ _jc->flush();
+ else if (!_aio_evt_rem)
+ return RHM_IORES_EMPTY;
+ }
+ }
+ if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
+ {
+ aio_cycle();
+ return RHM_IORES_PAGE_AIOWAIT;
+ }
+ void* rptr = (void*)((char*)_page_ptr_arr[_pg_index] + (_pg_offset_dblks * JRNL_DBLK_SIZE));
+ std::memcpy(&_hdr, rptr, sizeof(rec_hdr));
+ switch (_hdr._magic)
+ {
+ case RHM_JDAT_ENQ_MAGIC:
+ {
+ _enq_rec.reset(); // sets enqueue rec size
+ // Check if RID of this rec is still enqueued, if so read it, else skip
+ bool is_enq = false;
+ int16_t fid = _emap.get_pfid(_hdr._rid);
+ if (fid < enq_map::EMAP_OK)
+ {
+ bool enforce_txns = !_jc->is_read_only() && !ignore_pending_txns;
+ // Block read for transactionally locked record (only when not recovering)
+ if (fid == enq_map::EMAP_LOCKED && enforce_txns)
+ return RHM_IORES_TXPENDING;
+
+ // (Recover mode only) Ok, not in emap - now search tmap, if present then read
+ is_enq = _tmap.is_enq(_hdr._rid);
+ if (enforce_txns && is_enq)
+ return RHM_IORES_TXPENDING;
+ }
+ else
+ is_enq = true;
+
+ if (is_enq) // ok, this record is enqueued, check it, then read it...
+ {
+ if (dtokp->rid())
+ {
+ if (_hdr._rid != dtokp->rid())
+ {
+ std::ostringstream oss;
+ oss << std::hex << "rid=0x" << _hdr._rid << "; dtok_rid=0x" << dtokp->rid()
+ << "; dtok_id=0x" << dtokp->id();
+ throw jexception(jerrno::JERR_RMGR_RIDMISMATCH, oss.str(), "rmgr", "read");
+ }
+ }
+ else
+ dtokp->set_rid(_hdr._rid);
+
+// TODO: Add member _fid to pmgr::page_cb which indicates the fid from which this page was
+// populated. When this value is set in wmgr::flush() somewehere, then uncomment the following
+// check:
+// if (fid != _page_cb_arr[_pg_index]._fid)
+// {
+// std::ostringstream oss;
+// oss << std::hex << std::setfill('0');
+// oss << "rid=0x" << std::setw(16) << _hdr._rid;
+// oss << "; emap_fid=0x" << std::setw(4) << fid;
+// oss << "; current_fid=" << _rrfc.fid();
+// throw jexception(jerrno::JERR_RMGR_FIDMISMATCH, oss.str(), "rmgr",
+// "read");
+// }
+
+ const iores res = read_enq(_hdr, rptr, dtokp);
+ dsize = _enq_rec.get_data(datapp);
+ xidsize = _enq_rec.get_xid(xidpp);
+ transient = _enq_rec.is_transient();
+ external = _enq_rec.is_external();
+ return res;
+ }
+ else // skip this record, it is already dequeued
+ consume_xid_rec(_hdr, rptr, dtokp);
+ break;
+ }
+ case RHM_JDAT_DEQ_MAGIC:
+ consume_xid_rec(_hdr, rptr, dtokp);
+ break;
+ case RHM_JDAT_TXA_MAGIC:
+ consume_xid_rec(_hdr, rptr, dtokp);
+ break;
+ case RHM_JDAT_TXC_MAGIC:
+ consume_xid_rec(_hdr, rptr, dtokp);
+ break;
+ case RHM_JDAT_EMPTY_MAGIC:
+ consume_filler();
+ break;
+ default:
+ return RHM_IORES_EMPTY;
+ }
+ }
+}
+
+int32_t
+rmgr::get_events(page_state state, timespec* const timeout, bool flush)
+{
+ if (_aio_evt_rem == 0) // no events to get
+ return 0;
+
+ int32_t ret;
+ if ((ret = aio::getevents(_ioctx, flush ? _aio_evt_rem : 1, _aio_evt_rem/*_cache_num_pages + _jc->num_jfiles()*/, _aio_event_arr, timeout)) < 0)
+ {
+ if (ret == -EINTR) // Interrupted by signal
+ return 0;
+ std::ostringstream oss;
+ oss << "io_getevents() failed: " << std::strerror(-ret) << " (" << ret << ")";
+ throw jexception(jerrno::JERR__AIO, oss.str(), "rmgr", "get_events");
+ }
+ if (ret == 0 && timeout)
+ return jerrno::AIO_TIMEOUT;
+
+ std::vector<u_int16_t> pil;
+ pil.reserve(ret);
+ for (int i=0; i<ret; i++) // Index of returned AIOs
+ {
+ if (_aio_evt_rem == 0)
+ {
+ std::ostringstream oss;
+ oss << "_aio_evt_rem; evt " << (i + 1) << " of " << ret;
+ throw jexception(jerrno::JERR__UNDERFLOW, oss.str(), "rmgr", "get_events");
+ }
+ _aio_evt_rem--;
+ aio_cb* aiocbp = _aio_event_arr[i].obj; // This I/O control block (iocb)
+ page_cb* pcbp = (page_cb*)(aiocbp->data); // This page control block (pcb)
+ long aioret = (long)_aio_event_arr[i].res;
+ if (aioret < 0)
+ {
+ std::ostringstream oss;
+ oss << "AIO read operation failed: " << std::strerror(-aioret) << " (" << aioret << ")";
+ oss << " [pg=" << pcbp->_index << " buf=" << aiocbp->u.c.buf;
+ oss << " rsize=0x" << std::hex << aiocbp->u.c.nbytes;
+ oss << " offset=0x" << aiocbp->u.c.offset << std::dec;
+ oss << " fh=" << aiocbp->aio_fildes << "]";
+ throw jexception(jerrno::JERR__AIO, oss.str(), "rmgr", "get_events");
+ }
+
+ if (pcbp) // Page reads have pcb
+ {
+ if (pcbp->_rfh->rd_subm_cnt_dblks() >= JRNL_SBLK_SIZE) // Detects if write reset of this fcntl obj has occurred.
+ {
+ // Increment the completed read offset
+ // NOTE: We cannot use _rrfc here, as it may have rotated since submitting count.
+ // Use stored pointer to fcntl in the pcb instead.
+ pcbp->_rdblks = aiocbp->u.c.nbytes / JRNL_DBLK_SIZE;
+ pcbp->_rfh->add_rd_cmpl_cnt_dblks(pcbp->_rdblks);
+ pcbp->_state = state;
+ pil[i] = pcbp->_index;
+ }
+ }
+ else // File header reads have no pcb
+ {
+ std::memcpy(&_fhdr, _fhdr_buffer, sizeof(file_hdr));
+ _rrfc.add_cmpl_cnt_dblks(JRNL_SBLK_SIZE);
+
+ u_int32_t fro_dblks = (_fhdr._fro / JRNL_DBLK_SIZE) - JRNL_SBLK_SIZE;
+ // Check fro_dblks does not exceed the write pointers which can happen in some corrupted journal recoveries
+ if (fro_dblks > _jc->wr_subm_cnt_dblks(_fhdr._pfid) - JRNL_SBLK_SIZE)
+ fro_dblks = _jc->wr_subm_cnt_dblks(_fhdr._pfid) - JRNL_SBLK_SIZE;
+ _pg_cntr = fro_dblks / (JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE);
+ u_int32_t tot_pg_offs_dblks = _pg_cntr * JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE;
+ _pg_index = _pg_cntr % JRNL_RMGR_PAGES;
+ _pg_offset_dblks = fro_dblks - tot_pg_offs_dblks;
+ _rrfc.add_subm_cnt_dblks(tot_pg_offs_dblks);
+ _rrfc.add_cmpl_cnt_dblks(tot_pg_offs_dblks);
+
+ _fhdr_rd_outstanding = false;
+ _rrfc.set_valid();
+ }
+ }
+
+ // Perform AIO return callback
+ if (_cbp && ret)
+ _cbp->rd_aio_cb(pil);
+ return ret;
+}
+
+void
+rmgr::recover_complete()
+{}
+
+void
+rmgr::invalidate()
+{
+ if (_rrfc.is_valid())
+ _rrfc.set_invalid();
+}
+
+void
+rmgr::flush(timespec* timeout)
+{
+ // Wait for any outstanding AIO read operations to complete before synchronizing
+ while (_aio_evt_rem)
+ {
+ if (get_events(AIO_COMPLETE, timeout) == jerrno::AIO_TIMEOUT) // timed out, nothing returned
+ {
+ throw jexception(jerrno::JERR__TIMEOUT,
+ "Timed out waiting for outstanding read aio to return", "rmgr", "init_validation");
+ }
+ }
+
+ // Reset all read states and pointers
+ for (int i=0; i<_cache_num_pages; i++)
+ _page_cb_arr[i]._state = UNUSED;
+ _rrfc.unset_findex();
+ _pg_index = 0;
+ _pg_offset_dblks = 0;
+}
+
+bool
+rmgr::wait_for_validity(timespec* timeout, const bool throw_on_timeout)
+{
+ bool timed_out = false;
+ while (!_rrfc.is_valid() && !timed_out)
+ {
+ timed_out = get_events(AIO_COMPLETE, timeout) == jerrno::AIO_TIMEOUT;
+ if (timed_out && throw_on_timeout)
+ throw jexception(jerrno::JERR__TIMEOUT, "Timed out waiting for read validity", "rmgr", "wait_for_validity");
+ }
+ return _rrfc.is_valid();
+}
+
+iores
+rmgr::pre_read_check(data_tok* dtokp)
+{
+ if (_aio_evt_rem)
+ get_events(AIO_COMPLETE, 0);
+
+ if (!_rrfc.is_valid())
+ return RHM_IORES_RCINVALID;
+
+ // block reads until outstanding file header read completes as fro is needed to read
+ if (_fhdr_rd_outstanding)
+ return RHM_IORES_PAGE_AIOWAIT;
+
+ if(dblks_rem() == 0 && _rrfc.is_compl() && !_rrfc.is_wr_aio_outstanding())
+ {
+ aio_cycle(); // check if any AIOs have returned
+ if(dblks_rem() == 0 && _rrfc.is_compl() && !_rrfc.is_wr_aio_outstanding())
+ {
+ if (_jc->unflushed_dblks() > 0)
+ _jc->flush();
+ else if (!_aio_evt_rem)
+ return RHM_IORES_EMPTY;
+ }
+ }
+
+ // Check write state of this token is ENQ - required for read
+ if (dtokp)
+ {
+ if (!dtokp->is_readable())
+ {
+ std::ostringstream oss;
+ oss << std::hex << std::setfill('0');
+ oss << "dtok_id=0x" << std::setw(8) << dtokp->id();
+ oss << "; dtok_rid=0x" << std::setw(16) << dtokp->rid();
+ oss << "; dtok_wstate=" << dtokp->wstate_str();
+ throw jexception(jerrno::JERR_RMGR_ENQSTATE, oss.str(), "rmgr", "pre_read_check");
+ }
+ }
+
+ return RHM_IORES_SUCCESS;
+}
+
+iores
+rmgr::read_enq(rec_hdr& h, void* rptr, data_tok* dtokp)
+{
+ if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
+ {
+ aio_cycle(); // check if any AIOs have returned
+ return RHM_IORES_PAGE_AIOWAIT;
+ }
+
+ // Read data from this page, first block will have header and data size.
+ u_int32_t dblks_rd = _enq_rec.decode(h, rptr, dtokp->dblocks_read(), dblks_rem());
+ dtokp->incr_dblocks_read(dblks_rd);
+
+ _pg_offset_dblks += dblks_rd;
+
+ // If data still incomplete, move to next page and decode again
+ while (dtokp->dblocks_read() < _enq_rec.rec_size_dblks())
+ {
+ rotate_page();
+ if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
+ {
+ dtokp->set_rstate(data_tok::READ_PART);
+ dtokp->set_dsize(_enq_rec.data_size());
+ return RHM_IORES_PAGE_AIOWAIT;
+ }
+
+ rptr = (void*)((char*)_page_ptr_arr[_pg_index]);
+ dblks_rd = _enq_rec.decode(h, rptr, dtokp->dblocks_read(), dblks_rem());
+ dtokp->incr_dblocks_read(dblks_rd);
+ _pg_offset_dblks += dblks_rd;
+ }
+
+ // If we have finished with this page, rotate it
+ if (dblks_rem() == 0)
+ rotate_page();
+
+ // Set the record size in dtokp
+ dtokp->set_rstate(data_tok::READ);
+ dtokp->set_dsize(_enq_rec.data_size());
+ return RHM_IORES_SUCCESS;
+}
+
+void
+rmgr::consume_xid_rec(rec_hdr& h, void* rptr, data_tok* dtokp)
+{
+ if (h._magic == RHM_JDAT_ENQ_MAGIC)
+ {
+ enq_hdr ehdr;
+ std::memcpy(&ehdr, rptr, sizeof(enq_hdr));
+ if (ehdr.is_external())
+ dtokp->set_dsize(ehdr._xidsize + sizeof(enq_hdr) + sizeof(rec_tail));
+ else
+ dtokp->set_dsize(ehdr._xidsize + ehdr._dsize + sizeof(enq_hdr) + sizeof(rec_tail));
+ }
+ else if (h._magic == RHM_JDAT_DEQ_MAGIC)
+ {
+ deq_hdr dhdr;
+ std::memcpy(&dhdr, rptr, sizeof(deq_hdr));
+ if (dhdr._xidsize)
+ dtokp->set_dsize(dhdr._xidsize + sizeof(deq_hdr) + sizeof(rec_tail));
+ else
+ dtokp->set_dsize(sizeof(deq_hdr));
+ }
+ else if (h._magic == RHM_JDAT_TXA_MAGIC || h._magic == RHM_JDAT_TXC_MAGIC)
+ {
+ txn_hdr thdr;
+ std::memcpy(&thdr, rptr, sizeof(txn_hdr));
+ dtokp->set_dsize(thdr._xidsize + sizeof(txn_hdr) + sizeof(rec_tail));
+ }
+ else
+ {
+ std::ostringstream oss;
+ oss << "Record type found = \"" << (char*)&h._magic << "\"";
+ throw jexception(jerrno::JERR_RMGR_BADRECTYPE, oss.str(), "rmgr", "consume_xid_rec");
+ }
+ dtokp->set_dblocks_read(0);
+ skip(dtokp);
+}
+
+void
+rmgr::consume_filler()
+{
+ // Filler (Magic "RHMx") is one dblk by definition
+ _pg_offset_dblks++;
+ if (dblks_rem() == 0)
+ rotate_page();
+}
+
+iores
+rmgr::skip(data_tok* dtokp)
+{
+ u_int32_t dsize_dblks = jrec::size_dblks(dtokp->dsize());
+ u_int32_t tot_dblk_cnt = dtokp->dblocks_read();
+ while (true)
+ {
+ u_int32_t this_dblk_cnt = 0;
+ if (dsize_dblks - tot_dblk_cnt > dblks_rem())
+ this_dblk_cnt = dblks_rem();
+ else
+ this_dblk_cnt = dsize_dblks - tot_dblk_cnt;
+ if (this_dblk_cnt)
+ {
+ dtokp->incr_dblocks_read(this_dblk_cnt);
+ _pg_offset_dblks += this_dblk_cnt;
+ tot_dblk_cnt += this_dblk_cnt;
+ }
+ // If skip still incomplete, move to next page and decode again
+ if (tot_dblk_cnt < dsize_dblks)
+ {
+ if (dblks_rem() == 0)
+ rotate_page();
+ if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
+ {
+ dtokp->set_rstate(data_tok::SKIP_PART);
+ return RHM_IORES_PAGE_AIOWAIT;
+ }
+ }
+ else
+ {
+ // Skip complete, put state back to unread
+ dtokp->set_rstate(data_tok::UNREAD);
+ dtokp->set_dsize(0);
+ dtokp->set_dblocks_read(0);
+
+ // If we have finished with this page, rotate it
+ if (dblks_rem() == 0)
+ rotate_page();
+ return RHM_IORES_SUCCESS;
+ }
+ }
+}
+
+iores
+rmgr::aio_cycle()
+{
+ // Perform validity checks
+ if (_fhdr_rd_outstanding) // read of file header still outstanding in aio
+ return RHM_IORES_SUCCESS;
+ if (!_rrfc.is_valid())
+ {
+ // Flush and reset all read states and pointers
+ flush(&jcntl::_aio_cmpl_timeout);
+
+ _jc->get_earliest_fid(); // determine initial file to read; calls _rrfc.set_findex() to set value
+ // If this file has not yet been written to, return RHM_IORES_EMPTY
+ if (_rrfc.is_void() && !_rrfc.is_wr_aio_outstanding())
+ return RHM_IORES_EMPTY;
+ init_file_header_read(); // send off AIO read request for file header
+ return RHM_IORES_SUCCESS;
+ }
+
+ int16_t first_uninit = -1;
+ u_int16_t num_uninit = 0;
+ u_int16_t num_compl = 0;
+ bool outstanding = false;
+ // Index must start with current buffer and cycle around so that first
+ // uninitialized buffer is initialized first
+ for (u_int16_t i=_pg_index; i<_pg_index+_cache_num_pages; i++)
+ {
+ int16_t ci = i % _cache_num_pages;
+ switch (_page_cb_arr[ci]._state)
+ {
+ case UNUSED:
+ if (first_uninit < 0)
+ first_uninit = ci;
+ num_uninit++;
+ break;
+ case IN_USE:
+ break;
+ case AIO_PENDING:
+ outstanding = true;
+ break;
+ case AIO_COMPLETE:
+ num_compl++;
+ break;
+ default:;
+ }
+ }
+ iores res = RHM_IORES_SUCCESS;
+ if (num_uninit)
+ res = init_aio_reads(first_uninit, num_uninit);
+ else if (num_compl == _cache_num_pages) // This condition exists after invalidation
+ res = init_aio_reads(0, _cache_num_pages);
+ if (outstanding)
+ get_events(AIO_COMPLETE, 0);
+ return res;
+}
+
+iores
+rmgr::init_aio_reads(const int16_t first_uninit, const u_int16_t num_uninit)
+{
+ for (int16_t i=0; i<num_uninit; i++)
+ {
+ if (_rrfc.is_void()) // Nothing to do; this file not yet written to
+ break;
+
+ if (_rrfc.subm_offs() == 0)
+ {
+ _rrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE);
+ _rrfc.add_cmpl_cnt_dblks(JRNL_SBLK_SIZE);
+ }
+
+ // TODO: Future perf improvement: Do a single AIO read for all available file
+ // space into all contiguous empty pages in one AIO operation.
+
+ u_int32_t file_rem_dblks = _rrfc.remaining_dblks();
+ file_rem_dblks -= file_rem_dblks % JRNL_SBLK_SIZE; // round down to closest sblk boundary
+ u_int32_t pg_size_dblks = JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE;
+ u_int32_t rd_size = file_rem_dblks > pg_size_dblks ? pg_size_dblks : file_rem_dblks;
+ if (rd_size)
+ {
+ int16_t pi = (i + first_uninit) % _cache_num_pages;
+ // TODO: For perf, combine contiguous pages into single read
+ // 1 or 2 AIOs needed depending on whether read block folds
+ aio_cb* aiocbp = &_aio_cb_arr[pi];
+ aio::prep_pread_2(aiocbp, _rrfc.fh(), _page_ptr_arr[pi], rd_size * JRNL_DBLK_SIZE, _rrfc.subm_offs());
+ if (aio::submit(_ioctx, 1, &aiocbp) < 0)
+ throw jexception(jerrno::JERR__AIO, "rmgr", "init_aio_reads");
+ _rrfc.add_subm_cnt_dblks(rd_size);
+ _aio_evt_rem++;
+ _page_cb_arr[pi]._state = AIO_PENDING;
+ _page_cb_arr[pi]._rfh = _rrfc.file_controller();
+ }
+ else // If there is nothing to read for this page, neither will there be for the others...
+ break;
+ if (_rrfc.file_rotate())
+ _rrfc.rotate();
+ }
+ return RHM_IORES_SUCCESS;
+}
+
+void
+rmgr::rotate_page()
+{
+ _page_cb_arr[_pg_index]._rdblks = 0;
+ _page_cb_arr[_pg_index]._state = UNUSED;
+ if (_pg_offset_dblks >= JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE)
+ {
+ _pg_offset_dblks = 0;
+ _pg_cntr++;
+ }
+ if (++_pg_index >= _cache_num_pages)
+ _pg_index = 0;
+ aio_cycle();
+ _pg_offset_dblks = 0;
+ // This counter is for bookkeeping only, page rotates are handled directly in init_aio_reads()
+ // FIXME: _pg_cntr should be sync'd with aio ops, not use of page as it is now...
+ // Need to move reset into if (_rrfc.file_rotate()) above.
+ if (_pg_cntr >= (_jc->jfsize_sblks() / JRNL_RMGR_PAGE_SIZE))
+ _pg_cntr = 0;
+}
+
+u_int32_t
+rmgr::dblks_rem() const
+{
+ return _page_cb_arr[_pg_index]._rdblks - _pg_offset_dblks;
+}
+
+void
+rmgr::set_params_null(void** const datapp, std::size_t& dsize, void** const xidpp, std::size_t& xidsize)
+{
+ *datapp = 0;
+ dsize = 0;
+ *xidpp = 0;
+ xidsize = 0;
+}
+
+void
+rmgr::init_file_header_read()
+{
+ _jc->fhdr_wr_sync(_rrfc.index()); // wait if the file header write is outstanding
+ int rfh = _rrfc.fh();
+ aio::prep_pread_2(_fhdr_aio_cb_ptr, rfh, _fhdr_buffer, _sblksize, 0);
+ if (aio::submit(_ioctx, 1, &_fhdr_aio_cb_ptr) < 0)
+ throw jexception(jerrno::JERR__AIO, "rmgr", "init_file_header_read");
+ _aio_evt_rem++;
+ _rrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE);
+ _fhdr_rd_outstanding = true;
+}
+
+/* TODO (sometime in the future)
+const iores
+rmgr::get(const u_int64_t& rid, const std::size_t& dsize, const std::size_t& dsize_avail,
+ const void** const data, bool auto_discard)
+{
+ return RHM_IORES_SUCCESS;
+}
+
+const iores
+rmgr::discard(data_tok* dtokp)
+{
+ return RHM_IORES_SUCCESS;
+}
+*/
+
+} // namespace journal
+} // namespace mrg
Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.h?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.h (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.h Wed Jul 10 18:20:19 2013
@@ -0,0 +1,114 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file rmgr.h
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * File containing code for class mrg::journal::rmgr (read manager). See
+ * class documentation for details.
+ *
+ * \author Kim van der Riet
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_RMGR_H
+#define QPID_LEGACYSTORE_JRNL_RMGR_H
+
+namespace mrg
+{
+namespace journal
+{
+class rmgr;
+}
+}
+
+#include <cstring>
+#include "qpid/legacystore/jrnl/enums.h"
+#include "qpid/legacystore/jrnl/file_hdr.h"
+#include "qpid/legacystore/jrnl/pmgr.h"
+#include "qpid/legacystore/jrnl/rec_hdr.h"
+#include "qpid/legacystore/jrnl/rrfc.h"
+
+namespace mrg
+{
+namespace journal
+{
+
+ /**
+ * \brief Class for managing a read page cache of arbitrary size and number of pages.
+ *
+ * The read page cache works on the principle of filling as many pages as possilbe in advance of
+ * reading the data. This ensures that delays caused by AIO operations are minimized.
+ */
+ class rmgr : public pmgr
+ {
+ private:
+ rrfc& _rrfc; ///< Ref to read rotating file controller
+ rec_hdr _hdr; ///< Header used to determind record type
+
+ void* _fhdr_buffer; ///< Buffer used for fhdr reads
+ aio_cb* _fhdr_aio_cb_ptr; ///< iocb pointer for fhdr reads
+ file_hdr _fhdr; ///< file header instance for reading file headers
+ bool _fhdr_rd_outstanding; ///< true if a fhdr read is outstanding
+
+ public:
+ rmgr(jcntl* jc, enq_map& emap, txn_map& tmap, rrfc& rrfc);
+ virtual ~rmgr();
+
+ using pmgr::initialize;
+ void initialize(aio_callback* const cbp);
+ iores read(void** const datapp, std::size_t& dsize, void** const xidpp,
+ std::size_t& xidsize, bool& transient, bool& external, data_tok* dtokp,
+ bool ignore_pending_txns);
+ int32_t get_events(page_state state, timespec* const timeout, bool flush = false);
+ void recover_complete();
+ inline iores synchronize() { if (_rrfc.is_valid()) return RHM_IORES_SUCCESS; return aio_cycle(); }
+ void invalidate();
+ bool wait_for_validity(timespec* const timeout, const bool throw_on_timeout = false);
+
+ /* TODO (if required)
+ const iores get(const u_int64_t& rid, const std::size_t& dsize, const std::size_t& dsize_avail,
+ const void** const data, bool auto_discard);
+ const iores discard(data_tok* dtok);
+ */
+
+ private:
+ void clean();
+ void flush(timespec* timeout);
+ iores pre_read_check(data_tok* dtokp);
+ iores read_enq(rec_hdr& h, void* rptr, data_tok* dtokp);
+ void consume_xid_rec(rec_hdr& h, void* rptr, data_tok* dtokp);
+ void consume_filler();
+ iores skip(data_tok* dtokp);
+ iores aio_cycle();
+ iores init_aio_reads(const int16_t first_uninit, const u_int16_t num_uninit);
+ void rotate_page();
+ u_int32_t dblks_rem() const;
+ void set_params_null(void** const datapp, std::size_t& dsize, void** const xidpp,
+ std::size_t& xidsize);
+ void init_file_header_read();
+ };
+
+} // namespace journal
+} // namespace mrg
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_RMGR_H
Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.h
------------------------------------------------------------------------------
svn:keywords = Author Date Id Rev URL
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org