You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2013/10/08 17:09:01 UTC

svn commit: r1530301 [5/8] - in /qpid/trunk/qpid: cpp/src/tests/legacystore/ cpp/src/tests/legacystore/federation/ cpp/src/tests/legacystore/jrnl/ cpp/src/tests/legacystore/jrnl/jtt/ cpp/src/tests/legacystore/python_tests/ tools/src/py/ tools/src/py/qp...

Added: qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/jtt/args.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/jtt/args.cpp?rev=1530301&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/jtt/args.cpp (added)
+++ qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/jtt/args.cpp Tue Oct  8 15:09:00 2013
@@ -0,0 +1,226 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "args.h"
+
+#include <cstddef>
+#include <iostream>
+
+namespace po = boost::program_options;
+
+namespace mrg
+{
+namespace jtt
+{
+
+args::args(std::string opt_title):
+    _options_descr(opt_title),
+    format_chk(false),
+    keep_jrnls(false),
+    lld_rd_num(10),
+    lld_skip_num(100),
+    num_jrnls(1),
+    pause_secs(0),
+    randomize(false),
+    read_mode(),
+    read_prob(50),
+    recover_mode(false),
+    repeat_flag(false),
+    reuse_instance(false),
+    seed(0)
+{
+    _options_descr.add_options()
+        ("csv-file,c",
+         po::value<std::string>(&test_case_csv_file_name)->default_value("jtt.csv"),
+         "CSV file containing test cases.")
+
+        ("format-chk",
+         po::value<bool>(&format_chk)->zero_tokens(),
+         "Check the format of each journal file.")
+
+        ("help,h", "This help message.")
+
+        ("jrnl-dir",
+        po::value<std::string>(&journal_dir)->default_value("/var/tmp/jtt"),
+        "Directory in which journal files will be placed.")
+
+        ("keep-jrnls",
+         po::value<bool>(&keep_jrnls)->zero_tokens(),
+         "Keep all test journals.")
+
+        ("lld-rd-num",
+         po::value<unsigned>(&lld_rd_num)->default_value(10),
+         "Number of consecutive messages to read after only dequeueing lld-skip-num "
+         "messages during lazy-loading. Ignored if read-mode is not set to LAZYLOAD.")
+
+        ("lld-skip-num",
+         po::value<unsigned>(&lld_skip_num)->default_value(100),
+         "Number of consecutive messages to dequeue only (without reading) prior to "
+         "reading lld-rd-num messages. Ignored if read-mode is not set to LAZYLOAD.")
+
+        ("num-jrnls",
+         po::value<unsigned>(&num_jrnls)->default_value(1),
+         "Number of simultaneous journal instances to test.")
+
+        ("pause",
+         po::value<unsigned>(&pause_secs)->default_value(0),
+         "Pause in seconds between test cases (allows disk to catch up).")
+
+        ("randomize",
+         po::value<bool>(&randomize)->zero_tokens(),
+         "Randomize the order of the tests.")
+
+        ("read-mode",
+         po::value<read_arg>(&read_mode)->default_value(read_arg::NONE),
+         read_arg::descr().c_str())
+
+        ("read-prob",
+         po::value<unsigned>(&read_prob)->default_value(50),
+         "Read probability (percent) for each message when read-mode is set to RANDOM.")
+
+        ("recover-mode",
+         po::value<bool>(&recover_mode)->zero_tokens(),
+         "Recover journal from the previous test for each test case.")
+
+        ("repeat",
+         po::value<bool>(&repeat_flag)->zero_tokens(),
+         "Repeat all test cases indefinitely.")
+
+        ("reuse-instance",
+         po::value<bool>(&reuse_instance)->zero_tokens(),
+         "Reuse journal instance for all test cases.")
+
+        ("seed",
+         po::value<unsigned>(&seed)->default_value(0),
+         "Seed for use in random number generator.")
+
+        ("analyzer",
+        po::value<std::string>(&jfile_analyzer)->default_value("./file_chk.py"),
+        "Journal file analyzer program to use when the --format-chk option is used, ignored otherwise.")
+
+        ;
+}
+
+bool
+args::parse(int argc, char** argv) // return true if error, false if ok
+{
+    try
+    {
+        po::store(po::parse_command_line(argc, argv, _options_descr), _vmap);
+        po::notify(_vmap);
+    }
+    catch (const std::exception& e)
+    {
+        std::cout << "ERROR: " << e.what() << std::endl;
+        return usage();
+    }
+    if (_vmap.count("help"))
+        return usage();
+    if (num_jrnls == 0)
+    {
+        std::cout << "ERROR: num-jrnls must be 1 or more." << std::endl;
+        return usage();
+    }
+    if (read_prob > 100) // read_prob is unsigned, so no need to check < 0
+    {
+        std::cout << "ERROR: read-prob must be between 0 and 100 inclusive." << std::endl;
+        return usage();
+    }
+    if (repeat_flag && keep_jrnls)
+    {
+        std::string resp;
+        std::cout << "WARNING: repeat and keep-jrnls: Monitor disk usage as test journals will"
+                " accumulate." << std::endl;
+        std::cout << "Continue? <y/n> ";
+        std::cin >> resp;
+        if (resp.size() == 1)
+        {
+            if (resp[0] != 'y' && resp[0] != 'Y')
+                return true;
+        }
+        else if (resp.size() == 3) // any combo of lower- and upper-case
+        {
+            if (resp[0] != 'y' && resp[0] != 'Y')
+                return true;
+            if (resp[1] != 'e' && resp[1] != 'E')
+                return true;
+            if (resp[2] != 's' && resp[2] != 'S')
+                return true;
+        }
+        else
+            return true;
+    }
+    return false;
+}
+
+bool
+args::usage() const
+{
+    std::cout << _options_descr << std::endl;
+    return true;
+}
+
+void
+args::print_args() const
+{
+    std::cout << "Number of journals: " << num_jrnls << std::endl;
+    std::cout << "Read mode: " << read_mode << std::endl;
+    if (read_mode.val() == read_arg::RANDOM)
+        std::cout << "Read probability: " << read_prob << " %" << std::endl;
+    if (read_mode.val() == read_arg::LAZYLOAD)
+    {
+        std::cout << "Lazy-load skips: " << lld_skip_num << std::endl;
+        std::cout << "Lazy-load reads: " << lld_rd_num << std::endl;
+    }
+    if (pause_secs)
+        std::cout << "Pause between test cases: " << pause_secs << " sec." << std::endl;
+    if (seed)
+        std::cout << "Randomize seed: " << seed << std::endl;
+    print_flags();
+}
+
+void
+args::print_flags() const
+{
+    if (format_chk || keep_jrnls || randomize || recover_mode || repeat_flag ||
+            reuse_instance)
+    {
+        std::cout << "Flag options:";
+        // TODO: Get flag args and their strings directly from _options_descr.
+        if (format_chk)
+            std::cout << " format-chk";
+        if (keep_jrnls)
+            std::cout << " keep-jrnls";
+        if (randomize)
+            std::cout << " randomize";
+        if (recover_mode)
+            std::cout << " recover-mode";
+        if (repeat_flag)
+            std::cout << " repeat-flag";
+        if (reuse_instance)
+            std::cout << " reuse-instance";
+        std::cout << std::endl;
+    }
+    std::cout << std::endl;
+}
+
+} // namespace jtt
+} // namespace mrg

Added: qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/jtt/args.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/jtt/args.h?rev=1530301&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/jtt/args.h (added)
+++ qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/jtt/args.h Tue Oct  8 15:09:00 2013
@@ -0,0 +1,66 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef mrg_jtt_args_hpp
+#define mrg_jtt_args_hpp
+
+#include <boost/program_options.hpp>
+#include "read_arg.h"
+
+namespace mrg
+{
+namespace jtt
+{
+
+    struct args
+    {
+        boost::program_options::options_description _options_descr;
+        boost::program_options::variables_map _vmap;
+
+        // Add args here
+        std::string jfile_analyzer;
+        std::string test_case_csv_file_name;
+        std::string journal_dir;
+        bool format_chk;
+        bool keep_jrnls;
+        unsigned lld_rd_num;
+        unsigned lld_skip_num;
+        unsigned num_jrnls;
+        unsigned pause_secs;
+        bool randomize;
+        read_arg read_mode;
+        unsigned read_prob;
+    	bool recover_mode;
+        bool repeat_flag;
+        bool reuse_instance;
+        unsigned seed;
+
+        args(std::string opt_title);
+        bool parse(int argc, char** argv); // return true if error, false if ok
+        bool usage() const; // return true
+        void print_args() const;
+        void print_flags() const;
+    };
+
+} // namespace jtt
+} // namespace mrg
+
+#endif // ifndef mrg_jtt_args_hpp

Added: qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/jtt/data_src.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/jtt/data_src.cpp?rev=1530301&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/jtt/data_src.cpp (added)
+++ qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/jtt/data_src.cpp Tue Oct  8 15:09:00 2013
@@ -0,0 +1,87 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "data_src.h"
+
+#include <cstddef>
+#include <iomanip>
+#include <sstream>
+
+namespace mrg
+{
+namespace jtt
+{
+
+char data_src::_data_src[data_src::max_dsize];
+char data_src::_xid_src[data_src::max_xsize];
+bool data_src::_initialized = data_src::__init();
+u_int64_t data_src::_xid_cnt = 0ULL;
+mrg::journal::smutex data_src::_sm;
+
+data_src::data_src()
+{}
+
+bool
+data_src::__init()
+{
+    for (unsigned i=0; i<max_dsize; i++)
+        _data_src[i] = '0' + ((i + 1) % 10); // 123456789012345...
+    for (unsigned j=0; j<max_xsize; j++)
+        _xid_src[j] = 'a' + (j % 26);        // abc...xyzabc...
+    return true;
+}
+
+const char*
+data_src::get_data(const std::size_t offs)
+{
+    if (offs >= max_dsize) return 0;
+    return _data_src + offs;
+}
+
+std::string
+data_src::get_xid(const std::size_t xid_size)
+{
+    if (xid_size == 0)
+        return "";
+    std::ostringstream oss;
+    oss << std::setfill('0');
+    if (xid_size < 9)
+        oss << std::setw(xid_size) << get_xid_cnt();
+    else if (xid_size < 13)
+        oss << "xid:" << std::setw(xid_size - 4) << get_xid_cnt();
+    else
+    {
+        oss << "xid:" << std::setw(8) << get_xid_cnt() << ":";
+        oss.write(get_xid_content(13), xid_size - 13);
+    }
+    return oss.str();
+}
+
+const char*
+data_src::get_xid_content(const std::size_t offs)
+{
+    if (offs >= max_xsize) return 0;
+    return _xid_src + offs;
+}
+
+} // namespace jtt
+} // namespace mrg
+

Added: qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/jtt/data_src.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/jtt/data_src.h?rev=1530301&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/jtt/data_src.h (added)
+++ qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/jtt/data_src.h Tue Oct  8 15:09:00 2013
@@ -0,0 +1,66 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef mrg_jtt_data_src_hpp
+#define mrg_jtt_data_src_hpp
+
+#include <cstddef>
+#include "qpid/legacystore/jrnl/slock.h"
+#include "qpid/legacystore/jrnl/smutex.h"
+#include <pthread.h>
+#include <string>
+#include <sys/types.h>
+
+#define DATA_SIZE 1024 * 1024
+#define XID_SIZE  1024 * 1024
+
+namespace mrg
+{
+namespace jtt
+{
+    class data_src
+    {
+    public:
+        static const std::size_t max_dsize = DATA_SIZE;
+        static const std::size_t max_xsize = XID_SIZE;
+
+    private:
+        static char _data_src[];
+        static char _xid_src[];
+        static u_int64_t _xid_cnt;
+        static bool _initialized;
+        static mrg::journal::smutex _sm;
+
+    public:
+        static const char* get_data(const std::size_t offs);
+        static std::string get_xid(const std::size_t xid_size);
+
+    private:
+        data_src();
+        static u_int64_t get_xid_cnt() { mrg::journal::slock s(_sm); return _xid_cnt++; }
+        static const char* get_xid_content(const std::size_t offs);
+        static bool __init();
+    };
+
+} // namespace jtt
+} // namespace mrg
+
+#endif // ifndef mrg_jtt_data_src_hpp

Added: qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/jtt/jfile_chk.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/jtt/jfile_chk.py?rev=1530301&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/jtt/jfile_chk.py (added)
+++ qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/jtt/jfile_chk.py Tue Oct  8 15:09:00 2013
@@ -0,0 +1,838 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sys
+import getopt
+import string
+import xml.parsers.expat
+from struct import unpack, calcsize
+from time import gmtime, strftime
+
+dblk_size = 128
+sblk_size = 4 * dblk_size
+jfsize = None
+hdr_ver = 1
+
+TEST_NUM_COL = 0
+NUM_MSGS_COL = 5
+MIN_MSG_SIZE_COL = 7
+MAX_MSG_SIZE_COL = 8
+MIN_XID_SIZE_COL = 9
+MAX_XID_SIZE_COL = 10
+AUTO_DEQ_COL = 11
+TRANSIENT_COL = 12
+EXTERN_COL = 13
+COMMENT_COL = 20
+
+owi_mask       = 0x01
+transient_mask = 0x10
+extern_mask    = 0x20
+
+printchars = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~ '
+
+
+
+#== global functions ===========================================================
+
+def load(f, klass):
+    args = load_args(f, klass)
+    subclass = klass.discriminate(args)
+    result = subclass(*args)
+    if subclass != klass:
+        result.init(f, *load_args(f, subclass))
+    result.skip(f)
+    return result;
+
+def load_args(f, klass):
+    size = calcsize(klass.format)
+    foffs = f.tell(),
+    bin = f.read(size)
+    if len(bin) != size:
+        raise Exception("end of file")
+    return foffs + unpack(klass.format, bin)
+
+def size_blks(size, blk_size):
+    return (size + blk_size - 1)/blk_size
+
+def rem_in_blk(f, blk_size):
+    foffs = f.tell()
+    return (size_blks(f.tell(), blk_size) * blk_size) - foffs;
+
+def file_full(f):
+    return f.tell() >= jfsize
+
+def isprintable(s):
+    return s.strip(printchars) == ''
+
+def print_xid(xidsize, xid):
+    if xid == None:
+        if xidsize > 0:
+            raise Exception('Inconsistent XID size: xidsize=%d, xid=None' % xidsize)
+        return ''
+    if isprintable(xid):
+        xidstr = split_str(xid)
+    else:
+        xidstr = hex_split_str(xid)
+    if xidsize != len(xid):
+        raise Exception('Inconsistent XID size: xidsize=%d, xid(%d)=\"%s\"' % (xidsize, len(xid), xidstr))
+    return 'xid(%d)=\"%s\" ' % (xidsize, xidstr)
+
+def print_data(dsize, data):
+    if data == None:
+        return ''
+    if isprintable(data):
+        datastr = split_str(data)
+    else:
+        datastr = hex_split_str(data)
+    if dsize != len(data):
+        raise Exception('Inconsistent data size: dsize=%d, data(%d)=\"%s\"' % (dsize, len(data), datastr))
+    return 'data(%d)=\"%s\" ' % (dsize, datastr)
+
+def hex_split_str(s, split_size = 50):
+    if len(s) <= split_size:
+        return hex_str(s, 0, len(s))
+    if len(s) > split_size + 25:
+        return hex_str(s, 0, 10) + ' ... ' + hex_str(s, 55, 65) + ' ... ' + hex_str(s, len(s)-10, len(s))
+    return hex_str(s, 0, 10) + ' ... ' + hex_str(s, len(s)-10, len(s))
+
+def hex_str(s, b, e):
+    o = ''
+    for i in range(b, e):
+        if isprintable(s[i]):
+            o += s[i]
+        else:
+            o += '\\%02x' % ord(s[i])
+    return o
+
+def split_str(s, split_size = 50):
+    if len(s) < split_size:
+        return s
+    return s[:25] + ' ... ' + s[-25:]
+
+def inv_str(s):
+    si = ''
+    for i in range(0,len(s)):
+        si += chr(~ord(s[i]) & 0xff)
+    return si
+
+def load_file_data(f, size, data):
+    if size == 0:
+        return (data, True)
+    if data == None:
+        loaded = 0
+    else:
+        loaded = len(data)
+    foverflow = f.tell() + size - loaded > jfsize
+    if foverflow:
+        rsize = jfsize - f.tell()
+    else:
+        rsize = size - loaded
+    bin = f.read(rsize)
+    if data == None:
+        data = unpack('%ds' % (rsize), bin)[0]
+    else:
+        data = data + unpack('%ds' % (rsize), bin)[0]
+    return (data, not foverflow)
+
+def exit(code, qflag):
+    if code != 0 or not qflag:
+        print out.getvalue()
+    out.close()
+    sys.exit(code)
+
+#== class Sizeable =============================================================
+
+class Sizeable:
+
+    def size(self):
+        classes = [self.__class__]
+
+        size = 0
+        while classes:
+            cls = classes.pop()
+            if hasattr(cls, "format"):
+                size += calcsize(cls.format)
+            classes.extend(cls.__bases__)
+
+        return size
+
+
+#== class Hdr ==================================================================
+
+class Hdr(Sizeable):
+ 
+    format = '=4sBBHQ'
+    
+    def discriminate(args):
+        return CLASSES.get(args[1][-1], Hdr)
+    discriminate = staticmethod(discriminate)
+
+    def __init__(self, foffs, magic, ver, end, flags, rid):
+        self.foffs = foffs
+        self.magic = magic
+        self.ver = ver
+        self.end = end
+        self.flags = flags
+        self.rid = rid
+        if self.magic[-1] not in ['0x00', 'a', 'c', 'd', 'e', 'f', 'x']:
+            error = 3
+        
+    def __str__(self):
+        if self.empty():
+            return '0x%08x: <empty>' % (self.foffs)
+        if self.magic[-1] == 'x':
+            return '0x%08x: [\"%s\"]' % (self.foffs, self.magic)
+        if self.magic[-1] in ['a', 'c', 'd', 'e', 'f', 'x']:
+            return '0x%08x: [\"%s\" v=%d e=%d f=0x%04x rid=0x%x]' % (self.foffs, self.magic, self.ver, self.end, self.flags, self.rid)
+        return '0x%08x: <error, unknown magic \"%s\" (possible overwrite boundary?)>' %  (self.foffs, self.magic)
+
+    def empty(self):
+        return self.magic == '\x00'*4
+
+    def owi(self):
+        return self.flags & owi_mask != 0
+
+    def skip(self, f):
+        f.read(rem_in_blk(f, dblk_size))
+
+    def check(self):
+        if self.empty() or self.magic[:3] != 'RHM' or self.magic[3] not in ['a', 'c', 'd', 'e', 'f', 'x']:
+            return True
+        if self.ver != hdr_ver and self.magic[-1] != 'x':
+            raise Exception('%s: Invalid header version: found %d, expected %d.' % (self, self.ver, hdr_ver))
+        return False
+        
+
+#== class FileHdr ==============================================================
+
+class FileHdr(Hdr):
+
+    format = '=2H4x3Q'
+
+    def init(self, f, foffs, fid, lid, fro, time_sec, time_ns):
+        self.fid = fid
+        self.lid = lid
+        self.fro = fro
+        self.time_sec = time_sec
+        self.time_ns = time_ns
+        
+    def __str__(self):
+        return '%s fid=%d lid=%d fro=0x%08x t=%s' % (Hdr.__str__(self), self.fid, self.lid, self.fro, self.timestamp_str())
+
+    def skip(self, f):
+        f.read(rem_in_blk(f, sblk_size))
+
+    def timestamp(self):
+        return (self.time_sec, self.time_ns)
+
+    def timestamp_str(self):
+        ts = gmtime(self.time_sec)
+        fstr = '%%a %%b %%d %%H:%%M:%%S.%09d %%Y' % (self.time_ns)
+        return strftime(fstr, ts)
+
+
+#== class DeqHdr ===============================================================
+
+class DeqHdr(Hdr):
+
+    format = '=QQ'
+
+    def init(self, f, foffs, deq_rid, xidsize):
+        self.deq_rid = deq_rid
+        self.xidsize = xidsize
+        self.xid = None
+        self.deq_tail = None
+        self.xid_complete = False
+        self.tail_complete = False
+        self.tail_bin = None
+        self.tail_offs = 0
+        self.load(f)
+
+    def load(self, f):
+        if self.xidsize == 0:
+            self.xid_complete = True
+            self.tail_complete = True
+        else:
+            if not self.xid_complete:
+                ret = load_file_data(f, self.xidsize, self.xid)
+                self.xid = ret[0]
+                self.xid_complete = ret[1]
+            if self.xid_complete and not self.tail_complete:
+                ret = load_file_data(f, calcsize(RecTail.format), self.tail_bin)
+                self.tail_bin = ret[0]
+                if ret[1]:
+                    self.enq_tail = RecTail(self.tail_offs, *unpack(RecTail.format, self.tail_bin))
+                    if self.enq_tail.magic_inv != inv_str(self.magic) or self.enq_tail.rid != self.rid:
+                        print " > %s" % self
+                        raise Exception('Invalid dequeue record tail (magic=%s; rid=%d) at 0x%08x' % (self.enq_tail, self.enq_tail.rid, self.enq_tail.foffs))
+                    self.enq_tail.skip(f)
+                self.tail_complete = ret[1]
+        return self.complete()
+
+    def complete(self):
+        return self.xid_complete and self.tail_complete
+
+    def __str__(self):
+        return '%s %sdrid=0x%x' % (Hdr.__str__(self), print_xid(self.xidsize, self.xid), self.deq_rid)
+
+
+#== class TxnHdr ===============================================================
+
+class TxnHdr(Hdr):
+
+    format = '=Q'
+
+    def init(self, f, foffs, xidsize):
+        self.xidsize = xidsize
+        self.xid = None
+        self.tx_tail = None
+        self.xid_complete = False
+        self.tail_complete = False
+        self.tail_bin = None
+        self.tail_offs = 0
+        self.load(f)
+
+    def load(self, f):
+        if not self.xid_complete:
+            ret = load_file_data(f, self.xidsize, self.xid)
+            self.xid = ret[0]
+            self.xid_complete = ret[1]
+        if self.xid_complete and not self.tail_complete:
+            ret = load_file_data(f, calcsize(RecTail.format), self.tail_bin)
+            self.tail_bin = ret[0]
+            if ret[1]:
+                self.enq_tail = RecTail(self.tail_offs, *unpack(RecTail.format, self.tail_bin))
+                if self.enq_tail.magic_inv != inv_str(self.magic) or self.enq_tail.rid != self.rid:
+                    print " > %s" % self
+                    raise Exception('Invalid transaction record tail (magic=%s; rid=%d) at 0x%08x' % (self.enq_tail, self.enq_tail.rid, self.enq_tail.foffs))
+                self.enq_tail.skip(f)
+            self.tail_complete = ret[1]
+        return self.complete()
+
+    def complete(self):
+        return self.xid_complete and self.tail_complete
+
+    def __str__(self):
+        return '%s %s' % (Hdr.__str__(self), print_xid(self.xidsize, self.xid))
+
+
+#== class RecTail ==============================================================
+
+class RecTail(Sizeable):
+
+    format = '=4sQ'
+
+    def __init__(self, foffs, magic_inv, rid):
+        self.foffs = foffs
+        self.magic_inv = magic_inv
+        self.rid = rid
+
+    def __str__(self):
+        magic = inv_str(self.magic_inv)
+        return '[\"%s\" rid=0x%x]' % (magic, self.rid)
+
+    def skip(self, f):
+        f.read(rem_in_blk(f, dblk_size))
+
+
+#== class EnqRec ===============================================================
+
+class EnqRec(Hdr):
+
+    format = '=QQ'
+
+    def init(self, f, foffs, xidsize, dsize):
+        self.xidsize = xidsize
+        self.dsize = dsize
+        self.transient = self.flags & transient_mask > 0
+        self.extern = self.flags & extern_mask > 0
+        self.xid = None
+        self.data = None
+        self.enq_tail = None
+        self.xid_complete = False
+        self.data_complete = False
+        self.tail_complete = False
+        self.tail_bin = None
+        self.tail_offs = 0
+        self.load(f)
+
+    def load(self, f):
+        if not self.xid_complete:
+            ret = load_file_data(f, self.xidsize, self.xid)
+            self.xid = ret[0]
+            self.xid_complete = ret[1]
+        if self.xid_complete and not self.data_complete:
+            if self.extern:
+                self.data_complete = True
+            else:
+                ret = load_file_data(f, self.dsize, self.data)
+                self.data = ret[0]
+                self.data_complete = ret[1]
+        if self.data_complete and not self.tail_complete:
+            ret = load_file_data(f, calcsize(RecTail.format), self.tail_bin)
+            self.tail_bin = ret[0]
+            if ret[1]:
+                self.enq_tail = RecTail(self.tail_offs, *unpack(RecTail.format, self.tail_bin))
+                if self.enq_tail.magic_inv != inv_str(self.magic) or self.enq_tail.rid != self.rid:
+                    print " > %s" % self
+                    raise Exception('Invalid enqueue record tail (magic=%s; rid=%d) at 0x%08x' % (self.enq_tail, self.enq_tail.rid, self.enq_tail.foffs))
+                self.enq_tail.skip(f)
+            self.tail_complete = ret[1]
+        return self.complete()
+
+    def complete(self):
+        return self.xid_complete and self.data_complete and self.tail_complete
+
+    def print_flags(self):
+        s = ''
+        if self.transient:
+            s = '*TRANSIENT'
+        if self.extern:
+            if len(s) > 0:
+                s += ',EXTERNAL'
+            else:
+                s = '*EXTERNAL'
+        if len(s) > 0:
+            s += '*'
+        return s
+
+    def __str__(self):
+        return '%s %s%s %s %s' % (Hdr.__str__(self), print_xid(self.xidsize, self.xid), print_data(self.dsize, self.data), self.enq_tail, self.print_flags())
+
+
+#== class Main =================================================================
+
+class Main:
+    def __init__(self, argv):
+        self.bfn = None
+        self.csvfn = None
+        self.jdir = None
+        self.aflag = False
+        self.hflag = False
+        self.qflag = False
+        self.tnum = None
+        self.num_jfiles = None
+        self.num_msgs = None
+        self.msg_len = None
+        self.auto_deq = None
+        self.xid_len = None
+        self.transient = None
+        self.extern = None
+
+        self.file_start = 0
+        self.file_num = 0
+        self.fro = 0x200
+        self.emap = {}
+        self.tmap = {}
+        self.rec_cnt = 0
+        self.msg_cnt = 0
+        self.txn_msg_cnt = 0
+        self.fhdr = None
+        self.f = None
+        self.first_rec = False
+        self.last_file = False
+        self.last_rid = -1
+        self.fhdr_owi_at_msg_start = None
+
+        self.proc_args(argv)
+        self.proc_csv()
+        self.read_jinf()
+    
+    def run(self):
+        try:
+            start_info = self.analyze_files()
+            stop = self.advance_file(*start_info)
+        except Exception:
+            print 'WARNING: All journal files are empty.'
+            if self.num_msgs > 0:
+                raise Exception('All journal files are empty, but %d msgs expectd.' % self.num_msgs)
+            else:
+                stop = True
+        while not stop:
+            warn = ''
+            if file_full(self.f):
+                stop = self.advance_file()
+                if stop:
+                    break
+            hdr = load(self.f, Hdr)
+            if hdr.empty():
+                stop = True;
+                break
+            if hdr.check():
+                stop = True;
+            else:
+                self.rec_cnt += 1
+                self.fhdr_owi_at_msg_start = self.fhdr.owi()
+                if self.first_rec:
+                    if self.fhdr.fro != hdr.foffs:
+                        raise Exception('File header first record offset mismatch: fro=0x%08x; rec_offs=0x%08x' % (self.fhdr.fro, hdr.foffs))
+                    else:
+                        if not self.qflag: print ' * fro ok: 0x%08x' % self.fhdr.fro
+                    self.first_rec = False
+                if isinstance(hdr, EnqRec) and not stop:
+                    while not hdr.complete():
+                        stop = self.advance_file()
+                        if stop:
+                            break
+                        hdr.load(self.f)
+                    if self.extern != None:
+                        if hdr.extern:
+                            if hdr.data != None:
+                                raise Exception('Message data found on external record')
+                        else:
+                            if self.msg_len > 0 and len(hdr.data) != self.msg_len:
+                                raise Exception('Message length (%d) incorrect; expected %d' % (len(hdr.data), self.msg_len))
+                    else:
+                        if self.msg_len > 0 and len(hdr.data) != self.msg_len:
+                            raise Exception('Message length (%d) incorrect; expected %d' % (len(hdr.data), self.msg_len))
+                    if self.xid_len > 0 and len(hdr.xid) != self.xid_len:
+                        print '  ERROR: XID length (%d) incorrect; expected %d' % (len(hdr.xid), self.xid_len)
+                        sys.exit(1)
+                        #raise Exception('XID length (%d) incorrect; expected %d' % (len(hdr.xid), self.xid_len))
+                    if self.transient != None:
+                        if self.transient:
+                            if not hdr.transient:
+                                raise Exception('Expected transient record, found persistent')
+                        else:
+                            if hdr.transient:
+                                raise Exception('Expected persistent record, found transient')
+                    stop = not self.check_owi(hdr)
+                    if  stop:
+                        warn = ' (WARNING: OWI mismatch - could be overwrite boundary.)'
+                    else:
+                        self.msg_cnt += 1
+                        if self.aflag or self.auto_deq:
+                            if hdr.xid == None:
+                                self.emap[hdr.rid] = (self.fhdr.fid, hdr, False)
+                            else:
+                                self.txn_msg_cnt += 1
+                                if hdr.xid in self.tmap:
+                                    self.tmap[hdr.xid].append((self.fhdr.fid, hdr)) #Append tuple to existing list
+                                else:
+                                    self.tmap[hdr.xid] = [(self.fhdr.fid, hdr)] # Create new list
+                elif isinstance(hdr, DeqHdr) and not stop:
+                    while not hdr.complete():
+                        stop = self.advance_file()
+                        if stop:
+                            break
+                        hdr.load(self.f)
+                    stop = not self.check_owi(hdr)
+                    if stop:
+                        warn = ' (WARNING: OWI mismatch - could be overwrite boundary.)'
+                    else:
+                        if self.auto_deq != None:
+                            if not self.auto_deq:
+                                warn = ' WARNING: Dequeue record rid=%d found in non-dequeue test - ignoring.' % hdr.rid
+                        if self.aflag or self.auto_deq:
+                            if hdr.xid == None:
+                                if hdr.deq_rid in self.emap:
+                                    if self.emap[hdr.deq_rid][2]:
+                                        warn = ' (WARNING: dequeue rid 0x%x dequeues locked enqueue record 0x%x)' % (hdr.rid, hdr.deq_rid)
+                                    del self.emap[hdr.deq_rid]
+                                else:
+                                    warn = ' (WARNING: rid being dequeued 0x%x not found in enqueued records)' % hdr.deq_rid
+                            else:
+                                if hdr.deq_rid in self.emap:
+                                    t = self.emap[hdr.deq_rid]
+                                    self.emap[hdr.deq_rid] = (t[0], t[1], True) # Lock enq record
+                                if hdr.xid in self.tmap:
+                                    self.tmap[hdr.xid].append((self.fhdr.fid, hdr)) #Append to existing list
+                                else:
+                                    self.tmap[hdr.xid] = [(self.fhdr.fid, hdr)] # Create new list
+                elif isinstance(hdr, TxnHdr) and not stop:
+                    while not hdr.complete():
+                        stop = self.advance_file()
+                        if stop:
+                            break
+                        hdr.load(self.f)
+                    stop = not self.check_owi(hdr)
+                    if stop:
+                        warn = ' (WARNING: OWI mismatch - could be overwrite boundary.)'
+                    else:
+                        if hdr.xid in self.tmap:
+                            mismatched_rids = []
+                            if hdr.magic[-1] == 'c': # commit
+                                for rec in self.tmap[hdr.xid]:
+                                    if isinstance(rec[1], EnqRec):
+                                        self.emap[rec[1].rid] = (rec[0], rec[1], False) # Transfer enq to emap
+                                    elif isinstance(rec[1], DeqHdr):
+                                        if rec[1].deq_rid in self.emap:
+                                            del self.emap[rec[1].deq_rid] # Delete from emap
+                                        else:
+                                            mismatched_rids.append('0x%x' % rec[1].deq_rid)
+                                    else:
+                                        raise Exception('Unknown header found in txn map: %s' % rec[1])
+                            elif hdr.magic[-1] == 'a': # abort
+                                for rec in self.tmap[hdr.xid]:
+                                    if isinstance(rec[1], DeqHdr):
+                                        if rec[1].deq_rid in self.emap:
+                                            t = self.emap[rec[1].deq_rid]
+                                            self.emap[rec[1].deq_rid] = (t[0], t[1], False) # Unlock enq record
+                            del self.tmap[hdr.xid]
+                            if len(mismatched_rids) > 0:
+                                warn = ' (WARNING: transactional dequeues not found in enqueue map; rids=%s)' % mismatched_rids
+                        else:
+                            warn = ' (WARNING: %s not found in transaction map)' % print_xid(len(hdr.xid), hdr.xid)
+                if not self.qflag: print ' > %s%s' % (hdr, warn)
+                if not stop:
+                    stop = (self.last_file and hdr.check()) or hdr.empty() or self.fhdr.empty()
+
+    def analyze_files(self):
+        fname = ''
+        fnum = -1
+        rid = -1
+        fro = -1
+        tss = ''
+        if not self.qflag: print 'Analyzing journal files:'
+        owi_found = False
+        for i in range(0, self.num_jfiles):
+            jfn = self.jdir + '/' + self.bfn + '.%04d.jdat' % i
+            f = open(jfn)
+            fhdr = load(f, Hdr)
+            if fhdr.empty():
+                if not self.qflag:
+                    print '  %s: file empty' % jfn
+                break
+            if i == 0:
+                init_owi = fhdr.owi()
+                fname = jfn
+                fnum = i
+                rid = fhdr.rid
+                fro = fhdr.fro
+                tss = fhdr.timestamp_str()
+            elif fhdr.owi() != init_owi and not owi_found:
+                fname = jfn
+                fnum = i
+                rid = fhdr.rid
+                fro = fhdr.fro
+                tss = fhdr.timestamp_str()
+                owi_found = True
+            if not self.qflag:
+                print '  %s: owi=%s rid=0x%x, fro=0x%08x ts=%s' % (jfn, fhdr.owi(), fhdr.rid, fhdr.fro, fhdr.timestamp_str())
+        if fnum < 0 or rid < 0 or fro < 0:
+            raise Exception('All journal files empty')
+        if not self.qflag: print '  Oldest complete file: %s: rid=%d, fro=0x%08x ts=%s' % (fname, rid, fro, tss)
+        return (fnum, rid, fro)
+
+    def advance_file(self, *start_info):
+        seek_flag = False
+        if len(start_info) == 3:
+            self.file_start = self.file_num = start_info[0]
+            self.fro = start_info[2]
+            seek_flag = True
+        if self.f != None and file_full(self.f):
+            self.file_num = self.incr_fnum()
+            if self.file_num == self.file_start:
+                return True
+            if self.file_start == 0:
+                self.last_file = self.file_num == self.num_jfiles - 1
+            else:
+                self.last_file = self.file_num == self.file_start - 1
+        if self.file_num < 0 or self.file_num >= self.num_jfiles:
+            raise Exception('Bad file number %d' % self.file_num)
+        jfn = self.jdir + '/' + self.bfn + '.%04d.jdat' % self.file_num
+        self.f = open(jfn)
+        self.fhdr = load(self.f, Hdr)
+        if seek_flag and self.f.tell() != self.fro:
+            self.f.seek(self.fro)
+        self.first_rec = True
+        if not self.qflag: print jfn, ": ", self.fhdr
+        return False
+
+    def incr_fnum(self):
+        self.file_num += 1
+        if self.file_num >= self.num_jfiles:
+            self.file_num = 0;
+        return self.file_num
+
+    def check_owi(self, hdr):
+        return self.fhdr_owi_at_msg_start == hdr.owi()
+
+    def check_rid(self, hdr):
+        if  self.last_rid != -1 and hdr.rid <= self.last_rid:
+            return False
+        self.last_rid = hdr.rid
+        return True
+
+    def read_jinf(self):
+        filename = self.jdir + '/' + self.bfn + '.jinf'
+        try:
+            f = open(filename, 'r')
+        except IOError:
+            print 'ERROR: Unable to open jinf file %s' % filename
+            sys.exit(1)
+        p = xml.parsers.expat.ParserCreate()
+        p.StartElementHandler = self.handleStartElement
+        p.CharacterDataHandler = self.handleCharData
+        p.EndElementHandler = self.handleEndElement
+        p.ParseFile(f)
+        if self.num_jfiles == None:
+            print 'ERROR: number_jrnl_files not found in jinf file "%s"!' % filename
+        if jfsize == None:
+            print 'ERROR: jrnl_file_size_sblks not found in jinf file "%s"!' % filename
+        if self.num_jfiles == None or jfsize == None:
+            sys.exit(1)
+
+    def handleStartElement(self, name, attrs):
+        global jfsize
+        if name == 'number_jrnl_files':
+            self.num_jfiles = int(attrs['value'])
+        if name == 'jrnl_file_size_sblks':
+            jfsize = (int(attrs['value']) + 1) * sblk_size
+
+    def handleCharData(self, data): pass
+
+    def handleEndElement(self, name): pass
+
+    def proc_csv(self):
+        if self.csvfn != None and self.tnum != None:
+            tparams = self.get_test(self.csvfn, self.tnum)
+            if tparams == None:
+                print 'ERROR: Test %d not found in CSV file "%s"' % (self.tnum, self.csvfn)
+                sys.exit(1)
+            self.num_msgs = tparams['num_msgs']
+            if tparams['min_size'] == tparams['max_size']:
+                self.msg_len = tparams['max_size']
+            else:
+                self.msg_len = 0
+            self.auto_deq = tparams['auto_deq']
+            if tparams['xid_min_size'] == tparams['xid_max_size']:
+                self.xid_len = tparams['xid_max_size']
+            else:
+                self.xid_len = 0
+            self.transient = tparams['transient']
+            self.extern = tparams['extern']
+
+    def get_test(self, filename, tnum):
+        try:
+            f=open(filename, 'r')
+        except IOError:
+            print 'ERROR: Unable to open CSV file "%s"' % filename
+            sys.exit(1)
+        for l in f:
+            sl = l.strip().split(',')
+            if len(sl[0]) > 0 and sl[0][0] != '"':
+                try:
+                    if (int(sl[TEST_NUM_COL]) == tnum):
+                        return { 'num_msgs':int(sl[NUM_MSGS_COL]),
+                                 'min_size':int(sl[MIN_MSG_SIZE_COL]),
+                                 'max_size':int(sl[MAX_MSG_SIZE_COL]),
+                                 'auto_deq':not (sl[AUTO_DEQ_COL] == 'FALSE' or sl[AUTO_DEQ_COL] == '0'),
+                                 'xid_min_size':int(sl[MIN_XID_SIZE_COL]),
+                                 'xid_max_size':int(sl[MAX_XID_SIZE_COL]),
+                                 'transient':not (sl[TRANSIENT_COL] == 'FALSE' or sl[TRANSIENT_COL] == '0'),
+                                 'extern':not (sl[EXTERN_COL] == 'FALSE' or sl[EXTERN_COL] == '0'),
+                                 'comment':sl[COMMENT_COL] }
+                except Exception:
+                    pass
+        return None
+        
+    def proc_args(self, argv):
+        try:
+            opts, args = getopt.getopt(sys.argv[1:], "ab:c:d:hqt:", ["analyse", "base-filename=", "csv-filename=", "dir=", "help", "quiet", "test-num="])
+        except getopt.GetoptError:
+            self.usage()
+            sys.exit(2)
+        for o, a in opts:
+            if o in ("-h", "--help"):
+                self.usage()
+                sys.exit()
+            if o in ("-a", "--analyze"):
+                self.aflag = True
+            if o in ("-b", "--base-filename"):
+                self.bfn = a
+            if o in ("-c", "--csv-filename"):
+                self.csvfn = a
+            if o in ("-d", "--dir"):
+                self.jdir = a
+            if o in ("-q", "--quiet"):
+                self.qflag = True
+            if o in ("-t", "--test-num"):
+                if not a.isdigit():
+                    print 'ERROR: Illegal test-num argument. Must be a non-negative number'
+                    sys.exit(2)
+                self.tnum = int(a)
+        if self.bfn == None or self.jdir == None:
+            print 'ERROR: Missing requred args.'
+            self.usage()
+            sys.exit(2)
+        if self.tnum != None and self.csvfn == None:
+            print 'ERROR: Test number specified, but not CSV file'
+            self.usage()
+            sys.exit(2)
+
+    def usage(self):
+        print 'Usage: %s opts' % sys.argv[0]
+        print '  where opts are in either short or long format (*=req\'d):'
+        print '  -a --analyze                  Analyze enqueue/dequeue records'
+        print '  -b --base-filename [string] * Base filename for journal files'
+        print '  -c --csv-filename  [string]   CSV filename containing test parameters'
+        print '  -d --dir           [string] * Journal directory containing journal files'
+        print '  -h --help                     Print help'
+        print '  -q --quiet                    Quiet (reduced output)'
+        print '  -t --test-num      [int]      Test number from CSV file - only valid if CSV file named'
+
+    def report(self):
+        if not self.qflag:
+            print
+            print ' === REPORT ===='
+            if self.num_msgs > 0 and self.msg_cnt != self.num_msgs:
+                print 'WARNING: Found %d messages; %d expected.' % (self.msg_cnt, self.num_msgs)
+            if len(self.emap) > 0:
+                print
+                print 'Remaining enqueued records (sorted by rid): '
+                keys = sorted(self.emap.keys())
+                for k in keys:
+                    if self.emap[k][2] == True: # locked
+                        locked = ' (locked)'
+                    else:
+                        locked = ''
+                    print "  fid=%d %s%s" % (self.emap[k][0], self.emap[k][1], locked)
+                print 'WARNING: Enqueue-Dequeue mismatch, %d enqueued records remain.' % len(self.emap)
+            if len(self.tmap) > 0:
+                txn_rec_cnt = 0
+                print
+                print 'Remaining transactions: '
+                for t in self.tmap:
+                    print_xid(len(t), t)
+                    for r in self.tmap[t]:
+                        print "  fid=%d %s" % (r[0], r[1])
+                    print " Total: %d records for xid %s" % (len(self.tmap[t]), t)
+                    txn_rec_cnt += len(self.tmap[t])
+                print 'WARNING: Incomplete transactions, %d xids remain containing %d records.' % (len(self.tmap), txn_rec_cnt)
+            print '%d enqueues, %d journal records processed.' % (self.msg_cnt, self.rec_cnt)
+
+
+#===============================================================================
+
+CLASSES = {
+    "a": TxnHdr,
+    "c": TxnHdr,
+    "d": DeqHdr,
+    "e": EnqRec,
+    "f": FileHdr
+}
+
+m = Main(sys.argv)
+m.run()
+m.report()
+
+sys.exit(None)

Propchange: qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/jtt/jfile_chk.py
------------------------------------------------------------------------------
    svn:executable = *

Added: qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/jtt/jrnl_init_params.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/jtt/jrnl_init_params.cpp?rev=1530301&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/jtt/jrnl_init_params.cpp (added)
+++ qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/jtt/jrnl_init_params.cpp Tue Oct  8 15:09:00 2013
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "jrnl_init_params.h"
+
+namespace mrg
+{
+namespace jtt
+{
+
+jrnl_init_params::jrnl_init_params(const std::string& jid, const std::string& jdir, const std::string& base_filename,
+        const u_int16_t num_jfiles, const bool ae, const u_int16_t ae_max_jfiles, const u_int32_t jfsize_sblks,
+        const u_int16_t wcache_num_pages, const u_int32_t wcache_pgsize_sblks):
+        _jid(jid),
+        _jdir(jdir),
+        _base_filename(base_filename),
+        _num_jfiles(num_jfiles),
+        _ae(ae),
+        _ae_max_jfiles(ae_max_jfiles),
+        _jfsize_sblks(jfsize_sblks),
+        _wcache_num_pages(wcache_num_pages),
+        _wcache_pgsize_sblks(wcache_pgsize_sblks)
+{}
+
+jrnl_init_params::jrnl_init_params(const jrnl_init_params& jp):
+        _jid(jp._jid),
+        _jdir(jp._jdir),
+        _base_filename(jp._base_filename),
+        _num_jfiles(jp._num_jfiles),
+        _ae(jp._ae),
+        _ae_max_jfiles(jp._ae_max_jfiles),
+        _jfsize_sblks(jp._jfsize_sblks),
+        _wcache_num_pages(jp._wcache_num_pages),
+        _wcache_pgsize_sblks(jp._wcache_pgsize_sblks)
+{}
+
+jrnl_init_params::jrnl_init_params(const jrnl_init_params* const jp_ptr):
+        _jid(jp_ptr->_jid),
+        _jdir(jp_ptr->_jdir),
+        _base_filename(jp_ptr->_base_filename),
+        _num_jfiles(jp_ptr->_num_jfiles),
+        _ae(jp_ptr->_ae),
+        _ae_max_jfiles(jp_ptr->_ae_max_jfiles),
+        _jfsize_sblks(jp_ptr->_jfsize_sblks),
+        _wcache_num_pages(jp_ptr->_wcache_num_pages),
+        _wcache_pgsize_sblks(jp_ptr->_wcache_pgsize_sblks)
+{}
+
+// static initializers
+
+const u_int16_t jrnl_init_params::def_num_jfiles = 8;
+const bool      jrnl_init_params::def_ae = false;
+const u_int16_t jrnl_init_params::def_ae_max_jfiles = 0;
+const u_int32_t jrnl_init_params::def_jfsize_sblks = 0xc00;
+const u_int16_t jrnl_init_params::def_wcache_num_pages = 32;
+const u_int32_t jrnl_init_params::def_wcache_pgsize_sblks = 64;
+
+} // namespace jtt
+} // namespace mrg

Added: qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/jtt/jrnl_init_params.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/jtt/jrnl_init_params.h?rev=1530301&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/jtt/jrnl_init_params.h (added)
+++ qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/jtt/jrnl_init_params.h Tue Oct  8 15:09:00 2013
@@ -0,0 +1,80 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef mrg_jtt_jrnl_init_params_hpp
+#define mrg_jtt_jrnl_init_params_hpp
+
+#include <boost/shared_ptr.hpp>
+#include <string>
+#include <sys/types.h>
+
+namespace mrg
+{
+namespace jtt
+{
+
+    class jrnl_init_params
+    {
+    public:
+        static const u_int16_t def_num_jfiles;
+        static const bool def_ae;
+        static const u_int16_t def_ae_max_jfiles;
+        static const u_int32_t def_jfsize_sblks;
+        static const u_int16_t def_wcache_num_pages;
+        static const u_int32_t def_wcache_pgsize_sblks;
+
+        typedef boost::shared_ptr<jrnl_init_params> shared_ptr;
+
+    private:
+        std::string _jid;
+        std::string _jdir;
+        std::string _base_filename;
+        u_int16_t _num_jfiles;
+        bool _ae;
+        u_int16_t _ae_max_jfiles;
+        u_int32_t _jfsize_sblks;
+        u_int16_t _wcache_num_pages;
+        u_int32_t _wcache_pgsize_sblks;
+
+    public:
+        jrnl_init_params(const std::string& jid, const std::string& jdir, const std::string& base_filename,
+                const u_int16_t num_jfiles = def_num_jfiles, const bool ae = def_ae,
+                const u_int16_t ae_max_jfiles = def_ae_max_jfiles, const u_int32_t jfsize_sblks = def_jfsize_sblks,
+                const u_int16_t wcache_num_pages = def_wcache_num_pages,
+                const u_int32_t wcache_pgsize_sblks = def_wcache_pgsize_sblks);
+        jrnl_init_params(const jrnl_init_params& jp);
+        jrnl_init_params(const jrnl_init_params* const jp_ptr);
+
+        inline const std::string& jid() const { return _jid; }
+        inline const std::string& jdir() const { return _jdir; }
+        inline const std::string& base_filename() const { return _base_filename; }
+        inline u_int16_t num_jfiles() const { return _num_jfiles; }
+        inline bool is_ae() const { return _ae; }
+        inline u_int16_t ae_max_jfiles() const { return _ae_max_jfiles; }
+        inline u_int32_t jfsize_sblks() const { return _jfsize_sblks; }
+        inline u_int16_t wcache_num_pages() const { return _wcache_num_pages; }
+        inline u_int32_t wcache_pgsize_sblks() const { return _wcache_pgsize_sblks; }
+    };
+
+} // namespace jtt
+} // namespace mrg
+
+#endif // ifndef mrg_jtt_jrnl_init_params_hpp

Added: qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/jtt/jrnl_instance.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/jtt/jrnl_instance.cpp?rev=1530301&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/jtt/jrnl_instance.cpp (added)
+++ qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/jtt/jrnl_instance.cpp Tue Oct  8 15:09:00 2013
@@ -0,0 +1,439 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "jrnl_instance.h"
+
+#include <cstdlib>
+#include "data_src.h"
+#include "qpid/legacystore/jrnl/data_tok.h"
+#include "qpid/legacystore/jrnl/jerrno.h"
+#include "test_case_result.h"
+
+#define MAX_WR_WAIT             10 // in ms
+#define MAX_RD_WAIT             100 // in ms
+#define MAX_ENQCAPTHRESH_CNT    1000 // 10s if MAX_WR_WAIT is 10 ms
+
+namespace mrg
+{
+namespace jtt
+{
+
+jrnl_instance::jrnl_instance(const std::string& jid, const std::string& jdir, const std::string& base_filename,
+        const u_int16_t num_jfiles, const bool ae, const u_int16_t ae_max_jfiles, const u_int32_t jfsize_sblks,
+        const u_int16_t wcache_num_pages, const u_int32_t wcache_pgsize_sblks):
+        mrg::journal::jcntl(jid, jdir, base_filename),
+        _jpp(new jrnl_init_params(jid, jdir, base_filename, num_jfiles, ae, ae_max_jfiles, jfsize_sblks,
+                wcache_num_pages, wcache_pgsize_sblks)),
+        _args_ptr(0),
+        _dtok_master_enq_list(),
+        _dtok_master_txn_list(),
+        _dtok_rd_list(),
+        _dtok_deq_list(),
+        _rd_aio_cv(_rd_aio_mutex),
+        _wr_full_cv(_wr_full_mutex),
+        _rd_list_cv(_rd_list_mutex),
+        _deq_list_cv(_deq_list_mutex),
+        _tcp(),
+        _tcrp()
+{}
+
+jrnl_instance::jrnl_instance(const jrnl_init_params::shared_ptr& p):
+        mrg::journal::jcntl(p->jid(), p->jdir(), p->base_filename()),
+        _jpp(p),
+        _args_ptr(0),
+        _dtok_master_enq_list(),
+        _dtok_master_txn_list(),
+        _dtok_rd_list(),
+        _dtok_deq_list(),
+        _rd_aio_cv(_rd_aio_mutex),
+        _wr_full_cv(_wr_full_mutex),
+        _rd_list_cv(_rd_list_mutex),
+        _deq_list_cv(_deq_list_mutex),
+        _tcp(),
+        _tcrp()
+{}
+
+jrnl_instance::~jrnl_instance() {}
+
+
+void
+jrnl_instance::init_tc(test_case::shared_ptr& tcp, const args* const args_ptr) throw ()
+{
+    test_case_result::shared_ptr p(new test_case_result(_jpp->jid()));
+    _tcrp = p;
+    _args_ptr = args_ptr;
+    try
+    {
+        _tcp = tcp;
+        _dtok_master_enq_list.clear();
+        _dtok_master_txn_list.clear();
+        _dtok_rd_list.clear();
+        _dtok_deq_list.clear();
+
+        if (_args_ptr->recover_mode)
+        {
+            try
+            {
+            u_int64_t highest_rid;
+            recover(_jpp->num_jfiles(), _jpp->is_ae(), _jpp->ae_max_jfiles(), _jpp->jfsize_sblks(),
+                    _jpp->wcache_num_pages(), _jpp->wcache_pgsize_sblks(), this,
+                    0, highest_rid);
+            recover_complete();
+            }
+            catch (const mrg::journal::jexception& e)
+            {
+                if (e.err_code() == mrg::journal::jerrno::JERR_JDIR_STAT)
+                    initialize(_jpp->num_jfiles(), _jpp->is_ae(), _jpp->ae_max_jfiles(), _jpp->jfsize_sblks(),
+                            _jpp->wcache_num_pages(), _jpp->wcache_pgsize_sblks(), this);
+                else
+                    throw;
+            }
+        }
+        else
+            initialize(_jpp->num_jfiles(), _jpp->is_ae(), _jpp->ae_max_jfiles(), _jpp->jfsize_sblks(),
+                    _jpp->wcache_num_pages(), _jpp->wcache_pgsize_sblks(), this);
+    }
+    catch (const mrg::journal::jexception& e) { _tcrp->add_exception(e); }
+    catch (const std::exception& e) { _tcrp->add_exception(e.what()); }
+    catch (...) { _tcrp->add_exception("Unknown exception"); }
+}
+
+void
+jrnl_instance::run_tc() throw ()
+{
+    _tcrp->set_start_time();
+    ::pthread_create(&_enq_thread, 0, run_enq, this);
+    ::pthread_create(&_read_thread, 0, run_read, this);
+    ::pthread_create(&_deq_thread, 0, run_deq, this);
+}
+
+void
+jrnl_instance::tc_wait_compl() throw ()
+{
+    try
+    {
+        ::pthread_join(_deq_thread, 0);
+        ::pthread_join(_read_thread, 0);
+        ::pthread_join(_enq_thread, 0);
+        stop(true);
+    }
+    catch (const mrg::journal::jexception& e) { _tcrp->add_exception(e); panic(); }
+    catch (const std::exception& e) { _tcrp->add_exception(e.what()); panic(); }
+    catch (...) { _tcrp->add_exception("Unknown exception"); panic(); }
+    _lpmgr.finalize();
+    _tcrp->set_stop_time();
+    _tcp->add_result(_tcrp);
+}
+
+void
+jrnl_instance::run_enq() throw ()
+{
+    try
+    {
+        unsigned sleep_cnt = 0U;
+        while(_tcrp->num_enq() < _tcp->num_msgs() && !_tcrp->exception())
+        {
+            dtok_ptr p(new mrg::journal::data_tok);
+            _dtok_master_enq_list.push_back(p);
+            const char* msgp = data_src::get_data(_tcrp->num_enq() % 10);
+            const std::size_t msg_size = _tcp->this_data_size();
+            const std::size_t xid_size = _tcp->this_xid_size();
+            const std::string xid(data_src::get_xid(xid_size));
+            const bool external = _tcp->this_external();
+            const bool transient = _tcp->this_transience();
+            mrg::journal::iores res;
+            if (xid_size)
+            {
+                if (external)
+                    res = enqueue_extern_txn_data_record(msg_size, p.get(), xid, transient);
+                else
+                    res = enqueue_txn_data_record(msgp, msg_size, msg_size, p.get(), xid,
+                            transient);
+            }
+            else
+            {
+                if (external)
+                    res = enqueue_extern_data_record(msg_size, p.get(), transient);
+                else
+                    res = enqueue_data_record(msgp, msg_size, msg_size, p.get(), transient);
+            }
+            switch (res)
+            {
+            case mrg::journal::RHM_IORES_SUCCESS:
+                sleep_cnt = 0U;
+                _tcrp->incr_num_enq();
+                if (p->has_xid() && !_tcp->auto_deq())
+                    commit(p.get());
+                break;
+            case mrg::journal::RHM_IORES_ENQCAPTHRESH:
+                if (++sleep_cnt > MAX_ENQCAPTHRESH_CNT)
+                {
+                    _tcrp->add_exception("Timeout waiting for RHM_IORES_ENQCAPTHRESH to clear.");
+                    panic();
+                }
+                else if (get_wr_events(0) == 0) // *** GEV2
+                {
+                    mrg::journal::slock sl(_wr_full_mutex);
+                    _wr_full_cv.waitintvl(MAX_WR_WAIT * 1000000); // MAX_WR_WAIT in ms
+                }
+                break;
+            default:
+                std::ostringstream oss;
+                oss << "ERROR: enqueue operation in journal \"" << _jid << "\" returned ";
+                oss << mrg::journal::iores_str(res) << ".";
+                _tcrp->add_exception(oss.str());
+            }
+        }
+        flush(true);
+    }
+    catch (const mrg::journal::jexception& e) { _tcrp->add_exception(e); panic(); }
+    catch (const std::exception& e) { _tcrp->add_exception(e.what()); panic(); }
+    catch (...) { _tcrp->add_exception("Unknown exception"); panic(); }
+}
+
+void
+jrnl_instance::run_read() throw ()
+{
+    try
+    {
+        read_arg::read_mode_t rd_mode = _args_ptr->read_mode.val();
+        if (rd_mode != read_arg::NONE)
+        {
+            while (_tcrp->num_rproc() < _tcp->num_msgs() && !_tcrp->exception())
+            {
+                journal::data_tok* dtokp = 0;
+                {
+                    mrg::journal::slock sl(_rd_list_mutex);
+                    if (_dtok_rd_list.empty())
+                        _rd_list_cv.wait();
+                    if (!_dtok_rd_list.empty())
+                    {
+                        dtokp = _dtok_rd_list.front();
+                        _dtok_rd_list.pop_front();
+                    }
+                }
+                if (dtokp)
+                {
+                    _tcrp->incr_num_rproc();
+
+                    bool do_read = true;
+                    if (rd_mode == read_arg::RANDOM)
+                        do_read = 1.0 * std::rand() / RAND_MAX <  _args_ptr->read_prob / 100.0;
+                    else if (rd_mode == read_arg::LAZYLOAD)
+                        do_read = _tcrp->num_rproc() >= _args_ptr->lld_skip_num &&
+                                        _tcrp->num_read() < _args_ptr->lld_rd_num;
+                    bool read_compl = false;
+                    while (do_read && !read_compl && !_tcrp->exception())
+                    {
+                        void* dptr = 0;
+                        std::size_t dsize = 0;
+                        void* xptr = 0;
+                        std::size_t xsize = 0;
+                        bool tr = false;
+                        bool ext = false;
+                        mrg::journal::iores res = read_data_record(&dptr, dsize, &xptr, xsize, tr,
+                                ext, dtokp);
+                        switch (res)
+                        {
+                        case mrg::journal::RHM_IORES_SUCCESS:
+                            {
+                                mrg::journal::slock sl(_deq_list_mutex);
+                                _dtok_deq_list.push_back(dtokp);
+                                _deq_list_cv.broadcast();
+                            }
+                            read_compl = true;
+                            _tcrp->incr_num_read();
+
+                            // clean up
+                            if (xsize)
+                                std::free(xptr);
+                            else if (dsize)
+                                std::free(dptr);
+                            dptr = 0;
+                            xptr = 0;
+                            break;
+                        case mrg::journal::RHM_IORES_PAGE_AIOWAIT:
+                            if (get_rd_events(0) == 0)
+                            {
+                                mrg::journal::slock sl(_rd_aio_mutex);
+                                _rd_aio_cv.waitintvl(MAX_RD_WAIT * 1000000); // MAX_RD_WAIT in ms
+                            }
+                            break;
+                        default:
+                            std::ostringstream oss;
+                            oss << "ERROR: read operation in journal \"" << _jid;
+                            oss << "\" returned " << mrg::journal::iores_str(res) << ".";
+                            _tcrp->add_exception(oss.str());
+                            {
+                                mrg::journal::slock sl(_deq_list_mutex);
+                                _deq_list_cv.broadcast(); // wake up deq thread
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+    catch (const mrg::journal::jexception& e) { _tcrp->add_exception(e); panic(); }
+    catch (const std::exception& e) { _tcrp->add_exception(e.what()); panic(); }
+    catch (...) { _tcrp->add_exception("Unknown exception"); panic(); }
+}
+
+void
+jrnl_instance::run_deq() throw ()
+{
+    try
+    {
+        if (_tcp->auto_deq())
+        {
+            while(_tcrp->num_deq() < _tcp->num_msgs() && !_tcrp->exception())
+            {
+                journal::data_tok* dtokp = 0;
+                {
+                    mrg::journal::slock sl(_deq_list_mutex);
+                    if (_dtok_deq_list.empty())
+                        _deq_list_cv.wait();
+                    if (!_dtok_deq_list.empty())
+                    {
+                        dtokp = _dtok_deq_list.front();
+                        _dtok_deq_list.pop_front();
+                    }
+                }
+                if (dtokp)
+                {
+                    mrg::journal::iores res;
+                    if (dtokp->has_xid())
+                        res = dequeue_txn_data_record(dtokp, dtokp->xid());
+                    else
+                        res = dequeue_data_record(dtokp);
+                    if (res == mrg::journal::RHM_IORES_SUCCESS)
+                    {
+                        _tcrp->incr_num_deq();
+                        commit(dtokp);
+                    }
+                    else
+                    {
+                        std::ostringstream oss;
+                        oss << "ERROR: dequeue operation in journal \"" << _jid;
+                        oss << "\" returned " << mrg::journal::iores_str(res) << ".";
+                        _tcrp->add_exception(oss.str());
+                    }
+                }
+            }
+            flush(true);
+        }
+    }
+    catch (const mrg::journal::jexception& e) { _tcrp->add_exception(e); panic(); }
+    catch (const std::exception& e) { _tcrp->add_exception(e.what()); panic(); }
+    catch (...) { _tcrp->add_exception("Unknown exception"); panic(); }
+}
+
+void
+jrnl_instance::abort(const mrg::journal::data_tok* dtokp)
+{
+    txn(dtokp, false);
+}
+
+void
+jrnl_instance::commit(const mrg::journal::data_tok* dtokp)
+{
+    txn(dtokp, true);
+}
+
+void
+jrnl_instance::txn(const mrg::journal::data_tok* dtokp, const bool commit)
+{
+    if (dtokp->has_xid())
+    {
+        mrg::journal::data_tok* p = prep_txn_dtok(dtokp);
+        mrg::journal::iores res = commit ? txn_commit(p, p->xid()) : txn_abort(p, p->xid());
+        if (res != mrg::journal::RHM_IORES_SUCCESS)
+        {
+            std::ostringstream oss;
+            oss << "ERROR: " << (commit ? "commit" : "abort") << " operation in journal \"";
+            oss << _jid << "\" returned " << mrg::journal::iores_str(res) << ".";
+            _tcrp->add_exception(oss.str());
+        }
+    }
+}
+
+mrg::journal::data_tok*
+jrnl_instance::prep_txn_dtok(const mrg::journal::data_tok* dtokp)
+{
+    dtok_ptr p(new mrg::journal::data_tok);
+    _dtok_master_txn_list.push_back(p);
+    p->set_xid(dtokp->xid());
+    return p.get();
+}
+
+void
+jrnl_instance::panic()
+{
+    // In the event of a panic or exception condition, release all waiting CVs
+    _rd_aio_cv.broadcast();
+    _wr_full_cv.broadcast();
+    _rd_list_cv.broadcast();
+    _deq_list_cv.broadcast();
+}
+
+// AIO callbacks
+
+void
+jrnl_instance::wr_aio_cb(std::vector<journal::data_tok*>& dtokl)
+{
+    for (std::vector<journal::data_tok*>::const_iterator i=dtokl.begin(); i!=dtokl.end(); i++)
+    {
+        if ((*i)->wstate() == journal::data_tok::ENQ || (*i)->wstate() == journal::data_tok::DEQ)
+        {
+            journal::data_tok* dtokp = *i;
+            if (dtokp->wstate() == journal::data_tok::ENQ)
+            {
+                if (_args_ptr->read_mode.val() == read_arg::NONE)
+                {
+                    mrg::journal::slock sl(_deq_list_mutex);
+                    _dtok_deq_list.push_back(dtokp);
+                    _deq_list_cv.broadcast();
+                }
+                else
+                {
+                    mrg::journal::slock sl(_rd_list_mutex);
+                    _dtok_rd_list.push_back(dtokp);
+                    _rd_list_cv.broadcast();
+                }
+            }
+            else // DEQ
+            {
+                mrg::journal::slock sl(_wr_full_mutex);
+                _wr_full_cv.broadcast();
+            }
+        }
+    }
+}
+
+void
+jrnl_instance::rd_aio_cb(std::vector<u_int16_t>& /*pil*/)
+{
+    mrg::journal::slock sl(_rd_aio_mutex);
+    _rd_aio_cv.broadcast();
+}
+
+} // namespace jtt
+} // namespace mrg

Added: qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/jtt/jrnl_instance.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/jtt/jrnl_instance.h?rev=1530301&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/jtt/jrnl_instance.h (added)
+++ qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/jtt/jrnl_instance.h Tue Oct  8 15:09:00 2013
@@ -0,0 +1,121 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef mrg_jtt_jrnl_instance_hpp
+#define mrg_jtt_jrnl_instance_hpp
+
+#include "args.h"
+#include "jrnl_init_params.h"
+#include "test_case.h"
+
+#include <boost/shared_ptr.hpp>
+#include "qpid/legacystore/jrnl/cvar.h"
+#include "qpid/legacystore/jrnl/data_tok.h"
+#include "qpid/legacystore/jrnl/jcntl.h"
+#include "qpid/legacystore/jrnl/slock.h"
+#include "qpid/legacystore/jrnl/smutex.h"
+#include <list>
+#include <vector>
+
+namespace mrg
+{
+namespace jtt
+{
+
+    class jrnl_instance : public mrg::journal::jcntl, public virtual mrg::journal::aio_callback
+    {
+    public:
+        typedef boost::shared_ptr<jrnl_instance> shared_ptr;
+        typedef boost::shared_ptr<journal::data_tok> dtok_ptr;
+
+    private:
+        jrnl_init_params::shared_ptr _jpp;
+        const args* _args_ptr;
+        std::vector<dtok_ptr> _dtok_master_enq_list;
+        std::vector<dtok_ptr> _dtok_master_txn_list;
+        std::list<journal::data_tok*> _dtok_rd_list;
+        std::list<journal::data_tok*> _dtok_deq_list;
+        mrg::journal::smutex _rd_aio_mutex;     ///< Mutex for read aio wait conditions
+        mrg::journal::cvar _rd_aio_cv;          ///< Condition var for read aio wait conditions
+        mrg::journal::smutex _wr_full_mutex;    ///< Mutex for write full conditions
+        mrg::journal::cvar _wr_full_cv;         ///< Condition var for write full conditions
+        mrg::journal::smutex _rd_list_mutex;    ///< Mutex for _dtok_rd_list
+        mrg::journal::cvar _rd_list_cv;         ///< Condition var for _dtok_rd_list
+        mrg::journal::smutex _deq_list_mutex;   ///< Mutex for _dtok_deq_list
+        mrg::journal::cvar _deq_list_cv;        ///< Condition var for _dtok_deq_list
+        pthread_t _enq_thread;
+        pthread_t _deq_thread;
+        pthread_t _read_thread;
+        test_case::shared_ptr _tcp;
+        test_case_result::shared_ptr _tcrp;
+
+    public:
+        jrnl_instance(const std::string& jid, const std::string& jdir,
+            const std::string& base_filename,
+            const u_int16_t num_jfiles = jrnl_init_params::def_num_jfiles,
+            const bool ae = jrnl_init_params::def_ae,
+            const u_int16_t ae_max_jfiles = jrnl_init_params::def_ae_max_jfiles,
+            const u_int32_t jfsize_sblks = jrnl_init_params::def_jfsize_sblks,
+            const u_int16_t wcache_num_pages = jrnl_init_params::def_wcache_num_pages,
+            const u_int32_t wcache_pgsize_sblks = jrnl_init_params::def_wcache_pgsize_sblks);
+        jrnl_instance(const jrnl_init_params::shared_ptr& params);
+        virtual ~jrnl_instance();
+
+        inline const jrnl_init_params::shared_ptr& params() const { return _jpp; }
+        inline const std::string& jid() const { return _jpp->jid(); }
+
+        void init_tc(test_case::shared_ptr& tcp, const args* const args_ptr) throw ();
+        void run_tc() throw ();
+        void tc_wait_compl() throw ();
+
+        // AIO callbacks
+        virtual void wr_aio_cb(std::vector<journal::data_tok*>& dtokl);
+        virtual void rd_aio_cb(std::vector<u_int16_t>& pil);
+
+    private:
+        void run_enq() throw ();
+        inline static void* run_enq(void* p)
+                { static_cast<jrnl_instance*>(p)->run_enq(); return 0; }
+
+        void run_read() throw ();
+        inline static void* run_read(void* p)
+                { static_cast<jrnl_instance*>(p)->run_read(); return 0; }
+
+        void run_deq() throw ();
+        inline static void* run_deq(void* p)
+                { static_cast<jrnl_instance*>(p)->run_deq(); return 0; }
+
+        void abort(const mrg::journal::data_tok* dtokp);
+        void commit(const mrg::journal::data_tok* dtokp);
+        void txn(const mrg::journal::data_tok* dtokp, const bool commit);
+        mrg::journal::data_tok* prep_txn_dtok(const mrg::journal::data_tok* dtokp);
+
+        void panic();
+
+//         // static callbacks
+//         static void aio_rd_callback(jcntl* journal, std::vector<u_int16_t>& pil);
+//         static void aio_wr_callback(jcntl* journal, std::vector<journal::data_tok*>& dtokl);
+    };
+
+} // namespace jtt
+} // namespace mrg
+
+#endif // ifndef mrg_jtt_jrnl_instance_hpp



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org