You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2013/10/08 17:09:01 UTC

svn commit: r1530301 [2/8] - in /qpid/trunk/qpid: cpp/src/tests/legacystore/ cpp/src/tests/legacystore/federation/ cpp/src/tests/legacystore/jrnl/ cpp/src/tests/legacystore/jrnl/jtt/ cpp/src/tests/legacystore/python_tests/ tools/src/py/ tools/src/py/qp...

Added: qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/_st_helper_fns.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/_st_helper_fns.h?rev=1530301&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/_st_helper_fns.h (added)
+++ qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/_st_helper_fns.h Tue Oct  8 15:09:00 2013
@@ -0,0 +1,882 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+// NOTE: This file is included in _st_*.cpp files inside the QPID_AUTO_TEST_SUITE()
+// definition.
+
+// Polling limits used by handle_jcntl_response(): maximum number of waits, and the argument passed to usleep().
+#define MAX_AIO_SLEEPS 500
+#define AIO_SLEEP_TIME 1000
+#define NUM_TEST_JFILES 4
+#define NUM_DEFAULT_JFILES  8
+#define JRNL_DEFAULT_FSIZE  24 // Multiples of JRNL_RMGR_PAGE_SIZE
+#define TEST_JFSIZE_SBLKS 128
+#define DEFAULT_JFSIZE_SBLKS (JRNL_DEFAULT_FSIZE * JRNL_RMGR_PAGE_SIZE)
+#define NUM_MSGS 5
+#define MSG_REC_SIZE_DBLKS 2
+// The size macros are fully parenthesized so that they expand correctly inside larger expressions
+// (e.g. "2 * MSG_SIZE" or "x - MSG_SIZE").
+#define MSG_SIZE ((MSG_REC_SIZE_DBLKS * JRNL_DBLK_SIZE) - sizeof(enq_hdr) - sizeof(rec_tail))
+#define LARGE_MSG_REC_SIZE_DBLKS (JRNL_SBLK_SIZE * JRNL_RMGR_PAGE_SIZE)
+#define LARGE_MSG_SIZE ((LARGE_MSG_REC_SIZE_DBLKS * JRNL_DBLK_SIZE) - sizeof(enq_hdr) - sizeof(rec_tail))
+#define XID_SIZE 64
+
+// NOTE(review): LARGE_MSG_REC_SIZE and LARGE_MSG_RATIO are not defined in this file. Unless the including
+// translation unit provides them, the two macros below will not compile when expanded - confirm the intended names.
+#define XLARGE_MSG_RATIO (1.0 * LARGE_MSG_REC_SIZE / JRNL_DBLK_SIZE / JRNL_SBLK_SIZE / JRNL_RMGR_PAGE_SIZE)
+#define XLARGE_MSG_THRESHOLD ((int)(JRNL_DEFAULT_FSIZE * NUM_DEFAULT_JFILES * JRNL_ENQ_THRESHOLD / 100 / LARGE_MSG_RATIO))
+
+#define NUM_JFILES 4
+#define JFSIZE_SBLKS 128
+
+// Directory used for test journals: "$TMP_DATA_DIR/<test_filename>" when the environment variable is set and
+// non-empty, otherwise a fixed fallback path.
+const char* tdp = getenv("TMP_DATA_DIR");
+const string test_dir((tdp != 0 && tdp[0] != '\0') ? string(tdp) + "/" + test_filename
+                                                   : string("/var/tmp/jrnl_test"));
+
+// Data token used by the tests. done() lets two parties (the caller and the AIO write callback) agree on who
+// deletes the token: the first call on a token that has a write state returns false, any later call returns true.
+class test_dtok : public data_tok
+{
+private:
+    bool flag; // Set by the first done() call that returned false
+public:
+    test_dtok() : data_tok(), flag(false) {}
+    virtual ~test_dtok() {}
+
+    // Returns true if done() was already called once, or if the token's write state is NONE; otherwise records
+    // the call and returns false.
+    bool done()
+    {
+        const bool finished = flag || _wstate == NONE;
+        if (!finished)
+            flag = true;
+        return finished;
+    }
+};
+
+// AIO callback used by the test journal. Tokens handed to the write callback are treated as test_dtok instances
+// and are deleted here once their done() reports true.
+class test_jrnl_cb : public aio_callback {
+    // Write-completion callback: delete every token whose done() returns true.
+    virtual void wr_aio_cb(std::vector<data_tok*>& dtokl)
+    {
+        typedef std::vector<data_tok*>::const_iterator dtok_citr;
+        for (dtok_citr tok = dtokl.begin(); tok != dtokl.end(); tok++)
+        {
+            test_dtok* const finished_tok = static_cast<test_dtok*>(*tok);
+            if (!finished_tok->done())
+                continue;
+            delete finished_tok;
+        }
+    }
+
+    // Read-completion callback: intentionally does nothing.
+    virtual void rd_aio_cb(std::vector<u_int16_t>& /*pil*/) {}
+};
+
+// Thin jcntl wrapper for tests: it remembers the callback object and supplies the default write-manager page
+// settings (JRNL_WMGR_DEF_PAGES / JRNL_WMGR_DEF_PAGE_SIZE) to initialize() and recover().
+class test_jrnl : public jcntl
+{
+    test_jrnl_cb* cb; // Callback passed on to jcntl::initialize() / jcntl::recover()
+
+public:
+    test_jrnl(const std::string& jid,
+              const std::string& jdir,
+              const std::string& base_filename,
+              test_jrnl_cb& cb0) :
+        jcntl(jid, jdir, base_filename),
+        cb(&cb0)
+    {}
+
+    virtual ~test_jrnl() {}
+
+    // Initialize the journal with default page settings, then call create_dir() on the journal directory.
+    void initialize(const u_int16_t num_jfiles,
+                    const bool ae,
+                    const u_int16_t ae_max_jfiles,
+                    const u_int32_t jfsize_sblks)
+    {
+        jcntl::initialize(num_jfiles, ae, ae_max_jfiles, jfsize_sblks,
+                          JRNL_WMGR_DEF_PAGES, JRNL_WMGR_DEF_PAGE_SIZE, cb);
+        _jdir.create_dir();
+    }
+
+    // Recover the journal with default page settings; txn_list and highest_rid are passed through to jcntl.
+    void recover(const u_int16_t num_jfiles,
+                 const bool ae,
+                 const u_int16_t ae_max_jfiles,
+                 const u_int32_t jfsize_sblks,
+                 vector<string>* txn_list,
+                 u_int64_t& highest_rid)
+    {
+        jcntl::recover(num_jfiles, ae, ae_max_jfiles, jfsize_sblks,
+                       JRNL_WMGR_DEF_PAGES, JRNL_WMGR_DEF_PAGE_SIZE, cb, txn_list, highest_rid);
+    }
+};
+
+/*
+* This class is for testing recover functionality by maintaining an internal lfid-pfid map, then creating physical
+* journal file stubs (just the fhdr section of the journal) and jinf file. This allows the recover functionality (which
+* analyzes these components to determine recover order) to be tested.
+*
+* First set up a map or "blueprint" of what the journal should look like for recovery, then have the class create the
+* physical files. The jinf object under test then reads and analyzes the created journal, and its analysis is checked
+* against what is expected.
+*
+* General usage pattern:
+* 1. Create instance of lfid_pfid_map.
+* 2. Call lfid_pfid_map::journal_create() to simulate initial journal creation.
+* 3. (optional) Call lfid_pfid_map::journal_insert() one or more times to simulate the addition of journal files.
+* 4. Call lfid_pfid_map::write_journal() to create dummy journal files (files containing only file headers)
+* 5. Create and initialize the jinf object under test
+* 6. Call jinf::analyze() to determine the pfid order - and thus also first and last lids
+* 7. Call lfid_pfid_map::check_analysis() to check the conclusions of the analysis
+* 8. Call lfid_pfid_map::destroy_journal() to delete the journal files and reset the lfid_pfid_map object.
+* 9. (optional) Back to step 2 for more tests
+*
+* See the individual methods below for more details.
+*/
+class lfid_pfid_map
+{
+    public:
+        typedef pair<u_int16_t, file_hdr> lppair;           // Used for loading the map
+        typedef multimap<u_int16_t, file_hdr> lpmap;        // Stores the journal "plan" before it is created on-disk
+        typedef lpmap::const_iterator lpmap_citr;           // General purpose iterator
+        typedef pair<lpmap_citr, lpmap_citr> lpmap_range;   // Range of values returned by multimap's equal_range() fn
+
+    private:
+        string _jid;                // Journal id
+        string _base_filename;      // Base filename
+        lpmap _map;                 // Stores the journal "blueprint" (keyed by lfid) before it is created on-disk
+        u_int16_t _num_used_files;  // Number of files which contain journal file headers
+        u_int16_t _oldest_lfid;     // lfid where owi flips; set to 0 by journal_create() if the journal is not full
+        u_int16_t _last_pfid;       // last pfid (ie last file added by journal_insert())
+
+    public:
+        // Create an empty map for the journal identified by jid / base_filename; all counters start at zero.
+        lfid_pfid_map(const string& jid, const string& base_filename) :
+                _jid(jid),
+                _base_filename(base_filename),
+                _num_used_files(0),
+                _oldest_lfid(0),
+                _last_pfid(0)
+        {}
+
+        virtual ~lfid_pfid_map() {}
+
+        // Debugging aid: dump one line per map entry (in map order) to cout, showing the owi flag and the
+        // rid, pfid, lfid and fro header fields in hex.
+        void print()
+        {
+            int entry_num = 0;
+            for (lpmap_citr entry = _map.begin(); entry != _map.end(); entry++, entry_num++)
+            {
+                const file_hdr& hdr = entry->second;
+                cout << "  " << entry_num << ": owi=" << (hdr.get_owi()?"t":"f") << hex << " frid=0x" << hdr._rid
+                     << " pfid=0x" << hdr._pfid << " lfid=0x" << hdr._lfid << " fro=0x" << hdr._fro << dec << endl;
+            }
+        }
+
+        // Number of file headers (used and empty) currently held in the map.
+        std::size_t size()
+        {
+            const std::size_t num_entries = _map.size();
+            return num_entries;
+        }
+
+        /*
+        * Method journal_create(): Used to simulate the initial creation of a journal before file insertions
+        * take place.
+        *
+        *      num_jfiles: The initial journal file count.
+        * num_used_jfiles: If this number is less than num_jfiles, it indicates a clean journal that has not yet
+        *                  completed its first rotation, and some files are empty (ie all null). The first
+        *                  num_used_jfiles will contain file headers, the remainder will be blank.
+        *     oldest_lfid: The lfid (==pfid, see note 1 below) at which the owi flag flips. During normal operation,
+        *                  each time the journal rotates back to file 0, a flag (called the overwrite indicator or owi)
+        *                  is flipped. This flag is saved in the file header. During recovery, if scanning from logical
+        *                  file 0 upwards, the file at which this flag reverses from its value in file 0 is the file
+        *                  that was to have been overwritten next, and is thus the "oldest" file. Recovery analysis must
+        *                  start with this file. oldest_lfid sets the file at which this flag will flip value for the
+        *                  simulated recovery analysis. Note that this will be ignored if num_used_jfiles < num_jfiles,
+        *                  as it is not possible for an overwrite to have occurred if not all the files have been used.
+        *        bad_lfid: An lfid greater than oldest_lfid at which the owi flag is flipped a second time. Used for
+        *                  testing bad owi detection. Has no effect unless the journal is full, oldest_lfid is
+        *                  non-zero and bad_lfid > oldest_lfid.
+        *       first_owi: Sets the value of the owi flag in file 0. If set to false, then the flip will be found with
+        *                  a true flag (and vice versa).
+        *
+        * NOTES:
+        * 1. By definition, the lfids and pfids coincide for a journal containing no inserted files. Thus pfid == lfid
+        *    for all journals created using journal_create() alone.
+        * 2. By definition, if a journal is not full (num_used_jfiles < num_jfiles), then all owi flags for those files
+        *    that are used must be the same. It is not possible for an overwrite situation to arise if a journal is not
+        *    full.
+        * 3. This function acts on map _map only, and does not create any test files. Call write_journal() to do that.
+        * 4. This function must be called on a clean test object or on one where the previous test data has been
+        *    cleared by calling destroy_journal(). Running this function more than once on existing data will
+        *    result in invalid journals which cannot be recovered.
+        */
+        void journal_create(const u_int16_t num_jfiles,      // Total number of files
+                            const u_int16_t num_used_jfiles, // Number of used files, rest empty at end
+                            const u_int16_t oldest_lfid = 0, // Fid where owi reverses
+                            const u_int16_t bad_lfid = 0,    // Fid where owi reverses again (must be > oldest_lfid),
+                                                             // used for testing bad owi detection
+                            const bool first_owi = false)    // Value of first owi flag (ie pfid=0)
+        {
+            const bool full = num_used_jfiles == num_jfiles;
+            // The owi flip position is only meaningful for a full journal.
+            _oldest_lfid = full ? oldest_lfid : 0;
+
+            bool current_owi = first_owi;
+            for (u_int16_t lfid = 0; lfid < num_jfiles; lfid++)
+            {
+                const u_int16_t pfid = lfid; // No inserted files yet, so pfid and lfid coincide
+                file_hdr hdr;                // Stays default-constructed (empty) for unused files
+                if (pfid < num_used_jfiles)
+                {
+                    _num_used_files = num_used_jfiles;
+
+                    // Flip the owi flag only for a full journal with a non-zero oldest_lfid, and only when this
+                    // lfid is either the oldest lfid or the (later) deliberately bad lfid.
+                    const bool at_oldest = lfid == oldest_lfid;
+                    const bool at_bad = bad_lfid > oldest_lfid && lfid == bad_lfid;
+                    if (full && oldest_lfid > 0 && (at_oldest || at_bad))
+                        current_owi = !current_owi;
+
+                    const u_int64_t frid = u_int64_t(random());
+                    init_fhdr(hdr, frid, pfid, lfid, current_owi);
+                }
+                _map.insert(lppair(lfid, hdr));
+            }
+        }
+
+        /*
+        * Method journal_insert(): Used to simulate the insertion of journal files into an existing journal.
+        *
+        *  after_lfid: The logical file id (lfid) after which the new file is to be inserted.
+        *   num_files: The number of files to be inserted.
+        * adjust_lids: Flag indicating that the lids of files _following_ the inserted files are to be adjusted upwards
+        *              by the number of inserted files. Not doing so simulates a recovery immediately after insertion
+        *              but before the following files are overwritten with their new lids. If this is set false, then:
+        *              a) after_lfid MUST be the most recent file (_oldest_lfid-1 ie last lfid before owi changes).
+        *              b) This call must be the last insert call.
+        *
+        * NOTES:
+        * 1. It is not possible to insert before lfid/pfid 0; thus these are always coincidental. This operation is
+        *    logically equivalent to inserting after the last lfid, which is possible.
+        * 2. It is not possible to insert into a journal that is not full. Doing so will result in an unrecoverable
+        *    journal (one that is logically inconsistent that can never occur in reality).
+        * 3. If a journal is stopped/interrupted immediately after a file insertion, there could be duplicate lids in
+        *    play at recovery, as the following file lids in their headers are only overwritten when the file is
+        *    eventually written to during normal operation. The owi flags, however, are used to determine which of the
+        *    ambiguous lids are the inserted files.
+        * 4. This function acts on map _map only, and does not create any test files. Call write_journal() to do that.
+        * 5. The test is failed (BOOST_FAIL) if after_lfid, or any lfid that must be moved, is not present in the map.
+        */
+        void journal_insert(const u_int16_t after_lfid,     // Insert files after this lfid
+                            const u_int16_t num_files = 1,  // Number of files to insert
+                            const bool adjust_lids = true)  // Adjust lids following inserted files
+        {
+            if (num_files == 0) return;
+            const u_int16_t num_jfiles_before_append = _map.size();
+            lpmap_citr i = _map.find(after_lfid);
+            if (i == _map.end()) BOOST_FAIL("Unable to find lfid=" << after_lfid << " in map.");
+            // Only update the used-file count once after_lfid is known to be valid.
+            _num_used_files += num_files;
+            // Copy the header now: the inserted files take their owi flag from it, and the map is modified below.
+            const file_hdr fh_before = i->second;
+
+            // Move overlapping lids (if req'd), working from the highest lfid downwards
+            if (adjust_lids && after_lfid < num_jfiles_before_append - 1)
+            {
+                for (u_int16_t lfid = num_jfiles_before_append - 1; lfid > after_lfid; lfid--)
+                {
+                    lpmap_citr itr = _map.find(lfid);
+                    // Report the lfid that is actually missing (not after_lfid)
+                    if (itr == _map.end()) BOOST_FAIL("Unable to find lfid=" << lfid << " in map.");
+                    file_hdr fh = itr->second;
+                    _map.erase(lfid);
+                    fh._lfid += num_files;
+                    if (lfid == _oldest_lfid)
+                        _oldest_lfid += num_files;
+                    _map.insert(lppair(fh._lfid, fh));
+                }
+            }
+
+            // Add new file headers: pfids continue from the previous file count, lfids follow after_lfid
+            u_int16_t pfid = num_jfiles_before_append;
+            u_int16_t lfid = after_lfid + 1;
+            while (pfid < num_jfiles_before_append + num_files)
+            {
+                const u_int64_t frid = u_int64_t(random());
+                const size_t fro = 0x200;
+                const file_hdr fh(RHM_JDAT_FILE_MAGIC, RHM_JDAT_VERSION, frid, pfid, lfid, fro, fh_before.get_owi(),
+                        true);
+                _map.insert(lppair(lfid, fh));
+                _last_pfid = pfid;
+                pfid++;
+                lfid++;
+            }
+        }
+
+        /*
+        * Fill the supplied vector with the pfids of all headers in the map, in map (ie lfid) order. One entry is
+        * produced per header in the map.
+        * NOTE: any previous contents of the supplied vector are discarded.
+        */
+        void get_pfid_list(vector<u_int16_t>& pfid_list)
+        {
+            pfid_list.clear();
+            lpmap_citr entry = _map.begin();
+            while (entry != _map.end())
+            {
+                pfid_list.push_back(entry->second._pfid);
+                entry++;
+            }
+        }
+
+        /*
+        * Fill the supplied vector with the lfids in the map, indexed by pfid: the vector is sized to the number of
+        * map entries and element [pfid] receives the lfid (map key) of the header carrying that pfid. Elements
+        * whose index matches no header's pfid are left as 0.
+        * NOTE: any previous contents of the supplied vector are discarded.
+        * NOTE(review): a header pfid >= the map size would index past the end of the vector - callers are assumed
+        * to keep pfids within 0..size-1.
+        */
+        void get_lfid_list(vector<u_int16_t>& lfid_list)
+        {
+            lfid_list.assign(_map.size(), 0);
+            for (lpmap_citr entry = _map.begin(); entry != _map.end(); entry++)
+            {
+                const u_int16_t lfid = entry->first;
+                lfid_list[entry->second._pfid] = lfid;
+            }
+        }
+
+        /*
+        * Method check_analysis(): Used to check the result of the test jinf object analysis by comparing the pfid order
+        * array it produces against the internal map.
+        *
+        * ji: A ref to the jinf object under test.
+        *
+        * Checks performed (Boost.Test checks; a missing map entry fails the test via BOOST_FAIL):
+        * 1. The first and last pfids reported by ji match get_first_pfid() / get_last_pfid() of this map.
+        * 2. The size of ji's pfid list equals the number of used files.
+        * 3. For each lfid in [lfid_start, lfid_stop), the pfid in the map matches the corresponding pfid list entry.
+        *
+        * NOTE: get_last_pfid() may modify _oldest_lfid, which is read afterwards to compute lfid_start, so the
+        * order of the statements below matters.
+        */
+        void check_analysis(jinf& ji)   // jinf object under test after analyze() has been called
+        {
+            BOOST_CHECK_EQUAL(ji.get_first_pfid(), get_first_pfid());
+            BOOST_CHECK_EQUAL(ji.get_last_pfid(), get_last_pfid());
+
+            jinf::pfid_list& pfidl = ji.get_pfid_list();
+            const u_int16_t num_jfiles = _map.size();
+            const bool all_used = _num_used_files == num_jfiles;
+            BOOST_CHECK_EQUAL(pfidl.size(), _num_used_files);
+
+            const u_int16_t lfid_start = all_used ? _oldest_lfid : 0;
+            // Because a simulated failure would leave lfid dups in map and last_fid would not exist in map in this
+            // case, we must find lfid_stop via pfid instead. Search for pfid == _num_used_files - 1.
+            lpmap_citr itr = _map.begin();
+            while (itr != _map.end() && itr->second._pfid != _num_used_files - 1) itr++;
+            if (itr == _map.end())
+                BOOST_FAIL("check(): Unable to find pfid=" << (_num_used_files - 1) << " in map.");
+            const u_int16_t lfid_stop = itr->second._lfid;
+
+            // NOTE(review): the loop runs only while lfid_cnt < lfid_stop, so nothing is compared when
+            // lfid_start >= lfid_stop, and lfid_stop itself is never compared. The modulo below suggests that
+            // wrap-around was intended - confirm whether this is the desired coverage.
+            std::size_t fidl_index = 0;
+            for (u_int16_t lfid_cnt = lfid_start; lfid_cnt < lfid_stop; lfid_cnt++, fidl_index++)
+            {
+                const u_int16_t lfid = lfid_cnt % num_jfiles;
+                lpmap_citr itr = _map.find(lfid); // Deliberately local: shadows the outer itr
+                if (itr == _map.end())
+                    BOOST_FAIL("check(): Unable to find lfid=" << lfid << " in map.");
+                BOOST_CHECK_EQUAL(itr->second._pfid, pfidl[fidl_index]);
+            }
+        }
+
+        /*
+        * Method get_pfid(): Look up a pfid from a known lfid.
+        *
+        *        lfid: The logical file id to look up.
+        * initial_owi: Only used when the lfid occurs twice in the map: the first of the two entries whose owi flag
+        *              differs from initial_owi is selected.
+        *
+        * Fails the test (BOOST_FAIL) if the lfid is absent, occurs more than twice, or occurs twice without either
+        * entry having an owi flag different from initial_owi.
+        */
+        u_int16_t get_pfid(const u_int16_t lfid, const bool initial_owi = false)
+        {
+            const lpmap::size_type num_matches = _map.count(lfid);
+            if (num_matches == 1)
+                return _map.find(lfid)->second._pfid;
+            if (num_matches == 2)
+            {
+                const lpmap_range candidates = _map.equal_range(lfid);
+                for (lpmap_citr cand = candidates.first; cand != candidates.second; cand++)
+                {
+                    if (cand->second.get_owi() != initial_owi)
+                        return cand->second._pfid;
+                }
+            }
+            BOOST_FAIL("get_pfid(): lfid=" << lfid << " not found in map.");
+            return 0xffff;
+        }
+
+        /*
+        * Method get_first_pfid(): Look up the first (oldest, or next-to-be-overwritten) pfid in the analysis sequence.
+        * This is the pfid found by get_pfid() for the current _oldest_lfid.
+        */
+        u_int16_t get_first_pfid()
+        {
+            const u_int16_t first_lfid = _oldest_lfid;
+            return get_pfid(first_lfid);
+        }
+
+        /*
+        * Method get_last_pfid(): Look up the last (newest, or most recently written) pfid in the analysis sequence.
+        *
+        * The lfid of the last file is determined as follows:
+        * - journal not full: the last used file, ie _num_used_files - 1;
+        * - journal full, _oldest_lfid == 0: the highest index, ie map size - 1;
+        * - journal full, _oldest_lfid != 0: the lfid immediately before the (possibly advanced) _oldest_lfid.
+        *
+        * NOTE: in the last case this method MODIFIES _oldest_lfid: it is advanced past lfids that occur twice in
+        * the map and then past entries whose owi flag is false. Later calls that read _oldest_lfid (eg
+        * get_first_pfid()) see the advanced value.
+        */
+        u_int16_t get_last_pfid()
+        {
+            u_int16_t flfid = 0;
+            if (_num_used_files == _map.size()) // journal full?
+            {
+                if (_oldest_lfid)
+                {
+                    // if failed insert, cycle past duplicate lids
+                    while (_map.count(_oldest_lfid) == 2)
+                        _oldest_lfid++;
+                    // then skip forward over entries whose owi flag is false (stops at the end of the map)
+                    while (_map.find(_oldest_lfid) != _map.end() && _map.find(_oldest_lfid)->second.get_owi() == false)
+                        _oldest_lfid++;
+                    flfid = _oldest_lfid - 1;
+                }
+                else
+                    flfid = _map.size() - 1;
+            }
+            else
+                flfid = _num_used_files - 1;
+            // initial_owi = true: for a duplicated lfid, select the entry whose owi flag is false
+            return get_pfid(flfid, true);
+        }
+
+        /*
+        * Method write_journal(): Used to create the dummy journal files from the built-up map created by calling
+        * journal_create() and optionally journal_insert() one or more times. A jinf file is written first (any
+        * existing test directory is deleted in the process), then one journal file is created per map entry.
+        *
+        * Entries with an empty header (pfid and magic both 0) have no pfid of their own, so the file for such an
+        * entry is named after its position in the map instead.
+        */
+        void write_journal(const bool ae, const u_int16_t ae_max_jfiles, const u_int32_t fsize_sblks = JFSIZE_SBLKS)
+        {
+            create_jinf(ae, ae_max_jfiles);
+            u_int16_t position = 0;
+            for (lpmap_citr entry = _map.begin(); entry != _map.end(); entry++, position++)
+            {
+                const file_hdr& hdr = entry->second;
+                const bool empty_hdr = hdr._pfid == 0 && hdr._magic == 0;
+                u_int16_t file_pfid = hdr._pfid;
+                if (empty_hdr)
+                    file_pfid = position; // empty header, use position counter instead
+                create_journal_file(file_pfid, hdr, _base_filename, fsize_sblks);
+            }
+        }
+
+        /*
+        * Method destroy_journal(): Remove the journal files (one per map entry, pfids 0..size-1) and the jinf file
+        * created by write_journal(), then reset the lfid_pfid_map test object. A file that cannot be removed only
+        * produces a Boost.Test warning. A new test may be started using the same lfid_pfid_map test object once
+        * this call has been made.
+        */
+        void destroy_journal()
+        {
+            u_int16_t pfid = 0;
+            while (pfid < _map.size())
+            {
+                string fn = create_journal_filename(pfid, _base_filename);
+                BOOST_WARN_MESSAGE(::unlink(fn.c_str()) == 0, "destroy_journal(): Failed to remove file " << fn);
+                pfid++;
+            }
+            clean_journal_info_file(_base_filename);
+
+            // Return the object to its freshly-constructed state
+            _map.clear();
+            _num_used_files = 0;
+            _oldest_lfid = 0;
+            _last_pfid = 0;
+        }
+
+        /*
+        * Method create_new_jinf(): This static call creates a default jinf file only (NUM_JFILES files; when ae is
+        * set, ae_max_jfiles is 5 * NUM_JFILES, otherwise 0). Any existing test directory is deleted first. This is
+        * used to test the read constructor of a jinf test object which reads a jinf file at instantiation.
+        */
+        static void create_new_jinf(const string jid, const string base_filename, const bool ae)
+        {
+            if (jdir::exists(test_dir))
+                jdir::delete_dir(test_dir);
+
+            u_int16_t ae_max_jfiles = 0;
+            if (ae)
+                ae_max_jfiles = 5 * NUM_JFILES;
+            create_jinf(NUM_JFILES, ae, ae_max_jfiles, jid, base_filename);
+        }
+
+        /*
+        * Method clean_journal_info_file(): This static method deletes only the jinf file
+        * (<test_dir>/<base_filename>.<JRNL_INFO_EXTENSION>) without touching any other journal file or the
+        * directory. Failure to remove the file only produces a Boost.Test warning. This is used to clear those
+        * tests which rely only on the existence of a jinf file.
+        */
+        static void clean_journal_info_file(const string base_filename)
+        {
+            stringstream path_builder;
+            path_builder << test_dir << "/" << base_filename << "." << JRNL_INFO_EXTENSION;
+            const bool removed = ::unlink(path_builder.str().c_str()) == 0;
+            BOOST_WARN_MESSAGE(removed, "clean_journal_info_file(): Failed to remove file " <<
+                            path_builder.str());
+        }
+
+        // Build the full path of a journal data file:
+        // <test_dir>/<base_filename>.<pfid as 4 zero-padded hex digits>.<JRNL_DATA_EXTENSION>
+        static string create_journal_filename(const u_int16_t pfid, const string base_filename)
+        {
+            stringstream path_builder;
+            path_builder << test_dir << "/" << base_filename << "."
+                         << setfill('0') << hex << setw(4) << pfid
+                         << "." << JRNL_DATA_EXTENSION;
+            return path_builder.str();
+        }
+
+    private:
+        // Populate a file header for a used journal file: magic, version, endian flag, owi flag, rid, pfid, lfid,
+        // first record offset (0x200, or 0 if no_enq is set) and the current CLOCK_REALTIME timestamp.
+        static void init_fhdr(file_hdr& fh,
+                              const u_int64_t frid,
+                              const u_int16_t pfid,
+                              const u_int16_t lfid,
+                              const bool owi,
+                              const bool no_enq = false)
+        {
+            // Identification
+            fh._magic = RHM_JDAT_FILE_MAGIC;
+            fh._version = RHM_JDAT_VERSION;
+#if defined(JRNL_BIG_ENDIAN)
+            fh._eflag = RHM_BENDIAN_FLAG;
+#else
+            fh._eflag = RHM_LENDIAN_FLAG;
+#endif
+            // Only the overwrite indicator bit is ever set in the user flags
+            if (owi)
+                fh._uflag = rec_hdr::HDR_OVERWRITE_INDICATOR_MASK;
+            else
+                fh._uflag = 0;
+
+            // Record and file ids
+            fh._rid = frid;
+            fh._pfid = pfid;
+            fh._lfid = lfid;
+            if (no_enq)
+                fh._fro = 0;
+            else
+                fh._fro = 0x200;
+
+            // Timestamp
+            timespec now;
+            ::clock_gettime(CLOCK_REALTIME, &now);
+            fh._ts_sec = now.tv_sec;
+            fh._ts_nsec = now.tv_nsec;
+        }
+
+        // Write a jinf file describing the current map (one journal file per map entry). Any existing test
+        // directory is deleted first so that the journal starts from a clean directory.
+        void create_jinf(const bool ae, const u_int16_t ae_max_jfiles)
+        {
+            const bool dir_present = jdir::exists(test_dir);
+            if (dir_present)
+                jdir::delete_dir(test_dir);
+            create_jinf(_map.size(), ae, ae_max_jfiles, _jid, _base_filename);
+        }
+
+        // Create the test directory if necessary, then construct and write a jinf object for a journal of
+        // num_files files, using JFSIZE_SBLKS, the default write-manager page settings and the current
+        // CLOCK_REALTIME timestamp.
+        static void create_jinf(u_int16_t num_files, const bool ae, const u_int16_t ae_max_jfiles, const string jid,
+                const string base_filename)
+        {
+            jdir::create_dir(test_dir); // Check test dir exists; create it if not
+
+            timespec now;
+            ::clock_gettime(CLOCK_REALTIME, &now);
+
+            jinf info(jid, test_dir, base_filename, num_files, ae, ae_max_jfiles, JFSIZE_SBLKS,
+                      JRNL_WMGR_DEF_PAGE_SIZE, JRNL_WMGR_DEF_PAGES, now);
+            info.write();
+        }
+
+        // Create (or truncate) the journal file for pfid and write the file header followed by the file body.
+        // The test is failed (BOOST_FAIL) if the file cannot be opened or if the stream reports an error on close.
+        static void create_journal_file(const u_int16_t pfid,
+                                        const file_hdr& fh,
+                                        const string base_filename,
+                                        const u_int32_t fsize_sblks = JFSIZE_SBLKS,
+                                        const char fill_char = 0)
+        {
+            const std::string path = create_journal_filename(pfid, base_filename);
+            ofstream out(path.c_str(), ofstream::out | ofstream::trunc);
+            if (!out.good())
+                BOOST_FAIL("Unable to open test journal file \"" << path << "\" for writing.");
+
+            write_file_header(path, out, fh, fill_char);
+            write_file_body(out, fsize_sblks, fill_char);
+
+            out.close();
+            const bool close_failed = out.fail() || out.bad();
+            if (close_failed)
+                BOOST_FAIL("Error closing test journal file \"" << path << "\".");
+        }
+
+        // Write the raw file_hdr bytes to the stream, then pad with fill_char up to a total of
+        // JRNL_DBLK_SIZE * JRNL_SBLK_SIZE bytes. Any stream error fails the test (BOOST_FAIL); filename is used
+        // for the error messages only.
+        static void write_file_header(const std::string& filename,
+                                      ofstream& of,
+                                      const file_hdr& fh,
+                                      const char fill_char)
+        {
+            // write file header
+            const u_int32_t hdr_size = sizeof(file_hdr);
+            of.write((const char*)&fh, hdr_size);
+            if (of.fail() || of.bad())
+                BOOST_FAIL("Error writing file header to test journal file \"" << filename << "\".");
+
+            // fill remaining sblk with fill char, one byte at a time
+            for (u_int32_t offs = hdr_size; offs < JRNL_DBLK_SIZE * JRNL_SBLK_SIZE; offs++)
+            {
+                of.put(fill_char);
+                if (of.fail() || of.bad())
+                    BOOST_FAIL("Error writing filler to test journal file \"" << filename << "\".");
+            }
+        }
+
+        // Append fsize_sblks blocks of JRNL_DBLK_SIZE * JRNL_SBLK_SIZE bytes each, filled with fill_char, after
+        // the header. Nothing is written when fsize_sblks <= 1. Stream errors are not checked here.
+        static void write_file_body(ofstream& of, const u_int32_t fsize_sblks, const char fill_char)
+        {
+            if (fsize_sblks <= 1)
+                return;
+
+            std::vector<char> sblk_buffer(JRNL_DBLK_SIZE * JRNL_SBLK_SIZE, fill_char);
+            for (u_int32_t sblk = 0; sblk < fsize_sblks; sblk++)
+                of.write(&sblk_buffer[0], JRNL_DBLK_SIZE * JRNL_SBLK_SIZE);
+        }
+};
+
+// Announce the test on cout as "<test_filename>.<test_name>: " (flushed, no newline) and return the
+// qualified test name "<file>.<test_name>".
+const string
+get_test_name(const string& file, const string& test_name)
+{
+    cout << test_filename << "." << test_name << ": " << flush;
+    const string qualified_name = file + "." + test_name;
+    return qualified_name;
+}
+
+// Compare a journal return code against the expected one. On a mismatch the data token is deleted and the
+// test is failed with a message prefixed by ctxt. Always returns false when it returns at all.
+bool
+check_iores(const string& ctxt, const iores ret, const iores exp_ret, test_dtok* dtp)
+{
+    if (ret == exp_ret)
+        return false;
+
+    // Unexpected result: release the token before failing the test
+    delete dtp;
+    BOOST_FAIL(ctxt << ": Expected " << iores_str(exp_ret) << "; got " << iores_str(ret));
+    return false;
+}
+
+// Handle the result of a journal call that may need to wait for AIO.
+// - res == RHM_IORES_PAGE_AIOWAIT and fewer than MAX_AIO_SLEEPS waits so far: process write events, sleep for
+//   AIO_SLEEP_TIME (usleep) and return true.
+// - otherwise: return the result of check_iores(), which fails the test if res != exp_ret.
+// aio_sleep_cnt is incremented on every AIOWAIT result.
+bool
+handle_jcntl_response(const iores res, jcntl& jc, unsigned& aio_sleep_cnt, const std::string& ctxt, const iores exp_ret,
+                test_dtok* dtp)
+{
+    if (res != RHM_IORES_PAGE_AIOWAIT)
+        return check_iores(ctxt, res, exp_ret, dtp);
+
+    if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
+        return check_iores(ctxt, res, exp_ret, dtp);
+
+    jc.get_wr_events(0); // *** GEV2
+    usleep(AIO_SLEEP_TIME);
+    return true;
+}
+
+// Enqueue msg on journal jc under an externally supplied rid and check the result against exp_ret.
+// A new test_dtok is allocated for the call; it is deleted here if its done() returns true, or if a
+// std::exception is thrown (which is then rethrown). Returns the rid held by the data token after the call.
+u_int64_t
+enq_msg(jcntl& jc,
+        const u_int64_t rid,
+        const string& msg,
+        const bool transient,
+        const iores exp_ret = RHM_IORES_SUCCESS)
+{
+    ostringstream context;
+    context << "enq_msg(" << rid << ")";
+
+    test_dtok* tok = new test_dtok;
+    BOOST_CHECK_MESSAGE(tok != 0, "Data token allocation failed (dtp == 0).");
+    tok->set_rid(rid);
+    tok->set_external_rid(true);
+
+    try
+    {
+        const iores result = jc.enqueue_data_record(msg.c_str(), msg.size(), msg.size(), tok, transient);
+        check_iores(context.str(), result, exp_ret, tok);
+        const u_int64_t tok_rid = tok->rid();
+        if (tok->done())
+            delete tok;
+        return tok_rid;
+    }
+    catch (exception&)
+    {
+        delete tok;
+        throw;
+    }
+}
+
+// Enqueue an externally stored message of msg_size bytes on journal jc under an externally supplied rid and
+// check the result against exp_ret. A new test_dtok is allocated for the call; it is deleted here if its done()
+// returns true, or if a std::exception is thrown (which is then rethrown). Returns the rid held by the data
+// token after the call.
+u_int64_t
+enq_extern_msg(jcntl& jc, const u_int64_t rid, const std::size_t msg_size, const bool transient,
+        const iores exp_ret = RHM_IORES_SUCCESS)
+{
+    ostringstream context;
+    context << "enq_extern_msg(" << rid << ")";
+
+    test_dtok* tok = new test_dtok;
+    BOOST_CHECK_MESSAGE(tok != 0, "Data token allocation failed (dtp == 0).");
+    tok->set_rid(rid);
+    tok->set_external_rid(true);
+
+    try
+    {
+        const iores result = jc.enqueue_extern_data_record(msg_size, tok, transient);
+        check_iores(context.str(), result, exp_ret, tok);
+        const u_int64_t tok_rid = tok->rid();
+        if (tok->done())
+            delete tok;
+        return tok_rid;
+    }
+    catch (exception&)
+    {
+        delete tok;
+        throw;
+    }
+}
+
+// Enqueue msg on journal jc as part of transaction xid, under an externally supplied rid, and check the result
+// against exp_ret. A new test_dtok is allocated for the call; it is deleted here if its done() returns true, or
+// if a std::exception is thrown (which is then rethrown). Returns the rid held by the data token after the call.
+u_int64_t
+enq_txn_msg(jcntl& jc, const u_int64_t rid, const string& msg, const string& xid, const bool transient,
+                const iores exp_ret = RHM_IORES_SUCCESS)
+{
+    ostringstream context;
+    context << "enq_txn_msg(" << rid << ")";
+
+    test_dtok* tok = new test_dtok;
+    BOOST_CHECK_MESSAGE(tok != 0, "Data token allocation failed (dtp == 0).");
+    tok->set_rid(rid);
+    tok->set_external_rid(true);
+
+    try
+    {
+        const iores result = jc.enqueue_txn_data_record(msg.c_str(), msg.size(), msg.size(), tok, xid,
+                transient);
+        check_iores(context.str(), result, exp_ret, tok);
+        const u_int64_t tok_rid = tok->rid();
+        if (tok->done())
+            delete tok;
+        return tok_rid;
+    }
+    catch (exception&)
+    {
+        delete tok;
+        throw;
+    }
+}
+
+// Enqueue an externally stored message of msg_size bytes on journal jc as part of transaction xid, under an
+// externally supplied rid, and check the result against exp_ret. A new test_dtok is allocated for the call; it
+// is deleted here if its done() returns true, or if a std::exception is thrown (which is then rethrown).
+// Returns the rid held by the data token after the call.
+u_int64_t
+enq_extern_txn_msg(jcntl& jc, const u_int64_t rid, const std::size_t msg_size, const string& xid, const bool transient,
+                const iores exp_ret = RHM_IORES_SUCCESS)
+{
+    ostringstream context;
+    context << "enq_extern_txn_msg(" << rid << ")";
+
+    test_dtok* tok = new test_dtok;
+    BOOST_CHECK_MESSAGE(tok != 0, "Data token allocation failed (dtp == 0).");
+    tok->set_rid(rid);
+    tok->set_external_rid(true);
+
+    try
+    {
+        const iores result = jc.enqueue_extern_txn_data_record(msg_size, tok, xid, transient);
+        check_iores(context.str(), result, exp_ret, tok);
+        const u_int64_t tok_rid = tok->rid();
+        if (tok->done())
+            delete tok;
+        return tok_rid;
+    }
+    catch (exception&)
+    {
+        delete tok;
+        throw;
+    }
+}
+
+/*
+ * Test helper: calls jc.dequeue_data_record() with a freshly allocated test_dtok whose own rid is set to rid and
+ * whose dequeue rid is set to drid (the rid is marked as external). The token's write state is forced to
+ * data_tok::ENQ before the call (NOTE(review): presumably so that the token looks like one for an already
+ * enqueued record - confirm). The journal's return code is passed to check_iores() together with exp_ret.
+ * Returns the rid reported by the token after the call.
+ *
+ * The token is deleted here only when done() is true, or when an exception propagates.
+ */
+u_int64_t
+deq_msg(jcntl& jc, const u_int64_t drid, const u_int64_t rid, const iores exp_ret = RHM_IORES_SUCCESS)
+{
+    ostringstream where;
+    where << "deq_msg(" << drid << ")";
+
+    test_dtok* dtp = new test_dtok;
+    BOOST_CHECK_MESSAGE(dtp != 0, "Data token allocation failed (dtp == 0).");
+    dtp->set_rid(rid);
+    dtp->set_dequeue_rid(drid);
+    dtp->set_external_rid(true);
+    dtp->set_wstate(data_tok::ENQ);
+
+    try
+    {
+        iores outcome = jc.dequeue_data_record(dtp);
+        check_iores(where.str(), outcome, exp_ret, dtp);
+        const u_int64_t token_rid = dtp->rid();
+        if (dtp->done())
+            delete dtp;
+        return token_rid;
+    }
+    catch (exception&)
+    {
+        // Release the token before passing the failure on to the caller.
+        delete dtp;
+        throw;
+    }
+}
+
+/*
+ * Test helper: calls jc.dequeue_txn_data_record() under transaction id xid with a freshly allocated test_dtok
+ * whose own rid is set to rid and whose dequeue rid is set to drid (the rid is marked as external). The token's
+ * write state is forced to data_tok::ENQ before the call. The journal's return code is passed to check_iores()
+ * together with exp_ret. Returns the rid reported by the token after the call.
+ *
+ * The token is deleted here only when done() is true, or when an exception propagates.
+ */
+u_int64_t
+deq_txn_msg(jcntl& jc, const u_int64_t drid, const u_int64_t rid, const string& xid,
+                const iores exp_ret = RHM_IORES_SUCCESS)
+{
+    ostringstream where;
+    where << "deq_txn_msg(" << drid << ")";
+
+    test_dtok* dtp = new test_dtok;
+    BOOST_CHECK_MESSAGE(dtp != 0, "Data token allocation failed (dtp == 0).");
+    dtp->set_rid(rid);
+    dtp->set_dequeue_rid(drid);
+    dtp->set_external_rid(true);
+    dtp->set_wstate(data_tok::ENQ);
+
+    try
+    {
+        iores outcome = jc.dequeue_txn_data_record(dtp, xid);
+        check_iores(where.str(), outcome, exp_ret, dtp);
+        const u_int64_t token_rid = dtp->rid();
+        if (dtp->done())
+            delete dtp;
+        return token_rid;
+    }
+    catch (exception&)
+    {
+        // Release the token before passing the failure on to the caller.
+        delete dtp;
+        throw;
+    }
+}
+
+/*
+ * Test helper: calls jc.txn_abort() for transaction id xid with a freshly allocated test_dtok carrying the
+ * caller-chosen record id rid (marked as an external rid). The journal's return code is passed to check_iores()
+ * together with the expected code exp_ret. Returns the rid reported by the token after the call.
+ *
+ * The token is deleted here only when done() is true, or when an exception propagates.
+ */
+u_int64_t
+txn_abort(jcntl& jc, const u_int64_t rid, const string& xid, const iores exp_ret = RHM_IORES_SUCCESS)
+{
+    test_dtok* dtp = new test_dtok;
+    BOOST_CHECK_MESSAGE(dtp != 0, "Data token allocation failed (dtp == 0).");
+    dtp->set_rid(rid);
+    dtp->set_external_rid(true);
+
+    try
+    {
+        iores outcome = jc.txn_abort(dtp, xid);
+        check_iores("txn_abort", outcome, exp_ret, dtp);
+        const u_int64_t token_rid = dtp->rid();
+        if (dtp->done())
+            delete dtp;
+        return token_rid;
+    }
+    catch (exception&)
+    {
+        // Release the token before passing the failure on to the caller.
+        delete dtp;
+        throw;
+    }
+}
+
+/*
+ * Test helper: calls jc.txn_commit() for transaction id xid with a freshly allocated test_dtok carrying the
+ * caller-chosen record id rid (marked as an external rid). The journal's return code is passed to check_iores()
+ * together with the expected code exp_ret. Returns the rid reported by the token after the call.
+ *
+ * The token is deleted here only when done() is true, or when an exception propagates.
+ */
+u_int64_t
+txn_commit(jcntl& jc, const u_int64_t rid, const string& xid, const iores exp_ret = RHM_IORES_SUCCESS)
+{
+    test_dtok* dtp = new test_dtok;
+    BOOST_CHECK_MESSAGE(dtp != 0, "Data token allocation failed (dtp == 0).");
+    dtp->set_rid(rid);
+    dtp->set_external_rid(true);
+
+    try
+    {
+        iores outcome = jc.txn_commit(dtp, xid);
+        check_iores("txn_commit", outcome, exp_ret, dtp);
+        const u_int64_t token_rid = dtp->rid();
+        if (dtp->done())
+            delete dtp;
+        return token_rid;
+    }
+    catch (exception&)
+    {
+        // Release the token before passing the failure on to the caller.
+        delete dtp;
+        throw;
+    }
+}
+
+/*
+ * Test helper: reads the next record from jc with read_data_record(), re-issuing the read for as long as
+ * handle_jcntl_response() (given the result, the expected code exp_ret and a sleep counter) returns true.
+ *
+ * On return, msg is overwritten only if a data buffer was handed back and xid only if an xid buffer was handed
+ * back; otherwise each keeps whatever value the caller passed in. transient and external are filled in by the
+ * journal call. The data token is always deleted, also when an exception propagates out of the read loop.
+ */
+void
+read_msg(jcntl& jc, string& msg, string& xid, bool& transient, bool& external, const iores exp_ret = RHM_IORES_SUCCESS)
+{
+    void* data_buf = 0;
+    std::size_t data_len = 0;
+    void* xid_buf = 0;
+    std::size_t xid_len = 0;
+    test_dtok* dtp = new test_dtok;
+    BOOST_CHECK_MESSAGE(dtp != 0, "Data token allocation failed (dtp == 0).");
+    dtp->set_wstate(data_tok::ENQ);
+
+    unsigned aio_sleep_cnt = 0;
+    try
+    {
+        for (;;)
+        {
+            iores res = jc.read_data_record(&data_buf, data_len, &xid_buf, xid_len, transient, external, dtp);
+            if (!handle_jcntl_response(res, jc, aio_sleep_cnt, "read_msg", exp_ret, dtp))
+                break;
+        }
+    }
+    catch (exception&)
+    {
+        delete dtp;
+        throw;
+    }
+
+    if (data_buf)
+        msg.assign(static_cast<char*>(data_buf), data_len);
+
+    // Exactly one buffer is freed: the xid buffer when present, else the data buffer.
+    // NOTE(review): this presumes xid and data are returned in a single allocation that starts at the xid -
+    // confirm against jcntl::read_data_record().
+    if (xid_buf)
+    {
+        xid.assign(static_cast<char*>(xid_buf), xid_len);
+        std::free(xid_buf);
+    }
+    else if (data_buf)
+    {
+        std::free(data_buf);
+    }
+    delete dtp;
+}
+
+/*
+ * Returns how many messages of msg_rec_size_dblks dblks fit into an empty journal of num_files files, each of
+ * file_size_dblks dblks, when no threshold applies (ie until the journal is full). When include_deq is true, each
+ * message is assumed to be accompanied by a dequeue record occupying one further dblk. The result is rounded
+ * down to a whole number of messages.
+ */
+u_int32_t
+num_msgs_to_full(const u_int16_t num_files, const u_int32_t file_size_dblks, const u_int32_t msg_rec_size_dblks,
+                bool include_deq)
+{
+    // Space consumed per message: the enqueue record plus, optionally, a one-dblk dequeue record.
+    const u_int32_t dblks_per_msg = include_deq ? msg_rec_size_dblks + 1 : msg_rec_size_dblks;
+    // Computed in floating point, so the product of files and file size is not truncated to 32 bits.
+    const double journal_dblks = 1.0 * num_files * file_size_dblks;
+    return u_int32_t(::floor(journal_dblks / dblks_per_msg));
+}
+
+/*
+ * Returns how many messages of msg_rec_size_dblks dblks fit into an empty journal of num_files files, each of
+ * file_size_dblks dblks, before the enqueue threshold of JRNL_ENQ_THRESHOLD (a percentage) is reached. The
+ * result is rounded down to a whole number of messages.
+ */
+u_int32_t
+num_msgs_to_threshold(const u_int16_t num_files, const u_int32_t file_size_dblks, const u_int32_t msg_rec_size_dblks)
+{
+    // Journal capacity in dblks scaled by the threshold percentage; the division by 100 follows below.
+    const double capacity_x_percent = 1.0 * num_files * file_size_dblks * JRNL_ENQ_THRESHOLD;
+    const double msgs = capacity_x_percent / msg_rec_size_dblks / 100;
+    return u_int32_t(::floor(msgs));
+}
+
+/*
+ * Returns the amount of space, in dblks, that lies beyond the enqueue threshold of JRNL_ENQ_THRESHOLD (a
+ * percentage) in a journal of num_files files, each of file_size_dblks dblks. With a dequeue record size of one
+ * dblk this equals the number of dequeues that still fit. The result is rounded up.
+ */
+u_int32_t
+num_dequeues_rem(const u_int16_t num_files, const u_int32_t file_size_dblks)
+{
+    // Total number of dblks in the journal.
+    const u_int32_t total_dblks = num_files * file_size_dblks;
+    // Fraction of the journal remaining after the threshold is used.
+    const double reserved_fraction = 1.0 - (1.0 * JRNL_ENQ_THRESHOLD / 100);
+    return u_int32_t(::ceil(total_dblks * reserved_fraction));
+}
+
+/*
+ * Fills s with a test message of the form "MSG_" + msg_num (zero-padded to six digits) + "_", followed by a
+ * repeating run of decimal digits, and returns a reference to s. For len >= 11 and a non-negative msg_num of at
+ * most six digits the result is exactly len characters long; for smaller len only the 11-character header is
+ * produced.
+ */
+string&
+create_msg(string& s, const int msg_num, const int len)
+{
+    ostringstream header;
+    header << "MSG_" << setfill('0') << setw(6) << msg_num << "_";
+
+    string text(header.str());
+    // Padding characters are derived from their (1-based) position, giving the cycle 2,3,...,9,0,1,...
+    for (int pos = 12; pos <= len; ++pos)
+        text.push_back(static_cast<char>('0' + pos % 10));
+
+    s.assign(text);
+    return s;
+}
+
+/*
+ * Fills s with a test transaction id of the form "XID_" + msg_num (zero-padded to six digits) + "_", followed by
+ * a repeating run of lower-case letters, and returns a reference to s. For len >= 11 and a non-negative msg_num
+ * of at most six digits the result is exactly len characters long; for smaller len only the 11-character header
+ * is produced.
+ */
+string&
+create_xid(string& s, const int msg_num, const int len)
+{
+    ostringstream header;
+    header << "XID_" << setfill('0') << setw(6) << msg_num << "_";
+
+    string text(header.str());
+    // Padding characters are derived from their (0-based) position, so the run starts at 'l' and wraps at 'z'.
+    for (int pos = 11; pos < len; ++pos)
+        text.push_back(static_cast<char>('a' + pos % 26));
+
+    s.assign(text);
+    return s;
+}
+
+/*
+ * Returns the current CLOCK_REALTIME time expressed in tenths of a second, for use as a seed. Fails the
+ * running test through BOOST_FAIL if the clock cannot be read.
+ */
+long
+get_seed()
+{
+    timespec now;
+    const int rc = ::clock_gettime(CLOCK_REALTIME, &now);
+    if (rc != 0)
+        BOOST_FAIL("Unable to read clock to generate seed.");
+    // 100000000 ns == 0.1 s, so the nanosecond part contributes 0..9 tenths.
+    return long(10 * now.tv_sec + now.tv_nsec / 100000000);
+}

Added: qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/_st_read.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/_st_read.cpp?rev=1530301&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/_st_read.cpp (added)
+++ qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/_st_read.cpp Tue Oct  8 15:09:00 2013
@@ -0,0 +1,460 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "../unit_test.h"
+#include <cmath>
+#include <iostream>
+#include "qpid/legacystore/jrnl/jcntl.h"
+
+using namespace boost::unit_test;
+using namespace mrg::journal;
+using namespace std;
+
+QPID_AUTO_TEST_SUITE(journal_read)
+
+const string test_filename("_st_read");
+
+#include "_st_helper_fns.h"
+
+// === Test suite ===
+
+#ifndef LONG_TEST
+/*
+ * ==============================================
+ *                  NORMAL TESTS
+ * This section contains normal "make check" tests
+ * for building/packaging. These are built when
+ * LONG_TEST is _not_ defined.
+ * ==============================================
+ */
+
+// Initializes a new journal and checks that a read with nothing enqueued is answered with RHM_IORES_EMPTY.
+QPID_AUTO_TEST_CASE(empty_read)
+{
+    string test_name(get_test_name(test_filename, "empty_read"));
+    try
+    {
+        string msg;
+        string rmsg;
+        string xid;
+        bool transientFlag;
+        bool externalFlag;
+
+        test_jrnl_cb callbacks;
+        test_jrnl jrnl(test_name, test_dir, test_name, callbacks);
+        jrnl.initialize(NUM_TEST_JFILES, false, 0, TEST_JFSIZE_SBLKS);
+
+        // Nothing has been written, so "empty" is the only expected read result.
+        read_msg(jrnl, rmsg, xid, transientFlag, externalFlag, RHM_IORES_EMPTY);
+    }
+    catch(const exception& e)
+    {
+        BOOST_FAIL(e.what());
+    }
+    cout << "ok" << endl;
+}
+
+// Enqueues NUM_MSGS messages as one block, reads them all back in order and then dequeues them as one block.
+// The journal is expected to read as empty both after the last message was read and after the dequeues.
+QPID_AUTO_TEST_CASE(enqueue_read_dequeue_block)
+{
+    string test_name(get_test_name(test_filename, "enqueue_read_dequeue_block"));
+    try
+    {
+        string msg;
+        string rmsg;
+        string xid;
+        bool transientFlag;
+        bool externalFlag;
+
+        test_jrnl_cb callbacks;
+        test_jrnl jrnl(test_name, test_dir, test_name, callbacks);
+        jrnl.initialize(NUM_TEST_JFILES, false, 0, TEST_JFSIZE_SBLKS);
+
+        // Enqueue phase: message m is written with rid m.
+        for (int m=0; m<NUM_MSGS; ++m)
+        {
+            enq_msg(jrnl, m, create_msg(msg, m, MSG_SIZE), false);
+        }
+        jrnl.flush();
+
+        // Read phase: content must match, with no xid and neither flag set.
+        for (int m=0; m<NUM_MSGS; ++m)
+        {
+            read_msg(jrnl, rmsg, xid, transientFlag, externalFlag);
+            BOOST_CHECK_EQUAL(create_msg(msg, m, MSG_SIZE), rmsg);
+            BOOST_CHECK_EQUAL(xid.size(), std::size_t(0));
+            BOOST_CHECK_EQUAL(transientFlag, false);
+            BOOST_CHECK_EQUAL(externalFlag, false);
+        }
+        read_msg(jrnl, rmsg, xid, transientFlag, externalFlag, RHM_IORES_EMPTY);
+
+        // Dequeue phase: the dequeue record for message m gets rid m+NUM_MSGS.
+        for (int m=0; m<NUM_MSGS; ++m)
+        {
+            deq_msg(jrnl, m, m+NUM_MSGS);
+        }
+        read_msg(jrnl, rmsg, xid, transientFlag, externalFlag, RHM_IORES_EMPTY);
+    }
+    catch(const exception& e)
+    {
+        BOOST_FAIL(e.what());
+    }
+    cout << "ok" << endl;
+}
+
+// Repeats an enqueue / read / dequeue cycle one message at a time. Even rids are used for the enqueues and the
+// following odd rid for the matching dequeue. After every dequeue the journal is expected to read as empty.
+QPID_AUTO_TEST_CASE(enqueue_read_dequeue_interleaved)
+{
+    string test_name(get_test_name(test_filename, "enqueue_read_dequeue_interleaved"));
+    try
+    {
+        string msg;
+        string rmsg;
+        string xid;
+        bool transientFlag;
+        bool externalFlag;
+
+        test_jrnl_cb callbacks;
+        test_jrnl jrnl(test_name, test_dir, test_name, callbacks);
+        jrnl.initialize(NUM_TEST_JFILES, false, 0, TEST_JFSIZE_SBLKS);
+
+        for (int m=0; m<500*NUM_MSGS; m+=2)
+        {
+            // Write one message and make it readable.
+            enq_msg(jrnl, m, create_msg(msg, m, MSG_SIZE), false);
+            jrnl.flush();
+
+            // Read it back and verify content and flags.
+            read_msg(jrnl, rmsg, xid, transientFlag, externalFlag);
+            BOOST_CHECK_EQUAL(create_msg(msg, m, MSG_SIZE), rmsg);
+            BOOST_CHECK_EQUAL(xid.size(), std::size_t(0));
+            BOOST_CHECK_EQUAL(transientFlag, false);
+            BOOST_CHECK_EQUAL(externalFlag, false);
+
+            // Dequeue it; nothing may remain to be read afterwards.
+            deq_msg(jrnl, m, m+1);
+            jrnl.flush();
+            read_msg(jrnl, rmsg, xid, transientFlag, externalFlag, RHM_IORES_EMPTY);
+        }
+    }
+    catch(const exception& e)
+    {
+        BOOST_FAIL(e.what());
+    }
+    cout << "ok" << endl;
+}
+
+// Writes NUM_MSGS messages with one journal instance, lets it go out of scope, then recovers the journal with
+// a second instance. After recover_complete() all messages must be readable in order and dequeueable.
+QPID_AUTO_TEST_CASE(enqueue_recovered_read_dequeue)
+{
+    string test_name(get_test_name(test_filename, "enqueue_recovered_read_dequeue"));
+    try
+    {
+        // First journal instance: enqueue only.
+        {
+            string msg;
+
+            test_jrnl_cb callbacks;
+            test_jrnl jrnl(test_name, test_dir, test_name, callbacks);
+            jrnl.initialize(NUM_TEST_JFILES, false, 0, TEST_JFSIZE_SBLKS);
+            for (int m=0; m<NUM_MSGS; ++m)
+            {
+                enq_msg(jrnl, m, create_msg(msg, m, MSG_SIZE), false);
+            }
+        }
+        // Second journal instance: recover, read, dequeue.
+        {
+            string msg;
+            u_int64_t hrid;
+            string rmsg;
+            string xid;
+            bool transientFlag;
+            bool externalFlag;
+
+            test_jrnl_cb callbacks;
+            test_jrnl jrnl(test_name, test_dir, test_name, callbacks);
+            jrnl.recover(NUM_TEST_JFILES, false, 0, TEST_JFSIZE_SBLKS, 0, hrid);
+            // The highest rid reported by recover() must be that of the last enqueued message.
+            BOOST_CHECK_EQUAL(hrid, u_int64_t(NUM_MSGS - 1));
+            jrnl.recover_complete();
+
+            for (int m=0; m<NUM_MSGS; ++m)
+            {
+                read_msg(jrnl, rmsg, xid, transientFlag, externalFlag);
+                BOOST_CHECK_EQUAL(create_msg(msg, m, MSG_SIZE), rmsg);
+                BOOST_CHECK_EQUAL(xid.size(), std::size_t(0));
+                BOOST_CHECK_EQUAL(transientFlag, false);
+                BOOST_CHECK_EQUAL(externalFlag, false);
+            }
+            read_msg(jrnl, rmsg, xid, transientFlag, externalFlag, RHM_IORES_EMPTY);
+
+            for (int m=0; m<NUM_MSGS; ++m)
+            {
+                deq_msg(jrnl, m, m+NUM_MSGS);
+            }
+            read_msg(jrnl, rmsg, xid, transientFlag, externalFlag, RHM_IORES_EMPTY);
+        }
+    }
+    catch(const exception& e)
+    {
+        BOOST_FAIL(e.what());
+    }
+    cout << "ok" << endl;
+}
+
+// Same scenario as a plain enqueue / recover / read / dequeue test, but with a larger journal
+// (2*NUM_TEST_JFILES files of 10*TEST_JFSIZE_SBLKS), NUM_MSGS*125 messages and 16*MSG_SIZE message bodies.
+QPID_AUTO_TEST_CASE(multi_page_enqueue_recovered_read_dequeue_block)
+{
+    string test_name(get_test_name(test_filename, "multi_page_enqueue_recovered_read_dequeue_block"));
+    try
+    {
+        // First journal instance: enqueue only.
+        {
+            string msg;
+
+            test_jrnl_cb callbacks;
+            test_jrnl jrnl(test_name, test_dir, test_name, callbacks);
+            jrnl.initialize(2*NUM_TEST_JFILES, false, 0, 10*TEST_JFSIZE_SBLKS);
+            for (int m=0; m<NUM_MSGS*125; ++m)
+            {
+                enq_msg(jrnl, m, create_msg(msg, m, 16*MSG_SIZE), false);
+            }
+        }
+        // Second journal instance: recover, read, dequeue.
+        {
+            string msg;
+            u_int64_t hrid;
+            string rmsg;
+            string xid;
+            bool transientFlag;
+            bool externalFlag;
+
+            test_jrnl_cb callbacks;
+            test_jrnl jrnl(test_name, test_dir, test_name, callbacks);
+            jrnl.recover(2*NUM_TEST_JFILES, false, 0, 10*TEST_JFSIZE_SBLKS, 0, hrid);
+            // The highest rid reported by recover() must be that of the last enqueued message.
+            BOOST_CHECK_EQUAL(hrid, u_int64_t(NUM_MSGS*125 - 1));
+            jrnl.recover_complete();
+
+            for (int m=0; m<NUM_MSGS*125; ++m)
+            {
+                read_msg(jrnl, rmsg, xid, transientFlag, externalFlag);
+                BOOST_CHECK_EQUAL(create_msg(msg, m, 16*MSG_SIZE), rmsg);
+                BOOST_CHECK_EQUAL(xid.size(), std::size_t(0));
+                BOOST_CHECK_EQUAL(transientFlag, false);
+                BOOST_CHECK_EQUAL(externalFlag, false);
+            }
+            read_msg(jrnl, rmsg, xid, transientFlag, externalFlag, RHM_IORES_EMPTY);
+
+            for (int m=0; m<NUM_MSGS*125; ++m)
+            {
+                deq_msg(jrnl, m, m+NUM_MSGS*125);
+            }
+            read_msg(jrnl, rmsg, xid, transientFlag, externalFlag, RHM_IORES_EMPTY);
+        }
+    }
+    catch(const exception& e)
+    {
+        BOOST_FAIL(e.what());
+    }
+    cout << "ok" << endl;
+}
+
+// Exercises reading both before and after recover_complete():
+//   1. a first journal instance enqueues NUM_MSGS messages;
+//   2. a second instance recovers and reads all messages without ever calling recover_complete();
+//   3. a third instance recovers, reads all messages, calls recover_complete(), reads all messages a second
+//      time and finally dequeues them.
+// After each full read pass, and after the dequeues, the journal is expected to read as empty.
+QPID_AUTO_TEST_CASE(enqueue_recover_read_recovered_read_dequeue_block)
+{
+    string test_name(get_test_name(test_filename, "enqueue_recover_read_recovered_read_dequeue_block"));
+    try
+    {
+        // Step 1: enqueue only.
+        {
+            string msg;
+
+            test_jrnl_cb callbacks;
+            test_jrnl jrnl(test_name, test_dir, test_name, callbacks);
+            jrnl.initialize(NUM_TEST_JFILES, false, 0, TEST_JFSIZE_SBLKS);
+            for (int m=0; m<NUM_MSGS; ++m)
+            {
+                enq_msg(jrnl, m, create_msg(msg, m, MSG_SIZE), false);
+            }
+        }
+        // Step 2: recover and read, recover_complete() is deliberately not called.
+        {
+            string msg;
+            u_int64_t hrid;
+            string rmsg;
+            string xid;
+            bool transientFlag;
+            bool externalFlag;
+
+            test_jrnl_cb callbacks;
+            test_jrnl jrnl(test_name, test_dir, test_name, callbacks);
+            jrnl.recover(NUM_TEST_JFILES, false, 0, TEST_JFSIZE_SBLKS, 0, hrid);
+            BOOST_CHECK_EQUAL(hrid, u_int64_t(NUM_MSGS - 1));
+            for (int m=0; m<NUM_MSGS; ++m)
+            {
+                read_msg(jrnl, rmsg, xid, transientFlag, externalFlag);
+                BOOST_CHECK_EQUAL(create_msg(msg, m, MSG_SIZE), rmsg);
+                BOOST_CHECK_EQUAL(xid.size(), std::size_t(0));
+                BOOST_CHECK_EQUAL(transientFlag, false);
+                BOOST_CHECK_EQUAL(externalFlag, false);
+            }
+            read_msg(jrnl, rmsg, xid, transientFlag, externalFlag, RHM_IORES_EMPTY);
+        }
+        // Step 3: recover, read, complete recovery, read again, dequeue.
+        {
+            string msg;
+            u_int64_t hrid;
+            string rmsg;
+            string xid;
+            bool transientFlag;
+            bool externalFlag;
+
+            test_jrnl_cb callbacks;
+            test_jrnl jrnl(test_name, test_dir, test_name, callbacks);
+            jrnl.recover(NUM_TEST_JFILES, false, 0, TEST_JFSIZE_SBLKS, 0, hrid);
+            BOOST_CHECK_EQUAL(hrid, u_int64_t(NUM_MSGS - 1));
+
+            // Read pass before recover_complete().
+            for (int m=0; m<NUM_MSGS; ++m)
+            {
+                read_msg(jrnl, rmsg, xid, transientFlag, externalFlag);
+                BOOST_CHECK_EQUAL(create_msg(msg, m, MSG_SIZE), rmsg);
+                BOOST_CHECK_EQUAL(xid.size(), std::size_t(0));
+                BOOST_CHECK_EQUAL(transientFlag, false);
+                BOOST_CHECK_EQUAL(externalFlag, false);
+            }
+            read_msg(jrnl, rmsg, xid, transientFlag, externalFlag, RHM_IORES_EMPTY);
+
+            jrnl.recover_complete();
+
+            // Read pass after recover_complete(): the same messages must be delivered again from the start.
+            for (int m=0; m<NUM_MSGS; ++m)
+            {
+                read_msg(jrnl, rmsg, xid, transientFlag, externalFlag);
+                BOOST_CHECK_EQUAL(create_msg(msg, m, MSG_SIZE), rmsg);
+                BOOST_CHECK_EQUAL(xid.size(), std::size_t(0));
+                BOOST_CHECK_EQUAL(transientFlag, false);
+                BOOST_CHECK_EQUAL(externalFlag, false);
+            }
+            read_msg(jrnl, rmsg, xid, transientFlag, externalFlag, RHM_IORES_EMPTY);
+
+            for (int m=0; m<NUM_MSGS; ++m)
+            {
+                deq_msg(jrnl, m, m+NUM_MSGS);
+            }
+            read_msg(jrnl, rmsg, xid, transientFlag, externalFlag, RHM_IORES_EMPTY);
+        }
+    }
+    catch(const exception& e)
+    {
+        BOOST_FAIL(e.what());
+    }
+    cout << "ok" << endl;
+}
+
+// Enqueues and immediately dequeues NUM_MSGS messages without reading any of them, then enqueues one more
+// message and checks that the first read issued returns exactly that last message.
+QPID_AUTO_TEST_CASE(delayed_read)
+{
+    string test_name(get_test_name(test_filename, "delayed_read"));
+    try
+    {
+        string msg;
+        string rmsg;
+        string xid;
+        bool transientFlag;
+        bool externalFlag;
+
+        test_jrnl_cb callbacks;
+        test_jrnl jrnl(test_name, test_dir, test_name, callbacks);
+        jrnl.initialize(NUM_TEST_JFILES, false, 0, TEST_JFSIZE_SBLKS);
+
+        // Even rids are enqueues, the following odd rid is the matching dequeue.
+        unsigned m = 0;
+        while (m<2*NUM_MSGS)
+        {
+            enq_msg(jrnl, m, create_msg(msg, m, MSG_SIZE), false);
+            deq_msg(jrnl, m, m+1);
+            m+=2;
+        }
+
+        // The only record left enqueued; msg keeps its content for the comparison below.
+        enq_msg(jrnl, m, create_msg(msg, m, MSG_SIZE), false);
+        jrnl.flush();
+        read_msg(jrnl, rmsg, xid, transientFlag, externalFlag);
+        BOOST_CHECK_EQUAL(msg, rmsg);
+    }
+    catch(const exception& e)
+    {
+        BOOST_FAIL(e.what());
+    }
+    cout << "ok" << endl;
+}
+
+// Like delayed_read, but the number of unread enqueue/dequeue pairs written first is sized from the read
+// buffer (JRNL_RMGR_PAGES * JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE dblks): enough 16*MSG_SIZE messages to fill
+// that buffer twice, plus 10. One further message is then enqueued and must be what the first read returns.
+QPID_AUTO_TEST_CASE(cache_cycled_delayed_read)
+{
+    string test_name(get_test_name(test_filename, "cache_cycled_delayed_read"));
+    try
+    {
+        string msg;
+        string rmsg;
+        string xid;
+        bool transientFlag;
+        bool externalFlag;
+
+        test_jrnl_cb callbacks;
+        test_jrnl jrnl(test_name, test_dir, test_name, callbacks);
+        jrnl.initialize(NUM_TEST_JFILES, false, 0, TEST_JFSIZE_SBLKS);
+
+        unsigned read_buffer_size_dblks = JRNL_RMGR_PAGES * JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE;
+        // Number of (message + dequeue) pairs that fill the read buffer once.
+        unsigned n = num_msgs_to_full(1, read_buffer_size_dblks, 16*MSG_REC_SIZE_DBLKS, true);
+
+        // m advances by 2 per message, so the bound 2*2*n + 20 means "fill read buffer twice + 10 msgs".
+        unsigned m = 0;
+        while (m<2*2*n + 20)
+        {
+            enq_msg(jrnl, m, create_msg(msg, m, 16*MSG_SIZE), false);
+            deq_msg(jrnl, m, m+1);
+            m+=2;
+        }
+
+        // The only record left enqueued; msg keeps its content for the comparison below.
+        enq_msg(jrnl, m, create_msg(msg, m, MSG_SIZE), false);
+        jrnl.flush();
+        read_msg(jrnl, rmsg, xid, transientFlag, externalFlag);
+        BOOST_CHECK_EQUAL(msg, rmsg);
+    }
+    catch(const exception& e)
+    {
+        BOOST_FAIL(e.what());
+    }
+    cout << "ok" << endl;
+}
+
+#else
+/*
+ * ==============================================
+ *                  LONG TESTS
+ * This section contains long tests and soak tests,
+ * which are run using the check-long target (i.e.
+ * "make check-long"). These are built when LONG_TEST
+ * is defined.
+ * ==============================================
+ */
+
+// Long test: on a larger journal (2*NUM_TEST_JFILES files of 10*TEST_JFSIZE_SBLKS) runs ten rounds of
+// "enqueue NUM_MSGS*125 messages of 16*MSG_SIZE, read them all back, dequeue them all". The journal is expected
+// to read as empty after the last read and after the dequeues of every round. The same rids are used in every
+// round.
+QPID_AUTO_TEST_CASE(multi_page_enqueue_read_dequeue_block)
+{
+    string test_name(get_test_name(test_filename, "multi_page_enqueue_read_dequeue_block"));
+    try
+    {
+        string msg;
+        string rmsg;
+        string xid;
+        bool transientFlag;
+        bool externalFlag;
+
+        test_jrnl_cb callbacks;
+        test_jrnl jrnl(test_name, test_dir, test_name, callbacks);
+        jrnl.initialize(2*NUM_TEST_JFILES, false, 0, 10*TEST_JFSIZE_SBLKS);
+
+        for (int round=0; round<10; ++round)
+        {
+            // Enqueue phase.
+            for (int m=0; m<NUM_MSGS*125; ++m)
+            {
+                enq_msg(jrnl, m, create_msg(msg, m, 16*MSG_SIZE), false);
+            }
+            jrnl.flush();
+
+            // Read phase.
+            for (int m=0; m<NUM_MSGS*125; ++m)
+            {
+                read_msg(jrnl, rmsg, xid, transientFlag, externalFlag);
+                BOOST_CHECK_EQUAL(create_msg(msg, m, 16*MSG_SIZE), rmsg);
+                BOOST_CHECK_EQUAL(xid.size(), std::size_t(0));
+                BOOST_CHECK_EQUAL(transientFlag, false);
+                BOOST_CHECK_EQUAL(externalFlag, false);
+            }
+            read_msg(jrnl, rmsg, xid, transientFlag, externalFlag, RHM_IORES_EMPTY);
+
+            // Dequeue phase.
+            for (int m=0; m<NUM_MSGS*125; ++m)
+            {
+                deq_msg(jrnl, m, m+NUM_MSGS*125);
+            }
+            read_msg(jrnl, rmsg, xid, transientFlag, externalFlag, RHM_IORES_EMPTY);
+        }
+    }
+    catch(const exception& e)
+    {
+        BOOST_FAIL(e.what());
+    }
+    cout << "ok" << endl;
+}
+
+// Long test: after one initial enqueue / read / dequeue, runs ten rounds in which a growing batch of
+// enqueue+dequeue pairs is written without reading (round i advances the rid counter by about i*n + 25, n being
+// the number of message+dequeue pairs that fit the read buffer), followed by one enqueue that must be returned
+// by the very next read.
+QPID_AUTO_TEST_CASE(increasing_interval_delayed_read)
+{
+    string test_name(get_test_name(test_filename, "increasing_interval_delayed_read"));
+    try
+    {
+        string msg;
+        string rmsg;
+        string xid;
+        bool transientFlag;
+        bool externalFlag;
+
+        test_jrnl_cb callbacks;
+        test_jrnl jrnl(test_name, test_dir, test_name, callbacks);
+        jrnl.initialize(NUM_TEST_JFILES, false, 0, TEST_JFSIZE_SBLKS);
+        unsigned read_buffer_size_dblks = JRNL_RMGR_PAGES * JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE;
+        unsigned n = num_msgs_to_full(1, read_buffer_size_dblks, MSG_REC_SIZE_DBLKS, true);
+        unsigned m = 0;
+
+        // Validate read pipeline
+        enq_msg(jrnl, m, create_msg(msg, m, MSG_SIZE), false);
+        jrnl.flush();
+        read_msg(jrnl, rmsg, xid, transientFlag, externalFlag);
+        deq_msg(jrnl, m, m+1);
+        m += 2;
+
+        // repeat the following multiple times...
+        for (int i=0; i<10; ++i)
+        {
+            // Invalidate read pipeline with large write
+            unsigned t = m + (i*n) + 25;
+            while (m<t)
+            {
+                enq_msg(jrnl, m, create_msg(msg, m, MSG_SIZE), false);
+                deq_msg(jrnl, m, m+1);
+                m+=2;
+            }
+
+            // Revalidate read pipeline
+            enq_msg(jrnl, m, create_msg(msg, m, MSG_SIZE), false);
+            jrnl.flush();
+            read_msg(jrnl, rmsg, xid, transientFlag, externalFlag);
+            BOOST_CHECK_EQUAL(msg, rmsg);
+            deq_msg(jrnl, m, m+1);
+            m += 2;
+        }
+    }
+    catch(const exception& e)
+    {
+        BOOST_FAIL(e.what());
+    }
+    cout << "ok" << endl;
+}
+
+#endif
+
+QPID_AUTO_TEST_SUITE_END()

Added: qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/_st_read_txn.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/_st_read_txn.cpp?rev=1530301&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/_st_read_txn.cpp (added)
+++ qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/_st_read_txn.cpp Tue Oct  8 15:09:00 2013
@@ -0,0 +1,353 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "../unit_test.h"
+#include <cmath>
+#include <iostream>
+#include "qpid/legacystore/jrnl/jcntl.h"
+
+using namespace boost::unit_test;
+using namespace mrg::journal;
+using namespace std;
+
+QPID_AUTO_TEST_SUITE(journal_read_txn)
+
+const string test_filename("_st_read_txn");
+
+#include "_st_helper_fns.h"
+
+// === Test suite ===
+
+// Enqueues NUM_MSGS messages under a single xid. While the transaction is open a read is expected to report
+// RHM_IORES_TXPENDING; after the commit every message must be readable in order together with its xid, and the
+// journal must then read as empty.
+QPID_AUTO_TEST_CASE(tx_enqueue_commit_block)
+{
+    string test_name(get_test_name(test_filename, "tx_enqueue_commit_block"));
+    try
+    {
+        string msg;
+        string xid;
+        string rmsg;
+        string rxid;
+        bool transientFlag;
+        bool externalFlag;
+        test_jrnl_cb callbacks;
+        test_jrnl jrnl(test_name, test_dir, test_name, callbacks);
+        jrnl.initialize(NUM_TEST_JFILES, false, 0, TEST_JFSIZE_SBLKS);
+
+        // Transactional enqueues, all under the same xid.
+        create_xid(xid, 0, XID_SIZE);
+        for (int m=0; m<NUM_MSGS; ++m)
+        {
+            enq_txn_msg(jrnl, m, create_msg(msg, m, MSG_SIZE), xid, false);
+        }
+        jrnl.flush();
+        read_msg(jrnl, rmsg, rxid, transientFlag, externalFlag, RHM_IORES_TXPENDING);
+
+        // The commit record takes the next rid after the enqueues.
+        txn_commit(jrnl, NUM_MSGS, xid);
+        jrnl.flush();
+
+        for (int m=0; m<NUM_MSGS; ++m)
+        {
+            read_msg(jrnl, rmsg, rxid, transientFlag, externalFlag);
+            BOOST_CHECK_EQUAL(create_msg(msg, m, MSG_SIZE), rmsg);
+            BOOST_CHECK_EQUAL(rxid, xid);
+            BOOST_CHECK_EQUAL(transientFlag, false);
+            BOOST_CHECK_EQUAL(externalFlag, false);
+        }
+        read_msg(jrnl, rmsg, rxid, transientFlag, externalFlag, RHM_IORES_EMPTY);
+    }
+    catch(const exception& e)
+    {
+        BOOST_FAIL(e.what());
+    }
+    cout << "ok" << endl;
+}
+
+// For each of NUM_MSGS messages: enqueue it under its own xid (rid 2*m), expect RHM_IORES_TXPENDING from a read
+// while the transaction is open, commit (rid 2*m+1), then read the message back and verify content, xid and
+// flags.
+QPID_AUTO_TEST_CASE(tx_enqueue_commit_interleaved)
+{
+    string test_name(get_test_name(test_filename, "tx_enqueue_commit_interleaved"));
+    try
+    {
+        string msg;
+        string xid;
+        string rmsg;
+        string rxid;
+        bool transientFlag;
+        bool externalFlag;
+
+        test_jrnl_cb callbacks;
+        test_jrnl jrnl(test_name, test_dir, test_name, callbacks);
+        jrnl.initialize(NUM_TEST_JFILES, false, 0, TEST_JFSIZE_SBLKS);
+
+        for (int m=0; m<NUM_MSGS; ++m)
+        {
+            // Open transaction: one enqueue, not yet readable.
+            create_xid(xid, 2*m, XID_SIZE);
+            enq_txn_msg(jrnl, 2*m, create_msg(msg, 2*m, MSG_SIZE), xid, false);
+            jrnl.flush();
+            read_msg(jrnl, rmsg, rxid, transientFlag, externalFlag, RHM_IORES_TXPENDING);
+
+            // Commit makes the message readable.
+            txn_commit(jrnl, 2*m+1, xid);
+            jrnl.flush();
+            read_msg(jrnl, rmsg, rxid, transientFlag, externalFlag);
+            BOOST_CHECK_EQUAL(create_msg(msg, 2*m, MSG_SIZE), rmsg);
+            BOOST_CHECK_EQUAL(rxid, xid);
+            BOOST_CHECK_EQUAL(transientFlag, false);
+            BOOST_CHECK_EQUAL(externalFlag, false);
+        }
+    }
+    catch(const exception& e)
+    {
+        BOOST_FAIL(e.what());
+    }
+    cout << "ok" << endl;
+}
+
+// Enqueues NUM_MSGS messages under a single xid, expects RHM_IORES_TXPENDING from a read while the transaction
+// is open, then aborts the transaction and expects the journal to read as empty.
+QPID_AUTO_TEST_CASE(tx_enqueue_abort_block)
+{
+    string test_name(get_test_name(test_filename, "tx_enqueue_abort_block"));
+    try
+    {
+        string msg;
+        string xid;
+        string rmsg;
+        string rxid;
+        bool transientFlag;
+        bool externalFlag;
+        test_jrnl_cb callbacks;
+        test_jrnl jrnl(test_name, test_dir, test_name, callbacks);
+        jrnl.initialize(NUM_TEST_JFILES, false, 0, TEST_JFSIZE_SBLKS);
+
+        // Transactional enqueues, all under the same xid.
+        create_xid(xid, 1, XID_SIZE);
+        for (int m=0; m<NUM_MSGS; ++m)
+        {
+            enq_txn_msg(jrnl, m, create_msg(msg, m, MSG_SIZE), xid, false);
+        }
+        jrnl.flush();
+        read_msg(jrnl, rmsg, rxid, transientFlag, externalFlag, RHM_IORES_TXPENDING);
+
+        // The abort record takes the next rid after the enqueues; nothing may be readable afterwards.
+        txn_abort(jrnl, NUM_MSGS, xid);
+        jrnl.flush();
+        read_msg(jrnl, rmsg, rxid, transientFlag, externalFlag, RHM_IORES_EMPTY);
+    }
+    catch(const exception& e)
+    {
+        BOOST_FAIL(e.what());
+    }
+    cout << "ok" << endl;
+}
+
+// For each of NUM_MSGS messages: enqueue it under its own xid (rid 2*m), expect RHM_IORES_TXPENDING from a read
+// while the transaction is open, abort (rid 2*m+1) and expect the journal to read as empty.
+QPID_AUTO_TEST_CASE(tx_enqueue_abort_interleaved)
+{
+    string test_name(get_test_name(test_filename, "tx_enqueue_abort_interleaved"));
+    try
+    {
+        string msg;
+        string xid;
+        string rmsg;
+        string rxid;
+        bool transientFlag;
+        bool externalFlag;
+
+        test_jrnl_cb callbacks;
+        test_jrnl jrnl(test_name, test_dir, test_name, callbacks);
+        jrnl.initialize(NUM_TEST_JFILES, false, 0, TEST_JFSIZE_SBLKS);
+
+        for (int m=0; m<NUM_MSGS; ++m)
+        {
+            // Open transaction: one enqueue, not yet readable.
+            create_xid(xid, 2*m, XID_SIZE);
+            enq_txn_msg(jrnl, 2*m, create_msg(msg, 2*m, MSG_SIZE), xid, false);
+            jrnl.flush();
+            read_msg(jrnl, rmsg, rxid, transientFlag, externalFlag, RHM_IORES_TXPENDING);
+
+            // Abort discards the enqueue.
+            txn_abort(jrnl, 2*m+1, xid);
+            jrnl.flush();
+            read_msg(jrnl, rmsg, rxid, transientFlag, externalFlag, RHM_IORES_EMPTY);
+        }
+    }
+    catch(const exception& e)
+    {
+        BOOST_FAIL(e.what());
+    }
+    cout << "ok" << endl;
+}
+
+// Enqueues NUM_MSGS messages under a single xid, commits the transaction, dequeues all messages
+// non-transactionally and then expects the journal to read as empty.
+QPID_AUTO_TEST_CASE(tx_enqueue_commit_dequeue_block)
+{
+    string test_name(get_test_name(test_filename, "tx_enqueue_commit_dequeue_block"));
+    try
+    {
+        string msg;
+        string xid;
+        string rmsg;
+        string rxid;
+        bool transientFlag;
+        bool externalFlag;
+
+        test_jrnl_cb callbacks;
+        test_jrnl jrnl(test_name, test_dir, test_name, callbacks);
+        jrnl.initialize(NUM_TEST_JFILES, false, 0, TEST_JFSIZE_SBLKS);
+
+        // Rids 0 .. NUM_MSGS-1: transactional enqueues; rid NUM_MSGS: the commit.
+        create_xid(xid, 2, XID_SIZE);
+        for (int m=0; m<NUM_MSGS; ++m)
+        {
+            enq_txn_msg(jrnl, m, create_msg(msg, m, MSG_SIZE), xid, false);
+        }
+        txn_commit(jrnl, NUM_MSGS, xid);
+
+        // Dequeue records continue after the commit rid, hence the +1.
+        for (int m=0; m<NUM_MSGS; ++m)
+        {
+            deq_msg(jrnl, m, m+NUM_MSGS+1);
+        }
+        jrnl.flush();
+        read_msg(jrnl, rmsg, rxid, transientFlag, externalFlag, RHM_IORES_EMPTY);
+    }
+    catch(const exception& e)
+    {
+        BOOST_FAIL(e.what());
+    }
+    cout << "ok" << endl;
+}
+
+// For each of NUM_MSGS messages: enqueue under its own xid (rid 3*m), commit (rid 3*m+1), dequeue
+// non-transactionally (rid 3*m+2) and expect the journal to read as empty.
+QPID_AUTO_TEST_CASE(tx_enqueue_commit_dequeue_interleaved)
+{
+    string test_name(get_test_name(test_filename, "tx_enqueue_commit_dequeue_interleaved"));
+    try
+    {
+        string msg;
+        string xid;
+        string rmsg;
+        string rxid;
+        bool transientFlag;
+        bool externalFlag;
+
+        test_jrnl_cb callbacks;
+        test_jrnl jrnl(test_name, test_dir, test_name, callbacks);
+        jrnl.initialize(NUM_TEST_JFILES, false, 0, TEST_JFSIZE_SBLKS);
+
+        for (int m=0; m<NUM_MSGS; ++m)
+        {
+            create_xid(xid, 3*m, XID_SIZE);
+            // Note: the message body is numbered m, not 3*m; the content is never read back in this test.
+            enq_txn_msg(jrnl, 3*m, create_msg(msg, m, MSG_SIZE), xid, false);
+            txn_commit(jrnl, 3*m+1, xid);
+            deq_msg(jrnl, 3*m, 3*m+2);
+            jrnl.flush();
+            read_msg(jrnl, rmsg, rxid, transientFlag, externalFlag, RHM_IORES_EMPTY);
+        }
+    }
+    catch(const exception& e)
+    {
+        BOOST_FAIL(e.what());
+    }
+    cout << "ok" << endl;
+}
+
+// Enqueues NUM_MSGS messages non-transactionally, then dequeues all of them under a single xid. While that
+// transaction is open a read is expected to report RHM_IORES_TXPENDING; after the commit the journal is
+// expected to read as empty.
+QPID_AUTO_TEST_CASE(enqueue_tx_dequeue_commit_block)
+{
+    string test_name(get_test_name(test_filename, "enqueue_tx_dequeue_commit_block"));
+    try
+    {
+        string msg;
+        string xid;
+        string rmsg;
+        string rxid;
+        bool transientFlag;
+        bool externalFlag;
+
+        create_xid(xid, 3, XID_SIZE);
+        test_jrnl_cb callbacks;
+        test_jrnl jrnl(test_name, test_dir, test_name, callbacks);
+        jrnl.initialize(NUM_TEST_JFILES, false, 0, TEST_JFSIZE_SBLKS);
+
+        // Rids 0 .. NUM_MSGS-1: plain enqueues.
+        for (int m=0; m<NUM_MSGS; ++m)
+        {
+            enq_msg(jrnl, m, create_msg(msg, m, MSG_SIZE), false);
+        }
+        // Rids NUM_MSGS .. 2*NUM_MSGS-1: transactional dequeues.
+        for (int m=0; m<NUM_MSGS; ++m)
+        {
+            deq_txn_msg(jrnl, m, m+NUM_MSGS, xid);
+        }
+        jrnl.flush();
+        read_msg(jrnl, rmsg, rxid, transientFlag, externalFlag, RHM_IORES_TXPENDING);
+
+        // Rid 2*NUM_MSGS: the commit.
+        txn_commit(jrnl, 2*NUM_MSGS, xid);
+        jrnl.flush();
+        read_msg(jrnl, rmsg, rxid, transientFlag, externalFlag, RHM_IORES_EMPTY);
+    }
+    catch(const exception& e)
+    {
+        BOOST_FAIL(e.what());
+    }
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(enqueue_tx_dequeue_commit_interleaved)
+{
+    // For each message in turn: plain enqueue, transactional dequeue under a
+    // fresh xid (read expected to give RHM_IORES_TXPENDING), then commit (read
+    // expected to give RHM_IORES_EMPTY).
+    string test_name(get_test_name(test_filename, "enqueue_tx_dequeue_commit_interleaved"));
+    try
+    {
+        string payload;
+        string txn_id;
+        string read_payload;
+        string read_xid;
+        bool is_transient;
+        bool is_external;
+
+        test_jrnl_cb callback;
+        test_jrnl jrnl(test_name, test_dir, test_name, callback);
+        jrnl.initialize(NUM_TEST_JFILES, false, 0, TEST_JFSIZE_SBLKS);
+        for (int i = 0; i < NUM_MSGS; ++i)
+        {
+            // base, base+1 and base+2 are handed to the enqueue, dequeue and
+            // commit helpers respectively; base also seeds the message and xid.
+            const int base = 3 * i;
+            enq_msg(jrnl, base, create_msg(payload, base, MSG_SIZE), false);
+            create_xid(txn_id, base, XID_SIZE);
+            deq_txn_msg(jrnl, base, base + 1, txn_id);
+            jrnl.flush();
+            read_msg(jrnl, read_payload, read_xid, is_transient, is_external, RHM_IORES_TXPENDING);
+            txn_commit(jrnl, base + 2, txn_id);
+            jrnl.flush();
+            read_msg(jrnl, read_payload, read_xid, is_transient, is_external, RHM_IORES_EMPTY);
+        }
+    }
+    catch (const exception& ex) { BOOST_FAIL(ex.what()); }
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(enqueue_tx_dequeue_abort_block)
+{
+    // Enqueue NUM_MSGS messages non-transactionally, dequeue all of them under
+    // one xid, abort that xid, and verify every message can be read back
+    // unchanged before the journal finally reports RHM_IORES_EMPTY.
+    string test_name(get_test_name(test_filename, "enqueue_tx_dequeue_abort_block"));
+    try
+    {
+        string payload;
+        string txn_id;
+        string read_payload;
+        string read_xid;
+        bool is_transient;
+        bool is_external;
+
+        create_xid(txn_id, 4, XID_SIZE);
+        test_jrnl_cb callback;
+        test_jrnl jrnl(test_name, test_dir, test_name, callback);
+        jrnl.initialize(NUM_TEST_JFILES, false, 0, TEST_JFSIZE_SBLKS);
+
+        // Phase 1: plain enqueues.
+        for (int i = 0; i < NUM_MSGS; ++i)
+            enq_msg(jrnl, i, create_msg(payload, i, MSG_SIZE), false);
+
+        // Phase 2: transactional dequeues, all sharing txn_id.
+        for (int i = 0; i < NUM_MSGS; ++i)
+            deq_txn_msg(jrnl, i, i+NUM_MSGS, txn_id);
+        jrnl.flush();
+        read_msg(jrnl, read_payload, read_xid, is_transient, is_external, RHM_IORES_TXPENDING);
+
+        // Phase 3: abort the dequeues.
+        txn_abort(jrnl, 2*NUM_MSGS, txn_id);
+        jrnl.flush();
+
+        // Phase 4: each message must read back with its original content,
+        // an empty xid and both flags false.
+        for (int i = 0; i < NUM_MSGS; ++i)
+        {
+            read_msg(jrnl, read_payload, read_xid, is_transient, is_external);
+            BOOST_CHECK_EQUAL(create_msg(payload, i, MSG_SIZE), read_payload);
+            BOOST_CHECK_EQUAL(read_xid.length(), std::size_t(0));
+            BOOST_CHECK_EQUAL(is_transient, false);
+            BOOST_CHECK_EQUAL(is_external, false);
+        }
+        read_msg(jrnl, read_payload, read_xid, is_transient, is_external, RHM_IORES_EMPTY);
+    }
+    catch (const exception& ex) { BOOST_FAIL(ex.what()); }
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(enqueue_tx_dequeue_abort_interleaved)
+{
+    // For each message in turn: plain enqueue, transactional dequeue under a
+    // fresh xid (read expected to give RHM_IORES_TXPENDING), abort, then one
+    // read with the helper's default expectation followed by a read expected
+    // to give RHM_IORES_EMPTY.
+    string test_name(get_test_name(test_filename, "enqueue_tx_dequeue_abort_interleaved"));
+    try
+    {
+        string payload;
+        string txn_id;
+        string read_payload;
+        string read_xid;
+        bool is_transient;
+        bool is_external;
+
+        test_jrnl_cb callback;
+        test_jrnl jrnl(test_name, test_dir, test_name, callback);
+        jrnl.initialize(NUM_TEST_JFILES, false, 0, TEST_JFSIZE_SBLKS);
+        for (int i = 0; i < NUM_MSGS; ++i)
+        {
+            // base, base+1 and base+2 are handed to the enqueue, dequeue and
+            // abort helpers respectively; base also seeds the message and xid.
+            const int base = 3 * i;
+            enq_msg(jrnl, base, create_msg(payload, base, MSG_SIZE), false);
+            create_xid(txn_id, base, XID_SIZE);
+            deq_txn_msg(jrnl, base, base + 1, txn_id);
+            jrnl.flush();
+            read_msg(jrnl, read_payload, read_xid, is_transient, is_external, RHM_IORES_TXPENDING);
+            txn_abort(jrnl, base + 2, txn_id);
+            jrnl.flush();
+            read_msg(jrnl, read_payload, read_xid, is_transient, is_external);
+            read_msg(jrnl, read_payload, read_xid, is_transient, is_external, RHM_IORES_EMPTY);
+        }
+    }
+    catch (const exception& ex) { BOOST_FAIL(ex.what()); }
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_SUITE_END()

Added: qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/_ut_enq_map.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/_ut_enq_map.cpp?rev=1530301&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/_ut_enq_map.cpp (added)
+++ qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/_ut_enq_map.cpp Tue Oct  8 15:09:00 2013
@@ -0,0 +1,320 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "../unit_test.h"
+
+#include <iostream>
+#include "qpid/legacystore/jrnl/enq_map.h"
+#include "qpid/legacystore/jrnl/jerrno.h"
+
+using namespace boost::unit_test;
+using namespace mrg::journal;
+using namespace std;
+
+QPID_AUTO_TEST_SUITE(enq_map_suite)
+
+// Prefix each test case prints ahead of its own name in the progress output.
+const string test_filename("_ut_enq_map");
+
+QPID_AUTO_TEST_CASE(constructor)
+{
+    // A default-constructed enq_map must be empty and report size zero.
+    cout << test_filename << ".constructor: " << flush;
+    enq_map fresh_map;
+    BOOST_CHECK(fresh_map.empty());
+    BOOST_CHECK_EQUAL(fresh_map.size(), u_int32_t(0));
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(insert_get)
+{
+    // Exercises insert_pfid(), is_enqueued(), get_pfid(), size(), clear() and
+    // empty(): first a duplicate-free insert, then lookups that hit and miss,
+    // then a second insert pass that collides with part of the first.
+    cout << test_filename << ".insert_get: " << flush;
+    u_int16_t pfid;
+    u_int64_t rid;
+    u_int16_t pfid_start = 0x2000U;
+    u_int64_t rid_begin = 0xffffffff00000000ULL;
+    u_int64_t rid_end   = 0xffffffff00000200ULL;
+
+    // insert with no dups: every 4th rid in [rid_begin, rid_end), i.e.
+    // 0x200 / 4 = 128 entries, each with its own consecutive pfid
+    u_int64_t rid_incr_1 = 4ULL;
+    enq_map e2;
+    e2.set_num_jfiles(pfid_start + (rid_end - rid_begin)/rid_incr_1);
+    for (rid = rid_begin, pfid = pfid_start; rid < rid_end; rid += rid_incr_1, pfid++)
+        BOOST_CHECK_EQUAL(e2.insert_pfid(rid, pfid), enq_map::EMAP_OK);
+    BOOST_CHECK(!e2.empty());
+    BOOST_CHECK_EQUAL(e2.size(), u_int32_t(128));
+
+    // get: probe every 6th rid; only those that are also multiples of
+    // rid_incr_1 were inserted above, all others must be reported missing
+    u_int64_t rid_incr_2 = 6ULL;
+    for (rid = rid_begin; rid < rid_end; rid += rid_incr_2)
+    {
+        BOOST_CHECK_EQUAL(e2.is_enqueued(rid), (rid%rid_incr_1 ? false : true));
+        // pfid that the first pass assigned to this rid (meaningful on hits only)
+        u_int16_t exp_pfid = pfid_start + (u_int16_t)((rid - rid_begin)/rid_incr_1);
+        int16_t ret_fid = e2.get_pfid(rid);
+        if (ret_fid < enq_map::EMAP_OK) // fail
+        {
+            BOOST_CHECK_EQUAL(ret_fid, enq_map::EMAP_RID_NOT_FOUND);
+            BOOST_CHECK(rid%rid_incr_1);
+        }
+        else
+        {
+            BOOST_CHECK_EQUAL(ret_fid, exp_pfid);
+            BOOST_CHECK(rid%rid_incr_1 == 0);
+        }
+    }
+
+    // insert with dups: of the 86 rids visited in steps of 6, the 43 that are
+    // multiples of 4 already exist and must be rejected; the other 43 are new,
+    // giving 128 + 43 = 171 entries
+    for (rid = rid_begin, pfid = pfid_start; rid < rid_end; rid += rid_incr_2, pfid++)
+    {
+        int16_t res = e2.insert_pfid(rid, pfid);
+        if (res < enq_map::EMAP_OK) // fail
+        {
+            BOOST_CHECK_EQUAL(res, enq_map::EMAP_DUP_RID);
+            BOOST_CHECK(rid%rid_incr_1 == 0);
+        }
+        else
+            BOOST_CHECK(rid%rid_incr_1);
+    }
+    BOOST_CHECK_EQUAL(e2.size(), u_int32_t(171));
+    e2.clear();
+    BOOST_CHECK(e2.empty());
+    BOOST_CHECK_EQUAL(e2.size(), u_int32_t(0));
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(get_remove)
+{
+    // Exercises get_remove_pfid(): populate the map, then remove with a stride
+    // that hits only some of the stored rids, and check the remaining size.
+    cout << test_filename << ".get_remove: " << flush;
+    u_int16_t pfid;
+    u_int64_t rid;
+    u_int16_t pfid_start = 0x3000U;
+    u_int64_t rid_begin = 0xeeeeeeee00000000ULL;
+    u_int64_t rid_end   = 0xeeeeeeee00000200ULL;
+
+    // insert every 4th rid in [rid_begin, rid_end): 0x200 / 4 = 128 entries
+    u_int64_t rid_incr_1 = 4ULL;
+    u_int64_t num_incr_1 = (rid_end - rid_begin)/rid_incr_1;
+    enq_map e3;
+    e3.set_num_jfiles(pfid_start + (rid_end - rid_begin)/rid_incr_1);
+    for (rid = rid_begin, pfid = pfid_start; rid < rid_end; rid += rid_incr_1, pfid++)
+        BOOST_CHECK_EQUAL(e3.insert_pfid(rid, pfid), enq_map::EMAP_OK);
+    BOOST_CHECK_EQUAL(e3.size(), num_incr_1);
+
+    // remove every 6th rid; only rids that are also multiples of rid_incr_1
+    // exist, the rest must be reported as not found
+    u_int64_t rid_incr_2 = 6ULL;
+    for (rid = rid_begin; rid < rid_end; rid += rid_incr_2)
+    {
+        // pfid assigned to this rid by the insert loop (meaningful on hits only)
+        u_int16_t exp_pfid = pfid_start + (u_int16_t)((rid - rid_begin)/rid_incr_1);
+        int16_t ret_fid = e3.get_remove_pfid(rid);
+        if (ret_fid < enq_map::EMAP_OK) // fail
+        {
+            BOOST_CHECK_EQUAL(ret_fid, enq_map::EMAP_RID_NOT_FOUND);
+            BOOST_CHECK(rid%rid_incr_1);
+        }
+        else
+        {
+            BOOST_CHECK_EQUAL(ret_fid, exp_pfid);
+            BOOST_CHECK(rid%rid_incr_1 == 0);
+        }
+    }
+    // 43 of the 86 probed rids were present and removed: 128 - 43 = 85 remain
+    BOOST_CHECK_EQUAL(e3.size(), u_int32_t(85));
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(lock)
+{
+    // Exercises lock(), unlock() and the effect of the lock flag on get_pfid()
+    // and get_remove_pfid().
+    cout << test_filename << ".lock: " << flush;
+    u_int16_t pfid;
+    u_int64_t rid;
+    u_int16_t pfid_start = 0x4000U;
+    u_int64_t rid_begin = 0xdddddddd00000000ULL;
+    u_int64_t rid_end   = 0xdddddddd00000200ULL;
+
+    // insert, every second entry is locked (the flag starts false and toggles,
+    // so the locked rids are those that are not multiples of 2*rid_incr_1)
+    u_int64_t rid_incr_1 = 4ULL;
+    u_int64_t num_incr_1 = (rid_end - rid_begin)/rid_incr_1;
+    bool locked = false;
+    enq_map e4;
+    e4.set_num_jfiles(pfid_start + (rid_end - rid_begin)/rid_incr_1);
+    for (rid = rid_begin, pfid = pfid_start; rid < rid_end; rid += rid_incr_1, pfid++)
+    {
+        BOOST_CHECK_EQUAL(e4.insert_pfid(rid, pfid, locked), enq_map::EMAP_OK);
+        locked = !locked;
+    }
+    BOOST_CHECK_EQUAL(e4.size(), num_incr_1);
+
+    // lock and unlock non-existent rids (1 and 2 lie outside the inserted
+    // range); both calls must report EMAP_RID_NOT_FOUND
+    int16_t res = e4.lock(1ULL);
+    if (res < enq_map::EMAP_OK)
+        BOOST_CHECK_EQUAL(res, enq_map::EMAP_RID_NOT_FOUND);
+    else
+        BOOST_ERROR("Failed to detect locking non-existent rid.");
+    res = e4.unlock(2ULL);
+    if (res < enq_map::EMAP_OK)
+        BOOST_CHECK_EQUAL(res, enq_map::EMAP_RID_NOT_FOUND);
+    else
+        BOOST_ERROR("Failed to detect unlocking non-existent rid.");
+
+    // get / unlock: a failing get_pfid() is only acceptable for a locked entry
+    for (u_int64_t rid = rid_begin; rid < rid_end; rid += rid_incr_1)
+    {
+        int16_t fid = e4.get_pfid(rid);
+        if (fid < enq_map::EMAP_OK) // fail
+        {
+            BOOST_CHECK_EQUAL(fid, enq_map::EMAP_LOCKED);
+            // only the entries inserted with locked == true may fail here
+            BOOST_CHECK(rid%(2*rid_incr_1));
+            // unlock, read, then relock
+            BOOST_CHECK_EQUAL(e4.unlock(rid), enq_map::EMAP_OK);
+            BOOST_CHECK(e4.get_pfid(rid) >= enq_map::EMAP_OK);
+            BOOST_CHECK_EQUAL(e4.lock(rid), enq_map::EMAP_OK);
+            // once relocked, the entry must be refused again
+            fid = e4.get_pfid(rid);
+            if (fid < enq_map::EMAP_OK) // fail
+                BOOST_CHECK_EQUAL(fid, enq_map::EMAP_LOCKED);
+            else
+                BOOST_ERROR("Failed to prevent getting locked record");
+        }
+    }
+
+    // remove all; if locked, use with txn_flag true; should ignore all locked records
+    // (i.e. with true as the second argument every removal, locked or not, is
+    // expected to succeed and leave the map empty)
+    for (u_int64_t rid = rid_begin; rid < rid_end; rid += rid_incr_1)
+        BOOST_CHECK(e4.get_remove_pfid(rid, true) >= enq_map::EMAP_OK);
+    BOOST_CHECK(e4.empty());
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(lists)
+{
+    // Exercises rid_list() and pfid_list(): the lists returned by the map must
+    // match, element for element, the rids and pfids that were inserted.
+    cout << test_filename << ".lists: " << flush;
+    u_int16_t pfid;
+    u_int64_t rid;
+    u_int16_t pfid_start = 0x5000U;
+    u_int64_t rid_begin = 0xdddddddd00000000ULL;
+    u_int64_t rid_end   = 0xdddddddd00000200ULL;
+
+    // insert (nothing is locked here), recording each rid and pfid in
+    // insertion order, which is also ascending order
+    u_int64_t rid_incr_1 = 4ULL;
+    u_int64_t num_incr_1 = (rid_end - rid_begin)/rid_incr_1;
+    vector<u_int64_t> rid_list;
+    vector<u_int16_t> pfid_list;
+    enq_map e5;
+    e5.set_num_jfiles(pfid_start + (rid_end - rid_begin)/rid_incr_1);
+    for (rid = rid_begin, pfid = pfid_start; rid < rid_end; rid += rid_incr_1, pfid++)
+    {
+        BOOST_CHECK_EQUAL(e5.insert_pfid(rid, pfid), enq_map::EMAP_OK);
+        rid_list.push_back(rid);
+        pfid_list.push_back(pfid);
+    }
+    BOOST_CHECK_EQUAL(e5.size(), num_incr_1);
+    BOOST_CHECK_EQUAL(rid_list.size(), num_incr_1);
+    BOOST_CHECK_EQUAL(pfid_list.size(), num_incr_1);
+
+    // Compare whole ranges rather than indexing the expected vector by the
+    // returned vector's size: the size check is non-fatal, so a longer
+    // returned list would otherwise be read past the end of the expected one.
+    vector<u_int64_t> ret_rid_list;
+    e5.rid_list(ret_rid_list);
+    BOOST_CHECK_EQUAL(ret_rid_list.size(), num_incr_1);
+    BOOST_CHECK_EQUAL_COLLECTIONS(rid_list.begin(), rid_list.end(),
+                                  ret_rid_list.begin(), ret_rid_list.end());
+
+    vector<u_int16_t> ret_pfid_list;
+    e5.pfid_list(ret_pfid_list);
+    BOOST_CHECK_EQUAL(ret_pfid_list.size(), num_incr_1);
+    BOOST_CHECK_EQUAL_COLLECTIONS(pfid_list.begin(), pfid_list.end(),
+                                  ret_pfid_list.begin(), ret_pfid_list.end());
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(enq_count)
+{
+    // Checks the per-file enqueue counters reported by get_enq_cnt() across
+    // inserts, removals and changes of the journal file count.
+    cout << test_filename << ".enq_count: " << flush;
+
+    enq_map e6;
+
+    // Change the number of journal files up and down several times before
+    // settling on 4, to exercise allocation and cleanup
+    e6.set_num_jfiles(24);
+    e6.set_num_jfiles(0);
+    e6.set_num_jfiles(100);
+    e6.set_num_jfiles(4);
+
+    // All four counters start at zero
+    for (u_int16_t file=0; file<4; ++file)
+        BOOST_CHECK_EQUAL(e6.get_enq_cnt(file), u_int32_t(0));
+
+    // Put 100 enqueues into file 1; only that counter may change
+    for (u_int64_t rec=0; rec<100; ++rec)
+        BOOST_CHECK_EQUAL(e6.insert_pfid(rec, 1), enq_map::EMAP_OK);
+    for (u_int16_t file=0; file<4; ++file)
+    {
+        const u_int32_t expected = (file == 1) ? u_int32_t(100) : u_int32_t(0);
+        BOOST_CHECK_EQUAL(e6.get_enq_cnt(file), expected);
+    }
+
+    // Take every 10th record (10 in total) back out of file 1
+    for (u_int64_t rec=0; rec<100; rec+=10)
+        BOOST_CHECK(e6.get_remove_pfid(rec) >= enq_map::EMAP_OK);
+    for (u_int16_t file=0; file<4; ++file)
+    {
+        const u_int32_t expected = (file == 1) ? u_int32_t(90) : u_int32_t(0);
+        BOOST_CHECK_EQUAL(e6.get_enq_cnt(file), expected);
+    }
+
+    // Grow the file count to 8: file 1 keeps its count, new files start at zero
+    e6.set_num_jfiles(8);
+    for (u_int16_t file=0; file<8; ++file)
+    {
+        const u_int32_t expected = (file == 1) ? u_int32_t(90) : u_int32_t(0);
+        BOOST_CHECK_EQUAL(e6.get_enq_cnt(file), expected);
+    }
+
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(stress)
+{
+    // Inserts interleaved even and odd rids near the top of the 64-bit range,
+    // then removes the even ones, checking size() after every phase. All
+    // entries are stored against pfid 0.
+    cout << test_filename << ".stress: " << flush;
+    u_int64_t rid;
+    u_int64_t rid_cnt;
+    u_int64_t rid_begin = 0xffffffff00000000ULL;
+    u_int64_t num_rid = 10;
+
+    enq_map e7;
+    // The file count is unrelated to the rid range. The previous argument,
+    // rid_begin + num_rid, is a 64-bit rid value that only produced a usable
+    // count through implicit narrowing (its low 32 bits equal num_rid).
+    // NOTE(review): assumes set_num_jfiles() takes a 16- or 32-bit count -
+    // confirm against enq_map.h. Pass the same resulting value explicitly.
+    e7.set_num_jfiles(u_int16_t(num_rid));
+
+    // insert even rids with no dups
+    for (rid = rid_begin, rid_cnt = u_int64_t(0); rid_cnt < num_rid; rid += 2ULL, rid_cnt++)
+        BOOST_CHECK_EQUAL(e7.insert_pfid(rid, u_int16_t(0)), enq_map::EMAP_OK);
+    BOOST_CHECK_EQUAL(e7.size(), num_rid);
+
+    // insert odd rids with no dups
+    for (rid = rid_begin + 1, rid_cnt = u_int64_t(0); rid_cnt < num_rid; rid += 2ULL, rid_cnt++)
+        BOOST_CHECK_EQUAL(e7.insert_pfid(rid, u_int16_t(0)), enq_map::EMAP_OK);
+    BOOST_CHECK_EQUAL(e7.size(), num_rid * 2);
+
+    // remove even rids; the odd ones must remain
+    for (rid = rid_begin, rid_cnt = u_int64_t(0); rid_cnt < num_rid; rid += 2ULL, rid_cnt++)
+        BOOST_CHECK(e7.get_remove_pfid(rid) >= enq_map::EMAP_OK);
+    BOOST_CHECK_EQUAL(e7.size(), num_rid);
+
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_SUITE_END()



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org