You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ch...@apache.org on 2012/12/19 21:34:58 UTC

svn commit: r1424091 [5/9] - in /qpid/trunk/qpid/cpp/src: ./ qpid/legacystore/ qpid/legacystore/jrnl/

Added: qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/jcntl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/jcntl.cpp?rev=1424091&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/jcntl.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/jcntl.cpp Wed Dec 19 20:34:56 2012
@@ -0,0 +1,987 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file jcntl.cpp
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * Messaging journal top-level control and interface class
+ * mrg::journal::jcntl.  See comments in file jcntl.hpp for details.
+ *
+ * \author Kim van der Riet
+ *
+ * Copyright (c) 2007, 2008, 2009, 2010 Red Hat, Inc.
+ *
+ */
+
+
+#include "jrnl/jcntl.hpp"
+
+#include <algorithm>
+#include <cassert>
+#include <cerrno>
+#include <cstdlib>
+#include <cstring>
+#include <fstream>
+#include <iomanip>
+#include <iostream>
+#include "jrnl/file_hdr.hpp"
+#include "jrnl/jerrno.hpp"
+#include "jrnl/jinf.hpp"
+#include <limits>
+#include <sstream>
+#include <unistd.h>
+#include <vector>
+
+namespace mrg
+{
+namespace journal
+{
+
+#define AIO_CMPL_TIMEOUT_SEC   5
+#define AIO_CMPL_TIMEOUT_NSEC  0
+#define FINAL_AIO_CMPL_TIMEOUT_SEC   15
+#define FINAL_AIO_CMPL_TIMEOUT_NSEC  0
+
+// Static
+timespec jcntl::_aio_cmpl_timeout; ///< Timeout for blocking libaio returns
+timespec jcntl::_final_aio_cmpl_timeout; ///< Timeout for blocking libaio returns when stopping or finalizing
+bool jcntl::_init = init_statics();
+
+/**
+ * Load the two static AIO completion timeouts from the *_TIMEOUT_SEC / *_TIMEOUT_NSEC
+ * macros above. Invoked through the initializer of jcntl::_init; always returns true.
+ */
+bool jcntl::init_statics()
+{
+    // Timeout used for ordinary blocking waits
+    timespec& regular_timeout = _aio_cmpl_timeout;
+    regular_timeout.tv_sec = AIO_CMPL_TIMEOUT_SEC;
+    regular_timeout.tv_nsec = AIO_CMPL_TIMEOUT_NSEC;
+
+    // Timeout used when stopping or finalizing
+    timespec& final_timeout = _final_aio_cmpl_timeout;
+    final_timeout.tv_sec = FINAL_AIO_CMPL_TIMEOUT_SEC;
+    final_timeout.tv_nsec = FINAL_AIO_CMPL_TIMEOUT_NSEC;
+
+    return true;
+}
+
+
+// Functions
+
+/**
+ * Construct a journal controller with id \a jid, whose files are named from \a base_filename
+ * and live in directory \a jdir.
+ *
+ * All state flags start cleared except _autostop, and the file size is zero. The object cannot
+ * be used for reading or writing until initialize() or recover() has set _init_flag
+ * (check_wstatus() / check_rstatus() throw otherwise).
+ */
+jcntl::jcntl(const std::string& jid, const std::string& jdir, const std::string& base_filename):
+    _jid(jid),
+    _jdir(jdir, base_filename),
+    _base_filename(base_filename),
+    _init_flag(false),
+    _stop_flag(false),
+    _readonly_flag(false),
+    _autostop(true),
+    _jfsize_sblks(0),
+    _lpmgr(),
+    _emap(),
+    _tmap(),
+    _rrfc(&_lpmgr),                     // read and write file controllers share the one _lpmgr
+    _wrfc(&_lpmgr),
+    _rmgr(this, _emap, _tmap, _rrfc),   // read manager: shares the enqueue and transaction maps
+    _wmgr(this, _emap, _tmap, _wrfc),   // write manager: shares the same maps
+    _rcvdat()
+{}
+
+/**
+ * Destructor. A journal that was initialized but never stopped is stopped here (blocking until
+ * outstanding AIO completes); a jexception raised by that stop is written to std::cerr rather
+ * than propagated. The file manager is finalized in all cases.
+ */
+jcntl::~jcntl()
+{
+    const bool still_running = _init_flag && !_stop_flag;
+    if (still_running)
+    {
+        try
+        {
+            stop(true);
+        }
+        catch (const jexception& e)
+        {
+            std::cerr << e << std::endl;
+        }
+    }
+    _lpmgr.finalize();
+}
+
+/**
+ * Set up a new journal, discarding whatever is currently in the journal directory.
+ *
+ * The state flags and both maps are reset, the file geometry (\a num_jfiles, \a jfsize_sblks)
+ * is recorded, the directory is cleared, and the file, read and write managers are
+ * initialized. Finally the info file is written. _init_flag is only set after every step has
+ * completed, so an exception part-way through leaves the journal marked uninitialized.
+ *
+ * Geometry limits are checked with assert() only, i.e. they are not enforced when NDEBUG is
+ * defined.
+ */
+void
+jcntl::initialize(const u_int16_t num_jfiles, const bool ae, const u_int16_t ae_max_jfiles,
+        const u_int32_t jfsize_sblks, const u_int16_t wcache_num_pages, const u_int32_t wcache_pgsize_sblks,
+        aio_callback* const cbp)
+{
+    // Back to the uninitialized state until set-up is complete
+    _init_flag = false;
+    _stop_flag = false;
+    _readonly_flag = false;
+
+    _emap.clear();
+    _tmap.clear();
+
+    _lpmgr.finalize();
+
+    // Set new file geometry parameters
+    assert(num_jfiles >= JRNL_MIN_NUM_FILES);
+    assert(num_jfiles <= JRNL_MAX_NUM_FILES);
+    _emap.set_num_jfiles(num_jfiles);
+    _tmap.set_num_jfiles(num_jfiles);
+
+    assert(jfsize_sblks >= JRNL_MIN_FILE_SIZE);
+    assert(jfsize_sblks <= JRNL_MAX_FILE_SIZE);
+    _jfsize_sblks = jfsize_sblks;
+
+    // Clear any existing journal files
+    _jdir.clear_dir();
+    _lpmgr.initialize(num_jfiles, ae, ae_max_jfiles, this, &new_fcntl);
+
+    // Write file controller first, then the read file controller starting at file index 0
+    _wrfc.initialize(_jfsize_sblks);
+    _rrfc.initialize();
+    _rrfc.set_findex(0);
+    _rmgr.initialize(cbp);
+    _wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, JRNL_WMGR_MAXDTOKPP, JRNL_WMGR_MAXWAITUS);
+
+    // Write info file (<basename>.jinf) to disk
+    write_infofile();
+
+    _init_flag = true;
+}
+
+/**
+ * Recover an existing journal from disk and leave it in the read-only state.
+ *
+ * The state flags and maps are reset, the journal directory is verified, and rcvr_janalyze()
+ * scans the journal files to rebuild the enqueue and transaction maps. \a highest_rid receives
+ * the highest record id found by that scan. The managers are then initialized from the
+ * recovered data.
+ *
+ * On return _readonly_flag is set; recover_complete() must be called before writing.
+ *
+ * \param prep_txn_list_ptr Passed through to rcvr_janalyze(), which removes recovered
+ *     transactions whose xid is not in this list (no filtering if null).
+ * \throws jexception JERR_JCNTL_RECOVERJFULL if the analysis finds the journal full.
+ */
+void
+jcntl::recover(const u_int16_t num_jfiles, const bool ae, const u_int16_t ae_max_jfiles,
+        const u_int32_t jfsize_sblks, const u_int16_t wcache_num_pages, const u_int32_t wcache_pgsize_sblks,
+//         const rd_aio_cb rd_cb, const wr_aio_cb wr_cb, const std::vector<std::string>* prep_txn_list_ptr,
+        aio_callback* const cbp, const std::vector<std::string>* prep_txn_list_ptr,
+        u_int64_t& highest_rid)
+{
+    // Back to the uninitialized state until recovery is complete
+    _init_flag = false;
+    _stop_flag = false;
+    _readonly_flag = false;
+
+    _emap.clear();
+    _tmap.clear();
+
+    _lpmgr.finalize();
+
+    assert(num_jfiles >= JRNL_MIN_NUM_FILES);
+    assert(num_jfiles <= JRNL_MAX_NUM_FILES);
+    assert(jfsize_sblks >= JRNL_MIN_FILE_SIZE);
+    assert(jfsize_sblks <= JRNL_MAX_FILE_SIZE);
+    // Provisional value: rcvr_janalyze() replaces it if the info file disagrees
+    _jfsize_sblks = jfsize_sblks;
+
+    // Verify journal dir and journal files
+    _jdir.verify_dir();
+    _rcvdat.reset(num_jfiles, ae, ae_max_jfiles);
+
+    rcvr_janalyze(_rcvdat, prep_txn_list_ptr);
+    // Reported to the caller before the journal-full check, so it is set even if that throws
+    highest_rid = _rcvdat._h_rid;
+    if (_rcvdat._jfull)
+        throw jexception(jerrno::JERR_JCNTL_RECOVERJFULL, "jcntl", "recover");
+    this->log(LOG_DEBUG, _rcvdat.to_log(_jid));
+
+    _lpmgr.recover(_rcvdat, this, &new_fcntl);
+
+    _wrfc.initialize(_jfsize_sblks, &_rcvdat);
+    _rrfc.initialize();
+    _rrfc.set_findex(_rcvdat.ffid());
+    _rmgr.initialize(cbp);
+    // Final argument is 0 if the last file is full, otherwise the recovered end offset
+    _wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, JRNL_WMGR_MAXDTOKPP, JRNL_WMGR_MAXWAITUS,
+            (_rcvdat._lffull ? 0 : _rcvdat._eo));
+
+    _readonly_flag = true;
+    _init_flag = true;
+}
+
+/**
+ * Finish recovery: reset every file controller and both file-rotation controllers from the
+ * recovered data, notify the read manager, and clear the read-only flag.
+ *
+ * \throws jexception JERR_JCNTL_NOTRECOVERED if the journal is not in the read-only state.
+ */
+void
+jcntl::recover_complete()
+{
+    if (!_readonly_flag)
+    {
+        throw jexception(jerrno::JERR_JCNTL_NOTRECOVERED, "jcntl", "recover_complete");
+    }
+
+    // Reset each journal file controller using the recovered data
+    for (u_int16_t idx = 0; idx < _lpmgr.num_jfiles(); ++idx)
+    {
+        fcntl* const fcp = _lpmgr.get_fcntlp(idx);
+        fcp->reset(&_rcvdat);
+    }
+
+    _wrfc.initialize(_jfsize_sblks, &_rcvdat);
+    _rrfc.initialize();
+    _rrfc.set_findex(_rcvdat.ffid());
+    _rmgr.recover_complete();
+
+    _readonly_flag = false;
+}
+
+/**
+ * Stop the journal, blocking until outstanding AIO has completed, then delete the journal
+ * directory. Exceptions thrown by stop() propagate, in which case nothing is deleted.
+ */
+void
+jcntl::delete_jrnl_files()
+{
+    const bool wait_for_aio = true;
+    stop(wait_for_aio);
+    _jdir.delete_dir();
+}
+
+
+/**
+ * Enqueue a non-transactional data record. The write is retried under the write lock for as
+ * long as handle_aio_wait() reports that the operation must be repeated.
+ *
+ * \return The result left in place by the final handle_aio_wait() call.
+ */
+iores
+jcntl::enqueue_data_record(const void* const data_buff, const std::size_t tot_data_len,
+        const std::size_t this_data_len, data_tok* dtokp, const bool transient)
+{
+    iores r;
+    check_wstatus("enqueue_data_record");
+    {
+        slock s(_wr_mutex);
+        bool again = true;
+        while (again)
+        {
+            // No xid (null pointer, zero length); data is held in the journal (external = false)
+            const iores wres = _wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, 0, 0, transient,
+                    false);
+            again = handle_aio_wait(wres, r, dtokp);
+        }
+    }
+    return r;
+}
+
+/**
+ * Enqueue a non-transactional record for externally stored data: no data buffer is passed and
+ * the external flag is set. Retried under the write lock while handle_aio_wait() asks for it.
+ *
+ * \return The result left in place by the final handle_aio_wait() call.
+ */
+iores
+jcntl::enqueue_extern_data_record(const std::size_t tot_data_len, data_tok* dtokp, const bool transient)
+{
+    iores r;
+    check_wstatus("enqueue_extern_data_record");
+    {
+        slock s(_wr_mutex);
+        bool again = true;
+        while (again)
+        {
+            const iores wres = _wmgr.enqueue(0, tot_data_len, 0, dtokp, 0, 0, transient, true);
+            again = handle_aio_wait(wres, r, dtokp);
+        }
+    }
+    return r;
+}
+
+/**
+ * Enqueue a data record as part of transaction \a xid. The write is retried under the write
+ * lock for as long as handle_aio_wait() reports that the operation must be repeated.
+ *
+ * \return The result left in place by the final handle_aio_wait() call.
+ * \throws jexception from check_wstatus() if the journal is uninitialized, read-only or stopped.
+ */
+iores
+jcntl::enqueue_txn_data_record(const void* const data_buff, const std::size_t tot_data_len,
+        const std::size_t this_data_len, data_tok* dtokp, const std::string& xid,
+        const bool transient)
+{
+    iores r;
+    // Report this function's real name (was misspelt "enqueue_tx_data_record")
+    check_wstatus("enqueue_txn_data_record");
+    {
+        slock s(_wr_mutex);
+        while (handle_aio_wait(_wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, xid.data(), xid.size(),
+                        transient, false), r, dtokp)) ;
+    }
+    return r;
+}
+
+/**
+ * Enqueue a record for externally stored data as part of transaction \a xid: no data buffer is
+ * passed and the external flag is set. Retried under the write lock while handle_aio_wait()
+ * asks for it.
+ *
+ * \return The result left in place by the final handle_aio_wait() call.
+ */
+iores
+jcntl::enqueue_extern_txn_data_record(const std::size_t tot_data_len, data_tok* dtokp,
+        const std::string& xid, const bool transient)
+{
+    iores r;
+    check_wstatus("enqueue_extern_txn_data_record");
+    {
+        slock s(_wr_mutex);
+        bool again = true;
+        while (again)
+        {
+            const iores wres = _wmgr.enqueue(0, tot_data_len, 0, dtokp, xid.data(), xid.size(), transient,
+                    true);
+            again = handle_aio_wait(wres, r, dtokp);
+        }
+    }
+    return r;
+}
+
+/* TODO
+iores
+jcntl::get_data_record(const u_int64_t& rid, const std::size_t& dsize, const std::size_t& dsize_avail,
+        const void** const data, bool auto_discard)
+{
+    check_rstatus("get_data_record");
+    return _rmgr.get(rid, dsize, dsize_avail, data, auto_discard);
+} */
+
+/* TODO
+iores
+jcntl::discard_data_record(data_tok* const dtokp)
+{
+    check_rstatus("discard_data_record");
+    return _rmgr.discard(dtokp);
+} */
+
+/**
+ * Read the next data record through the read manager.
+ *
+ * If the first attempt returns RHM_IORES_RCINVALID, outstanding write events are polled, the
+ * read manager is synchronized and waited on for validity, and the read is attempted once
+ * more. A failed synchronize() result is returned directly.
+ */
+iores
+jcntl::read_data_record(void** const datapp, std::size_t& dsize, void** const xidpp, std::size_t& xidsize,
+        bool& transient, bool& external, data_tok* const dtokp, bool ignore_pending_txns)
+{
+    check_rstatus("read_data");
+
+    const iores first_res = _rmgr.read(datapp, dsize, xidpp, xidsize, transient, external, dtokp,
+            ignore_pending_txns);
+    if (first_res != RHM_IORES_RCINVALID)
+        return first_res;
+
+    // check for outstanding write events
+    get_wr_events(0);
+
+    // flushes all outstanding read events
+    const iores sync_res = _rmgr.synchronize();
+    if (sync_res != RHM_IORES_SUCCESS)
+        return sync_res;
+
+    // throw if timeout occurs
+    _rmgr.wait_for_validity(&_aio_cmpl_timeout, true);
+    return _rmgr.read(datapp, dsize, xidpp, xidsize, transient, external, dtokp, ignore_pending_txns);
+}
+
+/**
+ * Dequeue the record identified by \a dtokp outside any transaction (no xid is passed).
+ * Retried under the write lock while handle_aio_wait() asks for it.
+ *
+ * \return The result left in place by the final handle_aio_wait() call.
+ */
+iores
+jcntl::dequeue_data_record(data_tok* const dtokp, const bool txn_coml_commit)
+{
+    iores r;
+    check_wstatus("dequeue_data");
+    {
+        slock s(_wr_mutex);
+        bool again = true;
+        while (again)
+        {
+            const iores wres = _wmgr.dequeue(dtokp, 0, 0, txn_coml_commit);
+            again = handle_aio_wait(wres, r, dtokp);
+        }
+    }
+    return r;
+}
+
+/**
+ * Dequeue the record identified by \a dtokp as part of transaction \a xid. Retried under the
+ * write lock for as long as handle_aio_wait() reports that the operation must be repeated.
+ *
+ * \return The result left in place by the final handle_aio_wait() call.
+ * \throws jexception from check_wstatus() if the journal is uninitialized, read-only or stopped.
+ */
+iores
+jcntl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit)
+{
+    iores r;
+    // Report this function's own name; previously "dequeue_data", the same string the
+    // non-transactional dequeue_data_record() reports, so the two could not be told apart.
+    check_wstatus("dequeue_txn_data_record");
+    {
+        slock s(_wr_mutex);
+        while (handle_aio_wait(_wmgr.dequeue(dtokp, xid.data(), xid.size(), txn_coml_commit), r, dtokp)) ;
+    }
+    return r;
+}
+
+/**
+ * Write an abort for transaction \a xid. Retried under the write lock while handle_aio_wait()
+ * asks for it.
+ *
+ * \return The result left in place by the final handle_aio_wait() call.
+ */
+iores
+jcntl::txn_abort(data_tok* const dtokp, const std::string& xid)
+{
+    iores r;
+    check_wstatus("txn_abort");
+    {
+        slock s(_wr_mutex);
+        bool again = true;
+        while (again)
+        {
+            const iores wres = _wmgr.abort(dtokp, xid.data(), xid.size());
+            again = handle_aio_wait(wres, r, dtokp);
+        }
+    }
+    return r;
+}
+
+/**
+ * Write a commit for transaction \a xid. Retried under the write lock while handle_aio_wait()
+ * asks for it.
+ *
+ * \return The result left in place by the final handle_aio_wait() call.
+ */
+iores
+jcntl::txn_commit(data_tok* const dtokp, const std::string& xid)
+{
+    iores r;
+    check_wstatus("txn_commit");
+    {
+        slock s(_wr_mutex);
+        bool again = true;
+        while (again)
+        {
+            const iores wres = _wmgr.commit(dtokp, xid.data(), xid.size());
+            again = handle_aio_wait(wres, r, dtokp);
+        }
+    }
+    return r;
+}
+
+/**
+ * Ask the write manager, while holding the write lock, whether transaction \a xid is synced.
+ */
+bool
+jcntl::is_txn_synced(const std::string& xid)
+{
+    slock s(_wr_mutex);
+    return _wmgr.is_txn_synced(xid);
+}
+
+/**
+ * Collect write AIO events if the write lock can be obtained.
+ *
+ * \return The write manager's get_events() result, or jerrno::LOCK_TAKEN if the stlock on the
+ *     write mutex reports that it is not locked.
+ */
+int32_t
+jcntl::get_wr_events(timespec* const timeout)
+{
+    stlock t(_wr_mutex);
+    if (t.locked())
+    {
+        return _wmgr.get_events(pmgr::UNUSED, timeout);
+    }
+    return jerrno::LOCK_TAKEN;
+}
+
+/**
+ * Collect read AIO events from the read manager. Unlike get_wr_events(), no lock is taken here.
+ */
+int32_t
+jcntl::get_rd_events(timespec* const timeout)
+{
+    const int32_t num_events = _rmgr.get_events(pmgr::AIO_COMPLETE, timeout);
+    return num_events;
+}
+
+/**
+ * Stop the journal. After the status check the stop flag is set; a writable journal is then
+ * flushed (optionally waiting for AIO completion) before the read file controller and the
+ * file manager are finalized.
+ *
+ * \throws jexception if the journal is not initialized or is already stopped.
+ */
+void
+jcntl::stop(const bool block_till_aio_cmpl)
+{
+    // A read-only journal would fail the write-status check, so use the read-status check
+    if (!_readonly_flag)
+        check_wstatus("stop");
+    else
+        check_rstatus("stop");
+
+    _stop_flag = true;
+
+    if (!_readonly_flag)
+    {
+        flush(block_till_aio_cmpl);
+    }
+
+    _rrfc.finalize();
+    _lpmgr.finalize();
+}
+
+/**
+ * Find the earliest file that still has enqueued records or transaction records.
+ *
+ * The search starts at the write controller's earliest index and steps forward, wrapping at
+ * the number of journal files, until it finds such a file or reaches the current write file.
+ * If the read file controller is not active, its file index is set to the result.
+ */
+u_int16_t
+jcntl::get_earliest_fid()
+{
+    u_int16_t ffid = _wrfc.earliest_index();
+    const u_int16_t curr_wr_fid = _wrfc.index();
+    for (;;)
+    {
+        if (_emap.get_enq_cnt(ffid) != 0)
+            break; // file still holds enqueued records
+        if (_tmap.get_txn_pfid_cnt(ffid) != 0)
+            break; // file still holds transaction records
+        if (ffid == curr_wr_fid)
+            break; // caught up with the current write file
+        ++ffid;
+        if (ffid >= _lpmgr.num_jfiles())
+            ffid = 0;
+    }
+    if (!_rrfc.is_active())
+    {
+        _rrfc.set_findex(ffid);
+    }
+    return ffid;
+}
+
+/**
+ * Flush the write manager. An uninitialized journal is a successful no-op.
+ *
+ * \param block_till_aio_cmpl If true, wait for outstanding AIO to complete after the flush.
+ * \return RHM_IORES_SUCCESS if not initialized, otherwise the write manager's flush() result.
+ * \throws jexception JERR_JCNTL_READONLY if the journal is read-only.
+ */
+iores
+jcntl::flush(const bool block_till_aio_cmpl)
+{
+    if (!_init_flag)
+    {
+        return RHM_IORES_SUCCESS;
+    }
+    if (_readonly_flag)
+    {
+        throw jexception(jerrno::JERR_JCNTL_READONLY, "jcntl", "flush");
+    }
+
+    iores flush_res;
+    {
+        slock s(_wr_mutex);
+        flush_res = _wmgr.flush();
+    } // write lock released here, before any wait for AIO completion
+
+    if (block_till_aio_cmpl)
+    {
+        aio_cmpl_wait();
+    }
+    return flush_res;
+}
+
+/**
+ * Convenience overload: forwards \a log_stmt to the const char* overload of log().
+ */
+void
+jcntl::log(log_level ll, const std::string& log_stmt) const
+{
+    const char* const msg = log_stmt.c_str();
+    log(ll, msg);
+}
+
+/**
+ * Write \a log_stmt to std::cout, prefixed with the level name and the journal id. Levels at
+ * or below LOG_INFO are not written.
+ */
+void
+jcntl::log(log_level ll, const char* const log_stmt) const
+{
+    if (ll <= LOG_INFO)
+        return;
+    std::cout << log_level_str(ll) << ": Journal \"" << _jid << "\": " << log_stmt << std::endl;
+}
+
+/**
+ * Invalidate the read manager when the write and read file controllers are on the same file
+ * index.
+ */
+void
+jcntl::chk_wr_frot()
+{
+    const bool same_file = (_wrfc.index() == _rrfc.index());
+    if (same_file)
+    {
+        _rmgr.invalidate();
+    }
+}
+
+/**
+ * Wait until file \a lid has no file-header write AIO outstanding, polling for write events.
+ *
+ * \throws jexception JERR_JCNTL_AIOCMPLWAIT if an event poll times out.
+ */
+void
+jcntl::fhdr_wr_sync(const u_int16_t lid)
+{
+    fcntl* const fcp = _lpmgr.get_fcntlp(lid);
+    for (;;)
+    {
+        if (!fcp->wr_fhdr_aio_outstanding())
+            break;
+        const int32_t ev_res = get_wr_events(&_aio_cmpl_timeout);
+        if (ev_res == jerrno::AIO_TIMEOUT)
+            throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl", "fhdr_wr_sync");
+    }
+}
+
+/**
+ * Factory handed to the file manager: allocate a new fcntl for file \a fid / \a lid, named
+ * "<journal dir>/<base filename>" and sized from \a jcp.
+ *
+ * \return The new fcntl (allocated with new), or 0 if \a jcp is null.
+ */
+fcntl*
+jcntl::new_fcntl(jcntl* const jcp, const u_int16_t lid, const u_int16_t fid, const rcvdat* const rdp)
+{
+    if (jcp == 0)
+        return 0;
+
+    std::ostringstream path_base;
+    path_base << jcp->jrnl_dir() << "/" << jcp->base_filename();
+    return new fcntl(path_base.str(), fid, lid, jcp->jfsize_sblks(), rdp);
+}
+
+// Protected/Private functions
+
+/**
+ * Check that the journal may be written to. \a fn_name is passed to the exception as the
+ * name of the calling function.
+ *
+ * \throws jexception JERR__NINIT if not initialized, JERR_JCNTL_READONLY if read-only,
+ *     JERR_JCNTL_STOPPED if stopped (checked in that order).
+ */
+void
+jcntl::check_wstatus(const char* fn_name) const
+{
+    if (!_init_flag)
+    {
+        throw jexception(jerrno::JERR__NINIT, "jcntl", fn_name);
+    }
+    if (_readonly_flag)
+    {
+        throw jexception(jerrno::JERR_JCNTL_READONLY, "jcntl", fn_name);
+    }
+    if (_stop_flag)
+    {
+        throw jexception(jerrno::JERR_JCNTL_STOPPED, "jcntl", fn_name);
+    }
+}
+
+/**
+ * Check that the journal may be read from. \a fn_name is passed to the exception as the name
+ * of the calling function. Unlike check_wstatus(), the read-only state is acceptable here.
+ *
+ * \throws jexception JERR__NINIT if not initialized, JERR_JCNTL_STOPPED if stopped.
+ */
+void
+jcntl::check_rstatus(const char* fn_name) const
+{
+    if (!_init_flag)
+    {
+        throw jexception(jerrno::JERR__NINIT, "jcntl", fn_name);
+    }
+    if (_stop_flag)
+    {
+        throw jexception(jerrno::JERR_JCNTL_STOPPED, "jcntl", fn_name);
+    }
+}
+
+/**
+ * Write the journal info file, built from the current journal parameters and stamped with
+ * the current CLOCK_REALTIME time.
+ *
+ * \throws jexception JERR__RTCLOCK if the clock cannot be read.
+ */
+void
+jcntl::write_infofile() const
+{
+    timespec ts;
+    const int clk_res = ::clock_gettime(CLOCK_REALTIME, &ts);
+    if (clk_res != 0)
+    {
+        std::ostringstream oss;
+        oss << FORMAT_SYSERR(errno);
+        throw jexception(jerrno::JERR__RTCLOCK, oss.str(), "jcntl", "write_infofile");
+    }
+
+    jinf ji(_jid, _jdir.dirname(), _base_filename, _lpmgr.num_jfiles(), _lpmgr.is_ae(), _lpmgr.ae_max_jfiles(),
+            _jfsize_sblks, _wmgr.cache_pgsize_sblks(), _wmgr.cache_num_pages(), ts);
+    ji.write();
+}
+
+/**
+ * Block until the write manager reports no remaining AIO events.
+ *
+ * The remaining-event count is read under the write lock, which is released again before
+ * get_wr_events() is called (that call takes the lock itself).
+ *
+ * \throws jexception JERR_JCNTL_AIOCMPLWAIT if an event poll times out.
+ */
+void
+jcntl::aio_cmpl_wait()
+{
+    for (;;)
+    {
+        u_int32_t remaining;
+        {
+            slock s(_wr_mutex);
+            remaining = _wmgr.get_aio_evt_rem();
+        }
+        if (remaining == 0)
+            return; // no events left
+        const int32_t ev_res = get_wr_events(&_aio_cmpl_timeout);
+        if (ev_res == jerrno::AIO_TIMEOUT)
+            throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl", "aio_cmpl_wait");
+    }
+}
+
+/**
+ * Deal with the AIO-wait results of a write-manager operation.
+ *
+ * \a resout is always set to \a res first. Then:
+ *  - RHM_IORES_PAGE_AIOWAIT: poll for events until the current page is no longer blocked and
+ *    return true (the caller repeats the operation).
+ *  - RHM_IORES_FILE_AIOWAIT: poll for events until the current file is no longer blocked, reset
+ *    the write file controller, set \a resout to RHM_IORES_SUCCESS and return true only if the
+ *    data token is in one of the *_PART write states.
+ *  - anything else: return false.
+ *
+ * \throws jexception JERR_JCNTL_AIOCMPLWAIT if an event poll times out.
+ */
+bool
+jcntl::handle_aio_wait(const iores res, iores& resout, const data_tok* dtp)
+{
+    resout = res;
+
+    const bool page_wait = (res == RHM_IORES_PAGE_AIOWAIT);
+    if (!page_wait && res != RHM_IORES_FILE_AIOWAIT)
+        return false;
+
+    if (page_wait)
+    {
+        for (;;)
+        {
+            if (!_wmgr.curr_pg_blocked())
+                break;
+            if (_wmgr.get_events(pmgr::UNUSED, &_aio_cmpl_timeout) != jerrno::AIO_TIMEOUT)
+                continue;
+            std::ostringstream oss;
+            oss << "get_events() returned JERR_JCNTL_AIOCMPLWAIT; wmgr_status: " << _wmgr.status_str();
+            this->log(LOG_CRITICAL, oss.str());
+            throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl", "handle_aio_wait");
+        }
+        return true;
+    }
+
+    // res == RHM_IORES_FILE_AIOWAIT
+    for (;;)
+    {
+        if (!_wmgr.curr_file_blocked())
+            break;
+        if (_wmgr.get_events(pmgr::UNUSED, &_aio_cmpl_timeout) != jerrno::AIO_TIMEOUT)
+            continue;
+        std::ostringstream oss;
+        oss << "get_events() returned JERR_JCNTL_AIOCMPLWAIT; wmgr_status: " << _wmgr.status_str();
+        this->log(LOG_CRITICAL, oss.str());
+        throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl", "handle_aio_wait");
+    }
+    _wrfc.wr_reset();
+    resout = RHM_IORES_SUCCESS;
+
+    // Only a partly written record needs the operation to be repeated
+    const data_tok::write_state ws = dtp->wstate();
+    if (ws == data_tok::ENQ_PART)
+        return true;
+    if (ws == data_tok::DEQ_PART)
+        return true;
+    if (ws == data_tok::ABORT_PART)
+        return true;
+    return ws == data_tok::COMMIT_PART;
+}
+
+/**
+ * Analyze the journal on disk and fill in the recovery data \a rd.
+ *
+ * Steps:
+ *  -# Read the journal info file. Where the number of files, the file size or the directory
+ *     differ from the current settings, log a warning and adopt the info-file values (for
+ *     the directory, the jinf object is updated to the current location instead).
+ *  -# Read the first/last file ids, the overwrite indicator and the file list from the info
+ *     object. A JERR_JINF_JDATEMPTY exception from this step is swallowed; rd._jempty is then
+ *     left as the caller set it.
+ *  -# Unless rd._jempty is set, replay every record with rcvr_get_next_record() to rebuild the
+ *     enqueue and transaction maps.
+ *  -# If \a prep_txn_list_ptr is non-null, remove every recovered transaction whose xid is not
+ *     in that list, undoing its enqueue counts and unlocking the records it dequeued.
+ *  -# Work out the last-file-full (rd._lffull) and journal-full (rd._jfull) conditions.
+ *
+ * \throws jexception JERR_MAP_NOTFOUND if a locked record cannot be unlocked; other
+ *     jexceptions from the info file or from record replay propagate.
+ */
+void
+jcntl::rcvr_janalyze(rcvdat& rd, const std::vector<std::string>* prep_txn_list_ptr)
+{
+    jinf ji(_jdir.dirname() + "/" + _base_filename + "." + JRNL_INFO_EXTENSION, true);
+
+    // If the number of files does not agree with the jinf file from the journal being
+    // recovered, use the jinf data.
+    if (rd._njf != ji.num_jfiles())
+    {
+        std::ostringstream oss;
+        oss << "Recovery found " << ji.num_jfiles() <<
+                " files (different from --num-jfiles value of " << rd._njf << ").";
+        this->log(LOG_WARN, oss.str());
+        rd._njf = ji.num_jfiles();
+        // Resize the list of the rcvdat being filled in (was hard-wired to the _rcvdat member)
+        rd._enq_cnt_list.resize(rd._njf);
+    }
+    _emap.set_num_jfiles(rd._njf);
+    _tmap.set_num_jfiles(rd._njf);
+    if (_jfsize_sblks != ji.jfsize_sblks())
+    {
+        std::ostringstream oss;
+        oss << "Recovery found file size = " << (ji.jfsize_sblks() / JRNL_RMGR_PAGE_SIZE) <<
+                " (different from --jfile-size-pgs value of " <<
+                (_jfsize_sblks / JRNL_RMGR_PAGE_SIZE) << ").";
+        this->log(LOG_WARN, oss.str());
+        _jfsize_sblks = ji.jfsize_sblks();
+    }
+    if (_jdir.dirname().compare(ji.jdir()))
+    {
+        std::ostringstream oss;
+        oss << "Journal file location change: original = \"" << ji.jdir() <<
+                "\"; current = \"" << _jdir.dirname() << "\"";
+        this->log(LOG_WARN, oss.str());
+        ji.set_jdir(_jdir.dirname());
+    }
+
+    try
+    {
+        rd._ffid = ji.get_first_pfid();
+        rd._lfid = ji.get_last_pfid();
+        rd._owi = ji.get_initial_owi();
+        rd._frot = ji.get_frot();
+        rd._jempty = false;
+        ji.get_normalized_pfid_list(rd._fid_list); // _pfid_list
+    }
+    catch (const jexception& e)
+    {
+        // An empty journal is not an error; anything else is
+        if (e.err_code() != jerrno::JERR_JINF_JDATEMPTY) throw;
+    }
+
+    // Restore all read and write pointers and transactions
+    if (!rd._jempty)
+    {
+        u_int16_t fid = rd._ffid;
+        std::ifstream ifs;
+        bool lowi = rd._owi; // local copy of owi to be used during analysis
+        while (rcvr_get_next_record(fid, &ifs, lowi, rd)) ;
+        if (ifs.is_open()) ifs.close();
+
+        // Remove all txns from tmap that are not in the prepared list
+        if (prep_txn_list_ptr)
+        {
+            std::vector<std::string> xid_list;
+            _tmap.xid_list(xid_list);
+            for (std::vector<std::string>::iterator itr = xid_list.begin(); itr != xid_list.end(); ++itr)
+            {
+                std::vector<std::string>::const_iterator pitr =
+                        std::find(prep_txn_list_ptr->begin(), prep_txn_list_ptr->end(), *itr);
+                if (pitr == prep_txn_list_ptr->end()) // not found in prepared list
+                {
+                    txn_data_list tdl = _tmap.get_remove_tdata_list(*itr); // tdl will be empty if xid not found
+                    // Unlock any affected enqueues in emap
+                    for (tdl_itr i = tdl.begin(); i != tdl.end(); ++i)
+                    {
+                        if (i->_enq_flag) // enq op - decrement enqueue count
+                            rd._enq_cnt_list[i->_pfid]--;
+                        else if (_emap.is_enqueued(i->_drid, true)) // deq op - unlock enq record
+                        {
+                            int16_t ret = _emap.unlock(i->_drid);
+                            if (ret < enq_map::EMAP_OK) // fail
+                            {
+                                // enq_map::unlock()'s only error is enq_map::EMAP_RID_NOT_FOUND
+                                std::ostringstream oss;
+                                // (a stray '"' after "0x" has been removed from this message)
+                                oss << std::hex << "_emap.unlock(): drid=0x" << i->_drid;
+                                throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "jcntl", "rcvr_janalyze");
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        // Check for file full condition - add one to _jfsize_sblks to account for file header
+        rd._lffull = rd._eo == (1 + _jfsize_sblks) * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE;
+
+        // Check for journal full condition
+        u_int16_t next_wr_fid = (rd._lfid + 1) % rd._njf;
+        rd._jfull = rd._ffid == next_wr_fid && rd._enq_cnt_list[next_wr_fid] && rd._lffull;
+    }
+}
+
+/**
+ * Read and process the next record during recovery analysis.
+ *
+ * A record header is read from \a ifsp, moving on to the next journal file through
+ * jfile_cycle() whenever the stream is closed or a full header cannot be read. The record is
+ * then handled according to its magic number:
+ *  - enqueue: non-transient records bump the enqueue count of the file the record starts in;
+ *    they are added to the transaction map if they carry an xid, else to the enqueue map.
+ *  - dequeue: with an xid, the enqueued record is locked and the dequeue is added to the
+ *    transaction map; without one, the record is removed from the enqueue map.
+ *  - abort: the transaction is removed; its enqueues are uncounted and its dequeues unlocked.
+ *  - commit: the transaction is removed; its enqueues go into the enqueue map and its
+ *    dequeues are removed from it.
+ *  - empty (filler): skipped.
+ *  - anything else (including 0): treated as the end of valid data; the journal alignment is
+ *    checked and false is returned.
+ *
+ * The xid buffer obtained from get_xid() is released with std::free() on every path out of
+ * this function, including the paths that throw.
+ *
+ * \return true if a record was processed and analysis should continue, false at the end of
+ *     the journal data.
+ * \throws jexception JERR_MAP_NOTFOUND / JERR_MAP_DUPLICATE on map inconsistencies.
+ */
+bool
+jcntl::rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp, bool& lowi, rcvdat& rd)
+{
+    std::size_t cum_size_read = 0;
+    void* xidp = 0;
+    rec_hdr h;
+
+    bool hdr_ok = false;
+    std::streampos file_pos;
+    while (!hdr_ok)
+    {
+        if (!ifsp->is_open())
+        {
+            if (!jfile_cycle(fid, ifsp, lowi, rd, true))
+                return false;
+        }
+        file_pos = ifsp->tellg();
+        ifsp->read((char*)&h, sizeof(rec_hdr));
+        if (ifsp->gcount() == sizeof(rec_hdr))
+            hdr_ok = true;
+        else
+        {
+            // Short read: move on to the next file and try again
+            if (!jfile_cycle(fid, ifsp, lowi, rd, true))
+                return false;
+        }
+    }
+
+    switch(h._magic)
+    {
+        case RHM_JDAT_ENQ_MAGIC:
+            {
+                enq_rec er;
+                u_int16_t start_fid = fid; // fid may increment in decode() if record folds over file boundary
+                if (!decode(er, fid, ifsp, cum_size_read, h, lowi, rd, file_pos))
+                    return false;
+                if (!er.is_transient()) // Ignore transient msgs
+                {
+                    rd._enq_cnt_list[start_fid]++;
+                    if (er.xid_size())
+                    {
+                        er.get_xid(&xidp);
+                        assert(xidp != 0);
+                        std::string xid((char*)xidp, er.xid_size());
+                        _tmap.insert_txn_data(xid, txn_data(h._rid, 0, start_fid, true));
+                        if (_tmap.set_aio_compl(xid, h._rid) < txn_map::TMAP_OK) // fail - xid or rid not found
+                        {
+                            std::free(xidp); // xid holds its own copy; do not leak the buffer
+                            std::ostringstream oss;
+                            oss << std::hex << "_tmap.set_aio_compl: txn_enq xid=\"" << xid << "\" rid=0x" << h._rid;
+                            throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "jcntl", "rcvr_get_next_record");
+                        }
+                        std::free(xidp);
+                    }
+                    else
+                    {
+                        if (_emap.insert_pfid(h._rid, start_fid) < enq_map::EMAP_OK) // fail
+                        {
+                            // The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID.
+                            std::ostringstream oss;
+                            oss << std::hex << "rid=0x" << h._rid << " _pfid=0x" << start_fid;
+                            throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "jcntl", "rcvr_get_next_record");
+                        }
+                    }
+                }
+            }
+            break;
+        case RHM_JDAT_DEQ_MAGIC:
+            {
+                deq_rec dr;
+                u_int16_t start_fid = fid; // fid may increment in decode() if record folds over file boundary
+                if (!decode(dr, fid, ifsp, cum_size_read, h, lowi, rd, file_pos))
+                    return false;
+                if (dr.xid_size())
+                {
+                    // If the enqueue is part of a pending txn, it will not yet be in emap
+                    _emap.lock(dr.deq_rid()); // ignore not found error
+                    dr.get_xid(&xidp);
+                    assert(xidp != 0);
+                    std::string xid((char*)xidp, dr.xid_size());
+                    _tmap.insert_txn_data(xid, txn_data(dr.rid(), dr.deq_rid(), start_fid, false,
+                            dr.is_txn_coml_commit()));
+                    if (_tmap.set_aio_compl(xid, dr.rid()) < txn_map::TMAP_OK) // fail - xid or rid not found
+                    {
+                        std::ostringstream oss;
+                        oss << std::hex << "_tmap.set_aio_compl: txn_deq xid=\"" << xid << "\" rid=0x" << dr.rid();
+                        std::free(xidp); // xid holds its own copy; do not leak the buffer
+                        throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "jcntl", "rcvr_get_next_record");
+                    }
+                    std::free(xidp);
+                }
+                else
+                {
+                    int16_t enq_fid = _emap.get_remove_pfid(dr.deq_rid(), true);
+                    if (enq_fid >= enq_map::EMAP_OK) // ignore not found error
+                        rd._enq_cnt_list[enq_fid]--;
+                }
+            }
+            break;
+        case RHM_JDAT_TXA_MAGIC:
+            {
+                txn_rec ar;
+                if (!decode(ar, fid, ifsp, cum_size_read, h, lowi, rd, file_pos))
+                    return false;
+                // Delete this txn from tmap, unlock any locked records in emap
+                ar.get_xid(&xidp);
+                assert(xidp != 0);
+                std::string xid((char*)xidp, ar.xid_size());
+                txn_data_list tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
+                for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+                {
+                    if (itr->_enq_flag)
+                        rd._enq_cnt_list[itr->_pfid]--;
+                    else
+                        _emap.unlock(itr->_drid); // ignore not found error
+                }
+                std::free(xidp);
+            }
+            break;
+        case RHM_JDAT_TXC_MAGIC:
+            {
+                txn_rec cr;
+                if (!decode(cr, fid, ifsp, cum_size_read, h, lowi, rd, file_pos))
+                    return false;
+                // Delete this txn from tmap, process records into emap
+                cr.get_xid(&xidp);
+                assert(xidp != 0);
+                std::string xid((char*)xidp, cr.xid_size());
+                txn_data_list tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
+                for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+                {
+                    if (itr->_enq_flag) // txn enqueue
+                    {
+                        if (_emap.insert_pfid(itr->_rid, itr->_pfid) < enq_map::EMAP_OK) // fail
+                        {
+                            // The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID.
+                            std::ostringstream oss;
+                            oss << std::hex << "rid=0x" << itr->_rid << " _pfid=0x" << itr->_pfid;
+                            std::free(xidp); // do not leak the xid buffer on the error path
+                            throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "jcntl", "rcvr_get_next_record");
+                        }
+                    }
+                    else // txn dequeue
+                    {
+                        int16_t enq_fid = _emap.get_remove_pfid(itr->_drid, true);
+                        if (enq_fid >= enq_map::EMAP_OK)
+                            rd._enq_cnt_list[enq_fid]--;
+                    }
+                }
+                std::free(xidp);
+            }
+            break;
+        case RHM_JDAT_EMPTY_MAGIC:
+            {
+                // Skip the remainder of the filler record
+                u_int32_t rec_dblks = jrec::size_dblks(sizeof(rec_hdr));
+                ifsp->ignore(rec_dblks * JRNL_DBLK_SIZE - sizeof(rec_hdr));
+                assert(!ifsp->fail() && !ifsp->bad());
+                if (!jfile_cycle(fid, ifsp, lowi, rd, false))
+                    return false;
+            }
+            break;
+        case 0:
+            check_journal_alignment(fid, file_pos, rd);
+            return false;
+        default:
+            // Stop as this is the overwrite boundary.
+            check_journal_alignment(fid, file_pos, rd);
+            return false;
+    }
+    return true;
+}
+
+/**
+ * Decode one record of the type given by \a rec, whose header \a h has already been read.
+ *
+ * The overwrite indicator is checked first (check_owi()). rcv_decode() is then called until
+ * it reports the record complete; each time it reports the record incomplete, jfile_cycle()
+ * is called to move on (a record may fold over a file boundary, in which case \a fid
+ * changes).
+ *
+ * \return true if the record was fully decoded. false if the overwrite check failed, if
+ *     rcv_decode() threw a jexception, or if jfile_cycle() could not continue; in the last
+ *     two cases the journal alignment is checked at the record's starting position first.
+ */
+bool
+jcntl::decode(jrec& rec, u_int16_t& fid, std::ifstream* ifsp, std::size_t& cum_size_read,
+        rec_hdr& h, bool& lowi, rcvdat& rd, std::streampos& file_offs)
+{
+    // Remember where the record starts: fid may move on while decoding
+    u_int16_t start_fid = fid;
+    std::streampos start_file_offs = file_offs;
+    if (!check_owi(fid, h, lowi, rd, file_offs))
+        return false;
+    bool done = false;
+    while (!done)
+    {
+        try { done = rec.rcv_decode(h, ifsp, cum_size_read); }
+        catch (const jexception& e)
+        {
+            // NOTE(review): every jexception is treated as the end of valid data here,
+            // whatever its error code - see the TODO below.
+// TODO - review this logic and tidy up how rd._lfid is assigned. See new jinf.get_end_file() fn.
+// Original
+//             if (e.err_code() != jerrno::JERR_JREC_BADRECTAIL ||
+//                     fid != (rd._ffid ? rd._ffid - 1 : _num_jfiles - 1)) throw;
+// Tried this, but did not work
+//             if (e.err_code() != jerrno::JERR_JREC_BADRECTAIL || h._magic != 0) throw;
+            check_journal_alignment(start_fid, start_file_offs, rd);
+//             rd._lfid = start_fid;
+            return false;
+        }
+        // Record not yet complete: continue decoding after jfile_cycle()
+        if (!done && !jfile_cycle(fid, ifsp, lowi, rd, false))
+        {
+            check_journal_alignment(start_fid, start_file_offs, rd);
+            return false;
+        }
+    }
+    return true;
+}
+
+/**
+ * Move the analysis stream on to the next journal file when the current one is exhausted,
+ * and open a journal file if none is open.
+ *
+ * If \a ifsp is open and at EOF (or otherwise not good), the current offset is recorded in
+ * rd._eo, the file is closed and \a fid is advanced, wrapping to 0 (and flipping \a lowi)
+ * after the last file. If \a ifsp is (now) closed, the data file for \a fid is opened and its
+ * file header read; the stream is then positioned either at the header's first-record offset
+ * (\a jump_fro true) or at JRNL_DBLK_SIZE * JRNL_SBLK_SIZE (\a jump_fro false).
+ *
+ * \return false if all journal files have been used up (fid has come round to rd._ffid) or
+ *     if the opened file does not carry the file-header magic; true otherwise.
+ * \throws jexception JERR__FILEIO if the data file cannot be opened.
+ */
+bool
+jcntl::jfile_cycle(u_int16_t& fid, std::ifstream* ifsp, bool& lowi, rcvdat& rd, const bool jump_fro)
+{
+    if (ifsp->is_open())
+    {
+        if (ifsp->eof() || !ifsp->good())
+        {
+            // Clear the error state first so that tellg() can report the position
+            ifsp->clear();
+            rd._eo = ifsp->tellg(); // remember file offset before closing
+            assert(rd._eo != std::numeric_limits<std::size_t>::max()); // Check for error code -1
+            ifsp->close();
+            if (++fid >= rd._njf)
+            {
+                fid = 0;
+                lowi = !lowi; // Flip local owi
+            }
+            if (fid == rd._ffid) // used up all journal files
+                return false;
+        }
+    }
+    if (!ifsp->is_open())
+    {
+        // File name: <dir>/<base filename>.<fid as 4 hex digits>.<data extension>
+        std::ostringstream oss;
+        oss << _jdir.dirname() << "/" << _base_filename << ".";
+        oss << std::hex << std::setfill('0') << std::setw(4) << fid << "." << JRNL_DATA_EXTENSION;
+        ifsp->clear(); // clear eof flag, req'd for older versions of c++
+        ifsp->open(oss.str().c_str(), std::ios_base::in | std::ios_base::binary);
+        if (!ifsp->good())
+            throw jexception(jerrno::JERR__FILEIO, oss.str(), "jcntl", "jfile_cycle");
+
+        // Read file header
+        file_hdr fhdr;
+        ifsp->read((char*)&fhdr, sizeof(fhdr));
+        // NOTE(review): a short header read is caught by assert() only (not when NDEBUG is defined)
+        assert(ifsp->good());
+        if (fhdr._magic == RHM_JDAT_FILE_MAGIC)
+        {
+            assert(fhdr._lfid == fid);
+            // Only the first non-zero first-record offset seen is recorded
+            if (!rd._fro)
+                rd._fro = fhdr._fro;
+            std::streamoff foffs = jump_fro ? fhdr._fro : JRNL_DBLK_SIZE * JRNL_SBLK_SIZE;
+            ifsp->seekg(foffs);
+        }
+        else
+        {
+            // No file-header magic: not a usable journal file
+            ifsp->close();
+            return false;
+        }
+    }
+    return true;
+}
+
+/**
+ * Check the overwrite indicator (owi) of record header \a h against the local owi \a lowi and
+ * keep track of the highest record id seen.
+ *
+ * If the owi is found to have changed, this is only acceptable in the file immediately before
+ * the first file (rd._ffid), where it marks the end of valid data: the journal alignment is
+ * checked and false is returned. In any other file it is an error.
+ *
+ * \return true if the record is valid, false at the overwrite boundary.
+ * \throws jexception JERR_JCNTL_OWIMISMATCH if the owi changes in an unexpected file.
+ */
+bool
+jcntl::check_owi(const u_int16_t fid, rec_hdr& h, bool& lowi, rcvdat& rd, std::streampos& file_pos)
+{
+    const bool owi_changed = rd._ffid ? h.get_owi() == lowi : h.get_owi() != lowi;
+    if (!owi_changed)
+    {
+        // Track the highest rid; RFC 1982 comparison for unsigned 64-bit
+        if (rd._h_rid == 0 || h._rid - rd._h_rid < 0x8000000000000000ULL)
+            rd._h_rid = h._rid;
+        return true;
+    }
+
+    // Overwrite indicator changed
+    const u_int16_t expected_fid = rd._ffid ? rd._ffid - 1 : rd._njf - 1;
+    if (fid == expected_fid)
+    {
+        check_journal_alignment(fid, file_pos, rd);
+        return false;
+    }
+
+    std::ostringstream oss;
+    oss << std::hex << std::setfill('0') << "Magic=0x" << std::setw(8) << h._magic;
+    oss << " fid=0x" << std::setw(4) << fid << " rid=0x" << std::setw(8) << h._rid;
+    oss << " foffs=0x" << std::setw(8) << file_pos;
+    oss << " expected_fid=0x" << std::setw(4) << expected_fid;
+    throw jexception(jerrno::JERR_JCNTL_OWIMISMATCH, oss.str(), "jcntl",
+            "check_owi");
+}
+
+
+/**
+ * Make sure the end of valid journal data in file \a fid falls on a JRNL_DBLK_SIZE *
+ * JRNL_SBLK_SIZE boundary.
+ *
+ * If \a file_pos is not on such a boundary, filler records (RHM_JDAT_EMPTY_MAGIC followed by
+ * RHM_CLEAN_CHAR bytes, JRNL_DBLK_SIZE bytes each) are written to the data file from
+ * \a file_pos up to the next boundary, \a file_pos is advanced accordingly, and rd._lfid
+ * (and rd._ffid, unless rd._frot is set) are updated. In all cases rd._eo is set to the
+ * resulting \a file_pos.
+ *
+ * \throws jexception JERR__FILEIO if the data file cannot be opened or a filler record cannot
+ *     be written.
+ */
+void
+jcntl::check_journal_alignment(const u_int16_t fid, std::streampos& file_pos, rcvdat& rd)
+{
+    unsigned sblk_offs = file_pos % (JRNL_DBLK_SIZE * JRNL_SBLK_SIZE);
+    if (sblk_offs)
+    {
+        {
+            std::ostringstream oss;
+            oss << std::hex << "Bad record alignment found at fid=0x" << fid;
+            oss << " offs=0x" << file_pos << " (likely journal overwrite boundary); " << std::dec;
+            oss << (JRNL_SBLK_SIZE - (sblk_offs/JRNL_DBLK_SIZE)) << " filler record(s) required.";
+            this->log(LOG_WARN, oss.str());
+        }
+        const u_int32_t xmagic = RHM_JDAT_EMPTY_MAGIC;
+        std::ostringstream oss;
+        oss << _jdir.dirname() << "/" << _base_filename << ".";
+        oss << std::hex << std::setfill('0') << std::setw(4) << fid << "." << JRNL_DATA_EXTENSION;
+        // Opened with in|out so that the existing file contents are kept
+        std::ofstream ofsp(oss.str().c_str(),
+                std::ios_base::in | std::ios_base::out | std::ios_base::binary);
+        if (!ofsp.good())
+            throw jexception(jerrno::JERR__FILEIO, oss.str(), "jcntl", "check_journal_alignment");
+        ofsp.seekp(file_pos);
+
+        // One filler record; the vector releases its storage on every exit path
+        std::vector<char> buff(JRNL_DBLK_SIZE);
+        std::memcpy(&buff[0], (const void*)&xmagic, sizeof(xmagic));
+        // Normally, RHM_CLEAN must be set before these fills are done, but this is a recover
+        // situation (i.e. performance is not an issue), and it makes the location of the write
+        // clear should inspection of the file be required.
+        std::memset(&buff[0] + sizeof(xmagic), RHM_CLEAN_CHAR, JRNL_DBLK_SIZE - sizeof(xmagic));
+
+        while (file_pos % (JRNL_DBLK_SIZE * JRNL_SBLK_SIZE))
+        {
+            ofsp.write(&buff[0], JRNL_DBLK_SIZE);
+            // A failed write must stop the loop: tellp() would return -1, which never
+            // satisfies the alignment test, so the loop would otherwise not terminate.
+            if (ofsp.fail())
+                throw jexception(jerrno::JERR__FILEIO, oss.str(), "jcntl", "check_journal_alignment");
+            std::ostringstream msg;
+            msg << std::hex << "Recover phase write: Wrote filler record: fid=0x" << fid << " offs=0x" << file_pos;
+            this->log(LOG_NOTICE, msg.str());
+            file_pos = ofsp.tellp();
+        }
+        ofsp.close();
+        rd._lfid = fid;
+        if (!rd._frot)
+            rd._ffid = (fid + 1) % rd._njf;
+        this->log(LOG_INFO, "Bad record alignment fixed.");
+    }
+    rd._eo = file_pos;
+}
+
+} // namespace journal
+} // namespace mrg

Added: qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/jcntl.hpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/jcntl.hpp?rev=1424091&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/jcntl.hpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/jcntl.hpp Wed Dec 19 20:34:56 2012
@@ -0,0 +1,725 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file jcntl.hpp
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * Messaging journal top-level control and interface class
+ * mrg::journal::jcntl. See class documentation for details.
+ *
+ * \author Kim van der Riet
+ *
+ * Copyright (c) 2007, 2008, 2009, 2010 Red Hat, Inc.
+ *
+ */
+
+#ifndef mrg_journal_jcntl_hpp
+#define mrg_journal_jcntl_hpp
+
+namespace mrg
+{
+namespace journal
+{
+    class jcntl;
+}
+}
+
+#include <cstddef>
+#include <deque>
+#include "jrnl/jdir.hpp"
+#include "jrnl/fcntl.hpp"
+#include "jrnl/lpmgr.hpp"
+#include "jrnl/rcvdat.hpp"
+#include "jrnl/slock.hpp"
+#include "jrnl/smutex.hpp"
+#include "jrnl/rmgr.hpp"
+#include "jrnl/wmgr.hpp"
+#include "jrnl/wrfc.hpp"
+
+namespace mrg
+{
+namespace journal
+{
+
+    /**
+    * \brief Access and control interface for the journal. This is the top-level class for the
+    *     journal.
+    *
+    * This is the top-level journal class; one instance of this class controls one instance of the
+    * journal and all its files and associated control structures. Besides this class, the only
+    * other class that needs to be used at a higher level is the data_tok class, one instance of
+    * which is used per data block written to the journal, and is used to track its status through
+    * the AIO enqueue, read and dequeue process.
+    */
+    class jcntl
+    {
+    protected:
+        /**
+        * \brief Journal ID
+        *
+        * This string uniquely identifies this journal instance. It will most likely be associated
+        * with the identity of the message queue with which it is associated.
+        */
+        // TODO: This is not included in any files at present, add to file_hdr?
+        std::string _jid;
+
+        /**
+        * \brief Journal directory
+        *
+        * This string stores the path to the journal directory. It may be absolute or relative, and
+        * should not end in a file separator character. (e.g. "/fastdisk/jdata" is correct,
+        * "/fastdisk/jdata/" is not.)
+        */
+        jdir _jdir;
+
+        /**
+        * \brief Base filename
+        *
+        * This string contains the base filename used for the journal files. The filenames will
+        * start with this base, and have various sections added to it to derive the final file names
+        * that will be written to disk. No file separator characters should be included here, but
+        * all other legal filename characters are valid.
+        */
+        std::string _base_filename;
+
+        /**
+        * \brief Initialized flag
+        *
+        * This flag starts out set to false, is set to true once this object has been initialized,
+        * either by calling initialize() or recover().
+        */
+        bool _init_flag;
+
+        /**
+        * \brief Stopped flag
+        *
+        * This flag starts out false, and is set to true when stop() is called. At this point, the
+        * journal will no longer accept messages until either initialize() or recover() is called.
+        * There is no way other than through initialization to reset this flag.
+        */
+        // TODO: It would be helpful to distinguish between states stopping and stopped. If stop(true) is called,
+        // then we are stopping, but must wait for all outstanding aios to return before being finally stopped. During
+        // this period, however, no new enqueue/dequeue/read requests may be accepted.
+        bool _stop_flag;
+
+        /**
+        * \brief Read-only state flag used during recover.
+        *
+        * When true, this flag prevents journal write operations (enqueue and dequeue), but
+        * allows read to occur. It is used during recovery, and is reset when recover_complete() is
+        * called.
+        */
+        bool _readonly_flag;
+
+        /**
+        * \brief If set, calls stop() if the journal write pointer overruns dequeue low water
+        *     marker. If not set, then attempts to write will throw exceptions until the journal
+        *     file low water marker moves to the next journal file.
+        */
+        bool _autostop;             ///< Autostop flag - stops journal when overrun occurs
+
+        // Journal control structures
+        u_int32_t _jfsize_sblks;    ///< Journal file size in sblks
+        lpmgr _lpmgr;               ///< LFID-PFID manager tracks inserted journal files
+        enq_map _emap;              ///< Enqueue map for low water mark management
+        txn_map _tmap;              ///< Transaction map open transactions
+        rrfc _rrfc;                 ///< Read journal rotating file controller
+        wrfc _wrfc;                 ///< Write journal rotating file controller
+        rmgr _rmgr;                 ///< Read page manager which manages AIO
+        wmgr _wmgr;                 ///< Write page manager which manages AIO
+        rcvdat _rcvdat;             ///< Recovery data used for recovery
+        smutex _wr_mutex;           ///< Mutex for journal writes
+
+    public:
+        static timespec _aio_cmpl_timeout; ///< Timeout for blocking libaio returns
+        static timespec _final_aio_cmpl_timeout; ///< Timeout for blocking libaio returns when stopping or finalizing
+
+        /**
+        * \brief Journal constructor.
+        *
+        * Constructor which sets the physical file location and base name.
+        *
+        * \param jid A unique identifier for this journal instance.
+        * \param jdir The directory which will contain the journal files.
+        * \param base_filename The string which will be used to start all journal filenames.
+        */
+        jcntl(const std::string& jid, const std::string& jdir, const std::string& base_filename);
+
+        /**
+        * \brief Destructor.
+        */
+        virtual ~jcntl();
+
+        inline const std::string& id() const { return _jid; } ///< Returns the journal ID string (_jid).
+        inline const std::string& jrnl_dir() const { return _jdir.dirname(); } ///< Returns the journal directory name reported by _jdir.dirname().
+
+        /**
+        * \brief Initialize the journal for storing data.
+        *
+        * Initialize the journal by creating new journal data files and initializing internal
+        * control structures. When complete, the journal will be empty, and ready to store data.
+        *
+        * <b>NOTE: Any existing journal will be ignored by this operation.</b> To recover
+        * the data from an existing journal, use recover().
+        *
+        * <b>NOTE: If <i>NULL</i> is passed to the deque pointers, they will be internally created
+        * and deleted.</b>
+        *
+        * <b>NOTE: If <i>NULL</i> is passed to the callbacks, internal default callbacks will be
+        * used.</b>
+        *
+        * \param num_jfiles The number of journal files to be created.
+        * \param auto_expand If true, allows journal file auto-expansion. In this mode, the journal will automatically
+        *     add files to the journal if it runs out of space. No more than ae_max_jfiles may be added. If false, then
+        *     no files are added and an exception will be thrown if the journal runs out of file space.
+        * \param ae_max_jfiles Upper limit of journal files for auto-expand mode. When auto_expand is true, this is the
+        *     maximum total number of files allowed in the journal (original plus those added by auto-expand mode). If
+        *     this number of files exist and the journal runs out of space, an exception will be thrown. This number
+        *     must be greater than the num_jfiles parameter value but cannot exceed the maximum number of files for a
+        *     single journal; if num_jfiles is already at its maximum value, then auto-expand will be disabled.
+        * \param jfsize_sblks The size of each journal file expressed in softblocks.
+        * \param wcache_num_pages The number of write cache pages to create.
+        * \param wcache_pgsize_sblks The size in sblks of each write cache page.
+        * \param cbp Pointer to object containing callback functions for read and write operations. May be 0 (NULL).
+        *
+        * \exception TODO
+        */
+        void initialize(const u_int16_t num_jfiles, const bool auto_expand, const u_int16_t ae_max_jfiles,
+                const u_int32_t jfsize_sblks, const u_int16_t wcache_num_pages, const u_int32_t wcache_pgsize_sblks,
+                aio_callback* const cbp);
+
+        /**
+        * \brief Initialize journal by recovering state from previously written journal.
+        *
+        * Initialize journal by recovering state from previously written journal. The journal files
+        * are analyzed, and all records that have not been dequeued and that remain in the journal
+        * will be available for reading. The journal is placed in a read-only state until
+        * recover_complete() is called; any calls to enqueue or dequeue will fail with an exception
+        * in this state.
+        *
+        * <b>NOTE: If <i>NULL</i> is passed to the deque pointers, they will be internally created
+        * and deleted.</b>
+        *
+        * <b>NOTE: If <i>NULL</i> is passed to the callbacks, internal default callbacks will be
+        * used.</b>
+        *
+        * \param num_jfiles The number of journal files to be created.
+        * \param auto_expand If true, allows journal file auto-expansion. In this mode, the journal will automatically
+        *     add files to the journal if it runs out of space. No more than ae_max_jfiles may be added. If false, then
+        *     no files are added and an exception will be thrown if the journal runs out of file space.
+        * \param ae_max_jfiles Upper limit of journal files for auto-expand mode. When auto_expand is true, this is the
+        *     maximum total number of files allowed in the journal (original plus those added by auto-expand mode). If
+        *     this number of files exist and the journal runs out of space, an exception will be thrown. This number
+        *     must be greater than the num_jfiles parameter value but cannot exceed the maximum number of files for a
+        *     single journal; if num_jfiles is already at its maximum value, then auto-expand will be disabled.
+        * \param jfsize_sblks The size of each journal file expressed in softblocks.
+        * \param wcache_num_pages The number of write cache pages to create.
+        * \param wcache_pgsize_sblks The size in sblks of each write cache page.
+        * \param cbp Pointer to object containing callback functions for read and write operations. May be 0 (NULL).
+        * \param prep_txn_list_ptr
+        * \param highest_rid Returns the highest rid found in the journal during recover
+        *
+        * \exception TODO
+        */
+        void recover(const u_int16_t num_jfiles, const bool auto_expand, const u_int16_t ae_max_jfiles,
+                const u_int32_t jfsize_sblks, const u_int16_t wcache_num_pages, const u_int32_t wcache_pgsize_sblks,
+                aio_callback* const cbp, const std::vector<std::string>* prep_txn_list_ptr, u_int64_t& highest_rid);
+
+        /**
+        * \brief Notification to the journal that recovery is complete and that normal operation
+        *     may resume.
+        *
+        * This call notifies the journal that recovery is complete and that normal operation
+        * may resume. The read pointers are reset so that all records read as a part of recover
+        * may be re-read during normal operation. The read-only flag is then reset, allowing
+        * enqueue and dequeue operations to resume.
+        *
+        * \exception TODO
+        */
+        void recover_complete();
+
+        /**
+        * \brief Stops journal and deletes all journal files.
+        *
+        * Clear the journal directory of all journal files matching the base filename.
+        *
+        * \exception TODO
+        */
+        void delete_jrnl_files();
+
+        /**
+        * \brief Enqueue data.
+        *
+        * Enqueue data or part thereof. If a large data block is being written, then it may be
+        * enqueued in parts by setting this_data_len to the size of the data being written in this
+        * call. The total data size must be known in advance, however, as this is written into the
+        * record header on the first record write. The state of the write (i.e. how much has been
+        * written so far) is maintained in the data token dtokp. Partial writes will return in state
+        * ENQ_PART.
+        *
+        * Note that a return value of anything other than RHM_IORES_SUCCESS implies that this write
+        * operation did not complete successfully or was partially completed. The action taken under
+        * these conditions depends on the value of the return. For example, RHM_IORES_AIO_WAIT
+        * implies that all pages in the write page cache are waiting for AIO operations to return,
+        * and that the call should be remade after waiting a bit.
+        *
+        * Example: If a write of 99 kB is divided into three equal parts, then the following states
+        * and returns would characterize a successful operation:
+        * <pre>
+        *                            dtok.    dtok.   dtok.
+        * Operation         Return   wstate() dsize() written() Comment
+        * -----------------+--------+--------+-------+---------+------------------------------------
+        *                            NONE     0       0         Value of dtok before op
+        * edr(99000, 33000) SUCCESS  ENQ_PART 99000   33000     Enqueue part 1
+        * edr(99000, 33000) AIO_WAIT ENQ_PART 99000   50000     Enqueue part 2, not completed
+        * edr(99000, 33000) SUCCESS  ENQ_PART 99000   66000     Enqueue part 2 again
+        * edr(99000, 33000) SUCCESS  ENQ      99000   99000     Enqueue part 3
+        * </pre>
+        *
+        * \param data_buff Pointer to data to be enqueued for this enqueue operation.
+        * \param tot_data_len Total data length.
+        * \param this_data_len Amount to be written in this enqueue operation.
+        * \param dtokp Pointer to data token which contains the details of the enqueue operation.
+        * \param transient Flag indicating transient persistence (ie, ignored on recover).
+        *
+        * \exception TODO
+        */
+        iores enqueue_data_record(const void* const data_buff, const std::size_t tot_data_len,
+                const std::size_t this_data_len, data_tok* dtokp, const bool transient = false);
+
+        iores enqueue_extern_data_record(const std::size_t tot_data_len, data_tok* dtokp,
+                const bool transient = false);
+
+        /**
+        * \brief Enqueue data.
+        *
+        * \param data_buff Pointer to data to be enqueued for this enqueue operation.
+        * \param tot_data_len Total data length.
+        * \param this_data_len Amount to be written in this enqueue operation.
+        * \param dtokp Pointer to data token which contains the details of the enqueue operation.
+        * \param xid String containing xid. An empty string (i.e. length=0) will be considered
+        *     non-transactional.
+        * \param transient Flag indicating transient persistence (ie, ignored on recover).
+        *
+        * \exception TODO
+        */
+        iores enqueue_txn_data_record(const void* const data_buff, const std::size_t tot_data_len,
+                const std::size_t this_data_len, data_tok* dtokp, const std::string& xid,
+                const bool transient = false);
+        iores enqueue_extern_txn_data_record(const std::size_t tot_data_len, data_tok* dtokp,
+                const std::string& xid, const bool transient = false);
+
+        /* TODO
+        **
+        * \brief Retrieve details of next record to be read without consuming the record.
+        *
+        * Retrieve information about current read record. A pointer to the data is returned, along
+        * with the data size and available data size. Data is considered "available" when the AIO
+        * operations to fill page-cache pages from disk have returned, and is ready for consumption.
+        *
+        * If <i>dsize_avail</i> &lt; <i>dsize</i>, then not all of the data is available or part of
+        * the data is in non-contiguous memory, and a subsequent call will update both the pointer
+        * and <i>dsize_avail</i> if more pages have returned from AIO.
+        *
+        * The <i>dsize_avail</i> parameter will return the amount of data from this record that is
+        * available in the page cache as contiguous memory, even if it spans page cache boundaries.
+        * However, if a record spans the end of the page cache and continues at the beginning, even
+        * if both parts are ready for consumption, then this must be divided into at least two
+        * get_data_record() operations, as the data is contained in at least two non-contiguous
+        * segments of the page cache.
+        *
+        * Once all the available data for a record is exposed, it can not be read again using
+        * this function. It must be consumed prior to getting the next record. This can be done by
+        * calling discard_data_record() or read_data_record(). However, if parameter
+        * <i>auto_discard</i> is set to <b><i>true</i></b>, then this record will be automatically
+        * consumed when the entire record has become available without having to explicitly call
+        * discard_data_record() or read_data_record().
+        *
+        * If the current record is an open transactional record, then it cannot be read until it is
+        * committed. If it is aborted, it can never be read. Under this condition, get_data_record()
+        * will return RHM_IORES_TXPENDING, the data pointer will be set to NULL and all data
+        * lengths will be set to 0.
+        *
+        * Example: Read a record of 30k. Assume a read page cache of 10 pages of size 10k starting
+        * at address base_ptr (page0 = base_ptr, page1 = base_ptr+10k, etc.). The first 15k of
+        * the record falls at the end of the page cache, the remaining 15k folded to the beginning.
+        * The current page (page 8) containing 5k is available, the remaining pages which contain
+        * this record are pending AIO return:
+        * <pre>
+        * call       dsize
+        * no.  dsize avail data ptr     Return   Comment
+        * ----+-----+-----+------------+--------+--------------------------------------------------
+        * 1    30k   5k    base_ptr+85k SUCCESS  Initial call, read first 5k
+        * 2    30k   0k    base_ptr+90k AIO_WAIT AIO still pending; no further pages avail
+        * 3    30k   10k   base_ptr+90k SUCCESS  AIO now returned; now read till end of page cache
+        * 4    30k   15k   base_ptr     SUCCESS  data_ptr now pointing to start of page cache
+        * </pre>
+        *
+        * \param rid Reference that returns the record ID (rid)
+        * \param dsize Reference that returns the total data size of the record data .
+        * \param dsize_avail Reference that returns the amount of the data that is available for
+        *     consumption.
+        * \param data Pointer to data pointer which will point to the first byte of the next record
+        *     data.
+        * \param auto_discard If <b><i>true</i></b>, automatically discard the record being read if
+        *     the entire record is available (i.e. dsize == dsize_avail). Otherwise
+        *     discard_data_record() must be explicitly called.
+        *
+        * \exception TODO
+        *
+        // *** NOT YET IMPLEMENTED ***
+        iores get_data_record(const u_int64_t& rid, const std::size_t& dsize,
+                const std::size_t& dsize_avail, const void** const data, bool auto_discard = false);
+        */
+
+        /* TODO
+        **
+        * \brief Discard (skip) next record to be read without reading or retrieving it.
+        *
+        * \exception TODO
+        *
+        // *** NOT YET IMPLEMENTED ***
+        iores discard_data_record(data_tok* const dtokp);
+        */
+
+        /**
+        * \brief Reads data from the journal. It is the responsibility of the reader to free
+        *     the memory that is allocated through this call - see below for details.
+        *
+        * Reads the next non-dequeued data record from the journal.
+        *
+        * <b>Note</b> that this call allocates memory into which the data and XID are copied. It
+        * is the responsibility of the caller to free this memory. The memory for the data and
+        * XID are allocated in a single call, and the XID precedes the data in the memory space.
+        * Thus, where an XID exists, freeing the XID pointer will free both the XID and data memory.
+        * However, if an XID does not exist for the message, the XID pointer xidpp is set to NULL,
+        * and it is the data pointer datapp that must be freed. Should neither an XID nor data be
+        * present (ie an empty record), then no memory is allocated, and both pointers will be NULL.
+        * In this case, there is no need to free memory.
+        *
+        * TODO: Fix this lousy interface. The caller should NOT be required to clean up these
+        * pointers! Rather use a struct, or better still, let the data token carry the data and
+        * xid pointers and lengths, and have the data token both allocate and delete.
+        *
+        * \param datapp Pointer to pointer that will be set to point to memory allocated and
+        *     containing the data. Will be set to NULL if the call fails or there is no data
+        *     in the record.
+        * \param dsize Ref that will be set to the size of the data. Will be set to 0 if the call
+        *     fails or if there is no data in the record.
+        * \param xidpp Pointer to pointer that will be set to point to memory allocated and
+        *     containing the XID. Will be set to NULL if the call fails or there is no XID attached
+        *     to this record.
+        * \param xidsize Ref that will be set to the size of the XID.
+        * \param transient Ref that will be set true if record is transient.
+        * \param external Ref that will be set true if record is external. In this case, the data
+        *     pointer datapp will be set to NULL, but dsize will contain the size of the data.
+        *     NOTE: If there is an xid, then xidpp must be freed.
+        * \param dtokp Pointer to data_tok instance for this data, used to track state of data
+        *     through journal.
+        * \param ignore_pending_txns When false (default), if the next record to be read is locked
+        *     by a pending transaction, the read fails with RHM_IORES_TXPENDING. However, if set
+        *     to true, then locks are ignored. This is required for reading of the Transaction
+        *     Prepared List (TPL) which may have its entries locked, but may be read from
+        *     time-to-time, and needs all its records (locked and unlocked) to be available.
+        *
+        * \exception TODO
+        */
+        iores read_data_record(void** const datapp, std::size_t& dsize, void** const xidpp,
+                std::size_t& xidsize, bool& transient, bool& external, data_tok* const dtokp,
+                bool ignore_pending_txns = false);
+
+        /**
+        * \brief Dequeues (marks as no longer needed) data record in journal.
+        *
+        * Dequeues (marks as no longer needed) data record in journal. Note that it is possible
+        * to use the same data token instance used to enqueue this data; it contains the record ID
+        * needed to correctly mark this data as dequeued in the journal. Otherwise the RID of the
+        * record to be dequeued and the write state of ENQ must be manually set in a new or reset
+        * instance of data_tok.
+        *
+        * \param dtokp Pointer to data_tok instance for this data, used to track state of data
+        *     through journal.
+        * \param txn_coml_commit Only used for preparedXID journal. When used for dequeueing
+        *     prepared XID list items, sets whether the complete() was called in commit or abort
+        *     mode.
+        *
+        * \exception TODO
+        */
+        iores dequeue_data_record(data_tok* const dtokp, const bool txn_coml_commit = false);
+
+        /**
+        * \brief Dequeues (marks as no longer needed) data record in journal.
+        *
+        * Dequeues (marks as no longer needed) data record in journal as part of a transaction.
+        * Note that it is possible to use the same data token instance used to enqueue this data;
+        * it contains the RID needed to correctly mark this data as dequeued in the journal.
+        * Otherwise the RID of the record to be dequeued and the write state of ENQ must be
+        * manually set in a new or reset instance of data_tok.
+        *
+        * \param dtokp Pointer to data_tok instance for this data, used to track state of data
+        *     through journal.
+        * \param xid String containing xid. An empty string (i.e. length=0) will be considered
+        *     non-transactional.
+        * \param txn_coml_commit Only used for preparedXID journal. When used for dequeueing
+        *     prepared XID list items, sets whether the complete() was called in commit or abort
+        *     mode.
+        *
+        * \exception TODO
+        */
+        iores dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit = false);
+
+        /**
+        * \brief Abort the transaction for all records enqueued or dequeued with the matching xid.
+        *
+        * Abort the transaction for all records enqueued with the matching xid. All enqueued records
+        * are effectively deleted from the journal, and can not be read. All dequeued records remain
+        * as though they had never been dequeued.
+        *
+        * \param dtokp Pointer to data_tok instance for this data, used to track state of data
+        *     through journal.
+        * \param xid String containing xid.
+        *
+        * \exception TODO
+        */
+        iores txn_abort(data_tok* const dtokp, const std::string& xid);
+
+        /**
+        * \brief Commit the transaction for all records enqueued or dequeued with the matching xid.
+        *
+        * Commit the transaction for all records enqueued with the matching xid. All enqueued
+        * records are effectively released for reading and dequeueing. All dequeued records are
+        * removed and can no longer be accessed.
+        *
+        * \param dtokp Pointer to data_tok instance for this data, used to track state of data
+        *     through journal.
+        * \param xid String containing xid.
+        *
+        * \exception TODO
+        */
+        iores txn_commit(data_tok* const dtokp, const std::string& xid);
+
+        /**
+        * \brief Check whether all the enqueue records for the given xid have reached disk.
+        *
+        * \param xid String containing xid.
+        *
+        * \exception TODO
+        */
+        bool is_txn_synced(const std::string& xid);
+
+        /**
+        * \brief Forces a check for returned AIO write events.
+        *
+        * Forces a check for returned AIO write events. This is normally performed by enqueue() and
+        * dequeue() operations, but if these operations cease, then this call needs to be made to
+        * force the processing of any outstanding AIO operations.
+        */
+        int32_t get_wr_events(timespec* const timeout);
+
+        /**
+        * \brief Forces a check for returned AIO read events.
+        *
+        * Forces a check for returned AIO read events. This is normally performed by read_data()
+        * operations, but if these operations cease, then this call needs to be made to force the
+        * processing of any outstanding AIO operations.
+        */
+        int32_t get_rd_events(timespec* const timeout);
+
+        /**
+        * \brief Stop the journal from accepting any further requests to read or write data.
+        *
+        * This operation is used to stop the journal. This is the normal mechanism for bringing the
+        * journal to an orderly stop. Any outstanding AIO operations or partially written pages in
+        * the write page cache will be flushed and will complete.
+        *
+        * <b>Note:</b> The journal cannot be restarted without either initializing it or restoring
+        *     it.
+        *
+        * \param block_till_aio_cmpl If true, will block the thread while waiting for all
+        *     outstanding AIO operations to complete.
+        */
+        void stop(const bool block_till_aio_cmpl = false);
+
+        /**
+        * \brief Force a flush of the write page cache, creating a single AIO write operation.
+        */
+        iores flush(const bool block_till_aio_cmpl = false);
+
+        inline u_int32_t get_enq_cnt() const { return _emap.size(); } ///< Returns _emap.size(), the size of the enqueue map.
+
+        inline u_int32_t get_wr_aio_evt_rem() const { slock l(_wr_mutex); return _wmgr.get_aio_evt_rem(); } ///< Returns _wmgr.get_aio_evt_rem(); an slock is constructed on the journal write mutex (_wr_mutex) before the query.
+
+        inline u_int32_t get_rd_aio_evt_rem() const { return _rmgr.get_aio_evt_rem(); } ///< Returns _rmgr.get_aio_evt_rem() from the read page manager; no lock is taken here.
+
+        inline u_int32_t get_wr_outstanding_aio_dblks() const
+                { return _wrfc.aio_outstanding_dblks(); } ///< Returns aio_outstanding_dblks() of the write rotating file controller (_wrfc).
+
+        inline u_int32_t get_wr_outstanding_aio_dblks(u_int16_t lfid) const
+                { return _lpmgr.get_fcntlp(lfid)->wr_aio_outstanding_dblks(); } ///< Returns wr_aio_outstanding_dblks() of the fcntl that _lpmgr gives for lfid. NOTE(review): the pointer is dereferenced unchecked - confirm lfid is always valid.
+
+        inline u_int32_t get_rd_outstanding_aio_dblks() const
+                { return _rrfc.aio_outstanding_dblks(); } ///< Returns aio_outstanding_dblks() of the read rotating file controller (_rrfc).
+
+        inline u_int32_t get_rd_outstanding_aio_dblks(u_int16_t lfid) const
+                { return _lpmgr.get_fcntlp(lfid)->rd_aio_outstanding_dblks(); } ///< Returns rd_aio_outstanding_dblks() of the fcntl that _lpmgr gives for lfid. NOTE(review): the pointer is dereferenced unchecked - confirm lfid is always valid.
+
+        inline u_int16_t get_rd_fid() const { return _rrfc.index(); } ///< Returns index() of the read rotating file controller (_rrfc).
+        inline u_int16_t get_wr_fid() const { return _wrfc.index(); } ///< Returns index() of the write rotating file controller (_wrfc).
+        u_int16_t get_earliest_fid();
+
+        /**
+        * \brief Check if a particular rid is enqueued. Note that this function will return
+        *     false if the rid is transactionally enqueued and is not committed, or if it is
+        *     locked (i.e. transactionally dequeued, but the dequeue has not been committed).
+        */
+        inline bool is_enqueued(const u_int64_t rid, bool ignore_lock = false)
+                { return _emap.is_enqueued(rid, ignore_lock); } ///< Thin wrapper: rid and ignore_lock are forwarded unchanged to _emap.is_enqueued().
+        inline bool is_locked(const u_int64_t rid) // True only when the enqueue-map lookup does not report below EMAP_OK and the lock query yields EMAP_TRUE
+                { return !(_emap.is_enqueued(rid, true) < enq_map::EMAP_OK) && (enq_map::EMAP_TRUE == _emap.is_locked(rid)); }
+        inline void enq_rid_list(std::vector<u_int64_t>& rids) { _emap.rid_list(rids); } ///< Hands rids (non-const reference) to _emap.rid_list(); presumably filled with the rids in the enqueue map - confirm against enq_map.
+        inline void enq_xid_list(std::vector<std::string>& xids) { _tmap.xid_list(xids); } ///< Hands xids (non-const reference) to _tmap.xid_list(); presumably filled with the xids in the transaction map - confirm against txn_map.
+        inline u_int32_t get_open_txn_cnt() const { return _tmap.size(); } ///< Returns _tmap.size(), the size of the transaction map.
+        // TODO Make this a const, but txn_map must support const first.
+        inline txn_map& get_txn_map() { return _tmap; } ///< Returns a non-const reference to the transaction map member (_tmap).
+
+        /**
+        * \brief Check if the journal is stopped.
+        *
+        * \return <b><i>true</i></b> if the journal is stopped;
+        *     <b><i>false</i></b> otherwise.
+        */
+        // const: only reads _stop_flag, consistent with is_ready() and is_read_only().
+        inline bool is_stopped() const { return _stop_flag; }
+
+        /**
+        * \brief Check if the journal is ready to read and write data.
+        *
+        * Checks if the journal is ready to read and write data. This function will return
+        * <b><i>true</i></b> if the journal has been either initialized or restored, and the stop()
+        * function has not been called since the initialization.
+        *
+        * Note that the journal may also be stopped if an internal error occurs (such as running out
+        * of data journal file space).
+        *
+        * \return <b><i>true</i></b> if the journal is ready to read and write data;
+        *     <b><i>false</i></b> otherwise.
+        */
+        inline bool is_ready() const
+        {
+            return _init_flag && !_stop_flag;
+        }
+
+        // Current value of _readonly_flag.
+        inline bool is_read_only() const
+        {
+            return _readonly_flag;
+        }
+
+        /**
+        * \brief Get the journal directory.
+        *
+        * This returns the journal directory as set during initialization. This is the directory
+        * into which the journal files will be written.
+        */
+        inline const std::string& dirname() const
+        {
+            // Forwarded from the contained jdir; returned by reference, no copy is made.
+            return _jdir.dirname();
+        }
+
+        /**
+        * \brief Get the journal base filename.
+        *
+        * Get the journal base filename as set during initialization. This is the prefix used in all
+        * journal files of this instance. Note that if more than one instance of the journal shares
+        * the same directory, their base filenames <b>MUST</b> be different or else the instances
+        * will overwrite one another.
+        */
+        inline const std::string& base_filename() const
+        {
+            return _base_filename;
+        }
+
+        // Number of journal files as reported by _lpmgr.
+        inline u_int16_t num_jfiles() const
+        {
+            return _lpmgr.num_jfiles();
+        }
+
+        // File controller pointer that _lpmgr returns for lfid; passed through unchecked.
+        inline fcntl* get_fcntlp(const u_int16_t lfid) const
+        {
+            return _lpmgr.get_fcntlp(lfid);
+        }
+
+        // Current value of _jfsize_sblks (journal file size in sblks, going by the name).
+        inline u_int32_t jfsize_sblks() const
+        {
+            return _jfsize_sblks;
+        }
+
+        // Logging
+        virtual void log(log_level level, const std::string& log_stmt) const;
+        virtual void log(log_level level, const char* const log_stmt) const;
+
+        // FIXME these are _rmgr to _wmgr interactions, remove when _rmgr contains ref to _wmgr:
+        void chk_wr_frot();
+        // Unflushed dblk count as reported by _wmgr.
+        inline u_int32_t unflushed_dblks()
+        {
+            return _wmgr.unflushed_dblks();
+        }
+        void fhdr_wr_sync(const u_int16_t lid);
+        // wr_subm_cnt_dblks() of the file controller that _lpmgr returns for lfid.
+        // The pointer returned by get_fcntlp() is dereferenced without a null check.
+        inline u_int32_t wr_subm_cnt_dblks(const u_int16_t lfid) const
+        {
+            return _lpmgr.get_fcntlp(lfid)->wr_subm_cnt_dblks();
+        }
+
+        // Management instrumentation callbacks
+        // No-op here; virtual so that a subclass can override it.
+        inline virtual void instr_incr_outstanding_aio_cnt()
+        {}
+        // No-op here; virtual so that a subclass can override it.
+        inline virtual void instr_decr_outstanding_aio_cnt()
+        {}
+
+        /**
+        * \brief Static function for creating new fcntl objects for use with obj_arr.
+        */
+        static fcntl* new_fcntl(jcntl* const jcp, const u_int16_t lid, const u_int16_t fid, const rcvdat* const rdp);
+
+    protected:
+        static bool _init;
+        static bool init_statics();
+
+        /**
+        * \brief Check status of journal before allowing write operations.
+        */
+        void check_wstatus(const char* fn_name) const;
+
+        /**
+        * \brief Check status of journal before allowing read operations.
+        */
+        void check_rstatus(const char* fn_name) const;
+
+        /**
+        * \brief Write info file &lt;basefilename&gt;.jinf to disk
+        */
+        void write_infofile() const;
+
+        /**
+        * \brief Call that blocks while waiting for all outstanding AIOs to complete
+        */
+        void aio_cmpl_wait();
+
+        /**
+        * \brief Call that blocks until at least one message returns; used to wait for
+        *     AIO wait conditions to clear.
+        */
+        bool handle_aio_wait(const iores res, iores& resout, const data_tok* dtp);
+
+        /**
+        * \brief Analyze journal for recovery.
+        */
+        void rcvr_janalyze(rcvdat& rd, const std::vector<std::string>* prep_txn_list_ptr);
+
+        bool rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp, bool& lowi, rcvdat& rd);
+
+        bool decode(jrec& rec, u_int16_t& fid, std::ifstream* ifsp, std::size_t& cum_size_read,
+                rec_hdr& h, bool& lowi, rcvdat& rd, std::streampos& rec_offset);
+
+        bool jfile_cycle(u_int16_t& fid, std::ifstream* ifsp, bool& lowi, rcvdat& rd,
+                const bool jump_fro);
+
+        bool check_owi(const u_int16_t fid, rec_hdr& h, bool& lowi, rcvdat& rd,
+                std::streampos& read_pos);
+
+        void check_journal_alignment(const u_int16_t fid, std::streampos& rec_offset, rcvdat& rd);
+    };
+
+} // namespace journal
+} // namespace mrg
+
+#endif // ifndef mrg_journal_jcntl_hpp

Added: qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/jdir.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/jdir.cpp?rev=1424091&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/jdir.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/legacystore/jrnl/jdir.cpp Wed Dec 19 20:34:56 2012
@@ -0,0 +1,466 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file jdir.cpp
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * File containing code for class mrg::journal::jdir (journal data
+ * directory), used for controlling and manipulating journal data
+ * directories and files. See comments in file jdir.hpp for details.
+ *
+ * \author Kim van der Riet
+ *
+ * Copyright (c) 2007, 2008 Red Hat, Inc.
+ *
+ */
+
+#include "jrnl/jdir.hpp"
+
+#include <cstdlib>
+#include <cstring>
+#include <cerrno>
+#include <iomanip>
+#include "jrnl/jcfg.hpp"
+#include "jrnl/jerrno.hpp"
+#include "jrnl/jexception.hpp"
+#include <sstream>
+#include <sys/stat.h>
+#include <unistd.h>
+
+namespace mrg
+{
+namespace journal
+{
+
+// Store copies of the journal directory path and base filename. No file system access is
+// performed here.
+// The second parameter is named base_filename (not _base_filename) so that it no longer
+// shadows the member it initializes.
+jdir::jdir(const std::string& dirname, const std::string& base_filename):
+        _dirname(dirname),
+        _base_filename(base_filename)
+{}
+
+// Nothing to release explicitly; the body is empty.
+jdir::~jdir()
+{}
+
+// === create_dir ===
+
+// Create this instance's directory (_dirname) by forwarding to the create_dir overload
+// that takes a path.
+void
+jdir::create_dir()
+{
+    create_dir(_dirname);
+}
+
+
+// C-string convenience overload: converts dirname to std::string and forwards.
+// dirname must not be null, as it is used to construct a std::string.
+void
+jdir::create_dir(const char* dirname)
+{
+    const std::string dir_str(dirname);
+    create_dir(dir_str);
+}
+
+
+// Create directory dirname, first creating any missing parent directories recursively.
+// An already-existing path is not an error (EEXIST from mkdir is ignored).
+//
+// Throws jexception(JERR_JDIR_MKDIR) if mkdir fails for any other reason; exists() may
+// throw JERR_JDIR_STAT while probing the parent.
+void
+jdir::create_dir(const std::string& dirname)
+{
+    std::size_t fdp = dirname.find_last_of('/');
+    // fdp == 0 means the parent is the root directory ("/name"). The text before the slash
+    // is then empty, and recursing on "" would end in mkdir("") failing with ENOENT, so the
+    // parent step is skipped: root always exists.
+    if (fdp != std::string::npos && fdp > 0)
+    {
+        std::string parent_dir = dirname.substr(0, fdp);
+        if (!exists(parent_dir))
+            create_dir(parent_dir);
+    }
+    if (::mkdir(dirname.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH))
+    {
+        // Capture errno immediately so the stream operations below cannot disturb it
+        const int mkdir_errno = errno;
+        if (mkdir_errno != EEXIST) // Dir exists, ignore
+        {
+            std::ostringstream oss;
+            oss << "dir=\"" << dirname << "\"" << FORMAT_SYSERR(mkdir_errno);
+            throw jexception(jerrno::JERR_JDIR_MKDIR, oss.str(), "jdir", "create_dir");
+        }
+    }
+}
+
+
+// === clear_dir ===
+
+// Clear this instance's directory, forwarding _dirname and _base_filename together with
+// create_flag to the three-argument overload.
+void
+jdir::clear_dir(const bool create_flag)
+{
+    clear_dir(_dirname, _base_filename, create_flag);
+}
+
+// C-string convenience overload: converts both names to std::string and forwards.
+// Neither pointer may be null, as each is used to construct a std::string.
+void
+jdir::clear_dir(const char* dirname, const char* base_filename, const bool create_flag)
+{
+    const std::string dir_str(dirname);
+    const std::string base_str(base_filename);
+    clear_dir(dir_str, base_str, create_flag);
+}
+
+
+// Clear journal files out of directory dirname without deleting them.
+//
+// Unless RHM_JOWRITE is defined, every entry whose name is longer than base_filename and
+// starts with it is moved (renamed) into a backup directory, which is created by
+// create_bak_dir() when the first such entry is found. If RHM_JOWRITE is defined, the
+// directory is only opened and closed again (and base_filename is left unnamed).
+// If the directory does not exist and create_flag is set, it is created and nothing else
+// is done.
+//
+// Throws jexception: JERR_JDIR_OPENDIR if the directory cannot be opened, JERR_JDIR_FMOVE
+// if a rename fails, plus anything thrown by create_dir(), create_bak_dir() or close_dir().
+void
+jdir::clear_dir(const std::string& dirname, const std::string&
+#ifndef RHM_JOWRITE
+        base_filename
+#endif
+        , const bool create_flag)
+{
+    DIR* dir = ::opendir(dirname.c_str());
+    if (!dir)
+    {
+        if (errno == ENOENT && create_flag) // No such file or dir
+        {
+            create_dir(dirname);
+            return;
+        }
+        std::ostringstream oss;
+        oss << "dir=\"" << dirname << "\"" << FORMAT_SYSERR(errno);
+        throw jexception(jerrno::JERR_JDIR_OPENDIR, oss.str(), "jdir", "clear_dir");
+    }
+#ifndef RHM_JOWRITE
+    struct dirent* entry;
+    bool found = false;
+    std::string bak_dir;
+    while ((entry = ::readdir(dir)) != 0)
+    {
+        // Ignore . and ..
+        if (std::strcmp(entry->d_name, ".") != 0 && std::strcmp(entry->d_name, "..") != 0)
+        {
+            if (std::strlen(entry->d_name) > base_filename.size())
+            {
+                if (std::strncmp(entry->d_name, base_filename.c_str(), base_filename.size()) == 0)
+                {
+                    if (!found)
+                    {
+                        // The backup dir is created lazily, only once there is something to move
+                        try
+                        {
+                            bak_dir = create_bak_dir(dirname, base_filename);
+                        }
+                        catch (...)
+                        {
+                            ::closedir(dir); // Do not leak the open handle; rethrow the original error
+                            throw;
+                        }
+                        found = true;
+                    }
+                    std::ostringstream oldname;
+                    oldname << dirname << "/" << entry->d_name;
+                    std::ostringstream newname;
+                    newname << bak_dir << "/" << entry->d_name;
+                    if (::rename(oldname.str().c_str(), newname.str().c_str()))
+                    {
+                        ::closedir(dir);
+                        std::ostringstream oss;
+                        oss << "file=\"" << oldname.str() << "\" dest=\"" <<
+                                newname.str() << "\"" << FORMAT_SYSERR(errno);
+                        throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "jdir", "clear_dir");
+                    }
+                }
+            }
+        }
+    }
+// FIXME: Find out why this fails with false alarms/errors from time to time...
+// While commented out, there is no error capture from reading dir entries.
+//    check_err(errno, dir, dirname, "clear_dir");
+#endif
+    close_dir(dir, dirname, "clear_dir");
+}
+
+// === push_down ===
+
+// Create a new backup directory under dirname (named from bak_dir_base, see
+// create_bak_dir()) and, if dirname contains an entry called target_dir, move that entry
+// into the backup directory. Returns the path of the backup directory, which is created
+// whether or not target_dir was found.
+//
+// Throws jexception: JERR_JDIR_OPENDIR if dirname cannot be opened, JERR_JDIR_FMOVE if the
+// rename fails, plus anything thrown by create_bak_dir() or close_dir().
+std::string
+jdir::push_down(const std::string& dirname, const std::string& target_dir, const std::string& bak_dir_base)
+{
+    std::string bak_dir_name = create_bak_dir(dirname, bak_dir_base);
+
+    DIR* dir = ::opendir(dirname.c_str());
+    if (dir == 0)
+    {
+        std::ostringstream oss;
+        oss << "dir=\"" << dirname << "\"" << FORMAT_SYSERR(errno);
+        throw jexception(jerrno::JERR_JDIR_OPENDIR, oss.str(), "jdir", "push_down");
+    }
+
+    // Scan dirname for target_dir; at most one entry can match, so stop at the first hit
+    for (struct dirent* entry = ::readdir(dir); entry != 0; entry = ::readdir(dir))
+    {
+        if (std::strcmp(entry->d_name, target_dir.c_str()) != 0)
+            continue;
+
+        const std::string src = dirname + "/" + target_dir;
+        const std::string dst = bak_dir_name + "/" + target_dir;
+        if (::rename(src.c_str(), dst.c_str()) != 0)
+        {
+            ::closedir(dir);
+            std::ostringstream oss;
+            oss << "file=\"" << src << "\" dest=\"" <<  dst << "\"" << FORMAT_SYSERR(errno);
+            throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "jdir", "push_down");
+        }
+        break;
+    }
+    close_dir(dir, dirname, "push_down");
+    return bak_dir_name;
+}
+
+// === verify_dir ===
+
+// Verify this instance's directory, forwarding _dirname and _base_filename to the
+// two-argument overload.
+void
+jdir::verify_dir()
+{
+    verify_dir(_dirname, _base_filename);
+}
+
+// C-string convenience overload: converts both names to std::string and forwards.
+// Neither pointer may be null, as each is used to construct a std::string.
+void
+jdir::verify_dir(const char* dirname, const char* base_filename)
+{
+    const std::string dir_str(dirname);
+    const std::string base_str(base_filename);
+    verify_dir(dir_str, base_str);
+}
+
+
+// Check that dirname is a directory and that it contains one journal data file for every
+// file number below the num_jfiles() value of the journal info file
+// "<dirname>/<base_filename>.<JRNL_INFO_EXTENSION>". Data files are named
+// "<base_filename>.<4-digit zero-padded hex file number>.<JRNL_DATA_EXTENSION>".
+//
+// Throws jexception: JERR_JDIR_NOTDIR if dirname is not a directory, and
+// JERR_JDIR_NOSUCHFILE (with the missing path as message) for the first absent data file.
+// is_dir() and exists() may throw JERR_JDIR_STAT.
+void
+jdir::verify_dir(const std::string& dirname, const std::string& base_filename)
+{
+    if (!is_dir(dirname))
+    {
+        std::ostringstream msg;
+        msg << "dir=\"" << dirname << "\"";
+        throw jexception(jerrno::JERR_JDIR_NOTDIR, msg.str(), "jdir", "verify_dir");
+    }
+
+    // The info file supplies the number of data files to look for
+    const std::string jinf_path = dirname + "/" + base_filename + "." + JRNL_INFO_EXTENSION;
+    jinf ji(jinf_path, true);
+
+    u_int16_t fnum = 0;
+    while (fnum < ji.num_jfiles())
+    {
+        std::ostringstream data_file;
+        data_file << dirname << "/" << base_filename << "."
+                  << std::setw(4) << std::setfill('0') << std::hex << fnum
+                  << "." << JRNL_DATA_EXTENSION;
+        if (!exists(data_file.str()))
+            throw jexception(jerrno::JERR_JDIR_NOSUCHFILE, data_file.str(), "jdir", "verify_dir");
+        ++fnum;
+    }
+}
+
+
+// === delete_dir ===
+
+// Delete this instance's directory, forwarding _dirname and children_only to the
+// two-argument overload.
+void
+jdir::delete_dir(bool children_only)
+{
+    delete_dir(_dirname, children_only);
+}
+
+// C-string convenience overload: converts dirname to std::string and forwards.
+// dirname must not be null, as it is used to construct a std::string.
+void
+jdir::delete_dir(const char* dirname, bool children_only)
+{
+    const std::string dir_str(dirname);
+    delete_dir(dir_str, children_only);
+}
+
+// Recursively delete the contents of directory dirname and, unless children_only is set,
+// the directory itself. A dirname that does not exist is silently accepted.
+//
+// Regular files and symbolic links (classified with lstat, so links are not followed) are
+// unlinked, sub-directories are deleted through a recursive call, and any other file type
+// aborts the operation.
+//
+// Throws jexception: JERR_JDIR_OPENDIR, JERR_JDIR_STAT, JERR_JDIR_UNLINK,
+// JERR_JDIR_BADFTYPE, JERR_JDIR_CLOSEDIR (via close_dir()) or JERR_JDIR_RMDIR. The open
+// directory handle is closed before any of these propagate.
+void
+jdir::delete_dir(const std::string& dirname, bool children_only)
+{
+    struct dirent* entry;
+    struct stat s;
+    DIR* dir = ::opendir(dirname.c_str());
+    if (!dir)
+    {
+        if (errno == ENOENT) // dir does not exist.
+            return;
+
+        std::ostringstream oss;
+        oss << "dir=\"" << dirname << "\"" << FORMAT_SYSERR(errno);
+        throw jexception(jerrno::JERR_JDIR_OPENDIR, oss.str(), "jdir", "delete_dir");
+    }
+    else
+    {
+        while ((entry = ::readdir(dir)) != 0)
+        {
+            // Ignore . and ..
+            if (std::strcmp(entry->d_name, ".") != 0 && std::strcmp(entry->d_name, "..") != 0)
+            {
+                std::string full_name(dirname + "/" + entry->d_name);
+                if (::lstat(full_name.c_str(), &s))
+                {
+                    ::closedir(dir);
+                    std::ostringstream oss;
+                    oss << "stat: file=\"" << full_name << "\"" << FORMAT_SYSERR(errno);
+                    throw jexception(jerrno::JERR_JDIR_STAT, oss.str(), "jdir", "delete_dir");
+                }
+                if (S_ISREG(s.st_mode) || S_ISLNK(s.st_mode)) // This is a file or slink
+                {
+                    if(::unlink(full_name.c_str()))
+                    {
+                        ::closedir(dir);
+                        std::ostringstream oss;
+                        oss << "unlink: file=\"" << entry->d_name << "\"" << FORMAT_SYSERR(errno);
+                        throw jexception(jerrno::JERR_JDIR_UNLINK, oss.str(), "jdir", "delete_dir");
+                    }
+                }
+                else if (S_ISDIR(s.st_mode)) // This is a dir
+                {
+                    try
+                    {
+                        delete_dir(full_name);
+                    }
+                    catch (...)
+                    {
+                        // Close this level's handle before the error unwinds, as every
+                        // other failure path in this loop does
+                        ::closedir(dir);
+                        throw;
+                    }
+                }
+                else // all other types, throw up!
+                {
+                    ::closedir(dir);
+                    std::ostringstream oss;
+                    oss << "file=\"" << entry->d_name << "\" is not a dir, file or slink.";
+                    oss << " (mode=0x" << std::hex << s.st_mode << std::dec << ")";
+                    throw jexception(jerrno::JERR_JDIR_BADFTYPE, oss.str(), "jdir", "delete_dir");
+                }
+            }
+        }
+
+// FIXME: Find out why this fails with false alarms/errors from time to time...
+// While commented out, there is no error capture from reading dir entries.
+//        check_err(errno, dir, dirname, "delete_dir");
+    }
+    // Now dir is empty, close and delete it
+    close_dir(dir, dirname, "delete_dir");
+
+    if (!children_only)
+        if (::rmdir(dirname.c_str()))
+        {
+            std::ostringstream oss;
+            oss << "dir=\"" << dirname << "\"" << FORMAT_SYSERR(errno);
+            throw jexception(jerrno::JERR_JDIR_RMDIR, oss.str(), "jdir", "delete_dir");
+        }
+}
+
+
+// Create the next backup directory "<dirname>/_<base_filename>.bak.XXXX" and return its
+// path. XXXX is a zero-padded (width 4) hex number, one greater than the highest number
+// found among existing entries of that form in dirname (so 0001 when there are none).
+//
+// Throws jexception: JERR_JDIR_OPENDIR if dirname cannot be opened, JERR_JDIR_MKDIR if the
+// new directory cannot be created, plus anything thrown by close_dir().
+std::string
+jdir::create_bak_dir(const std::string& dirname, const std::string& base_filename)
+{
+    DIR* dir = ::opendir(dirname.c_str());
+    if (!dir)
+    {
+        std::ostringstream oss;
+        oss << "dir=\"" << dirname << "\"" << FORMAT_SYSERR(errno);
+        throw jexception(jerrno::JERR_JDIR_OPENDIR, oss.str(), "jdir", "create_bak_dir");
+    }
+
+    // Existing backup dirs are named: _basename.bak.XXXX
+    const std::string bak_prefix = "_" + base_filename + ".bak.";
+    const std::size_t prefix_len = base_filename.size() + 6; // '_' + basename + ".bak."
+    const std::size_t entry_len = base_filename.size() + 10; // prefix + 4 hex digits
+
+    long highest_num = 0L;
+    for (struct dirent* entry = ::readdir(dir); entry != 0; entry = ::readdir(dir))
+    {
+        const char* const entry_name = entry->d_name;
+        // Ignore . and ..
+        if (std::strcmp(entry_name, ".") == 0 || std::strcmp(entry_name, "..") == 0)
+            continue;
+        if (std::strlen(entry_name) != entry_len)
+            continue;
+        if (std::strncmp(entry_name, bak_prefix.c_str(), prefix_len) != 0)
+            continue;
+
+        const long this_num = std::strtol(entry_name + prefix_len, 0, 16);
+        if (this_num > highest_num)
+            highest_num = this_num;
+    }
+// FIXME: Find out why this fails with false alarms/errors from time to time...
+// While commented out, there is no error capture from reading dir entries.
+//    check_err(errno, dir, dirname, "create_bak_dir");
+    close_dir(dir, dirname, "create_bak_dir");
+
+    const long new_num = highest_num + 1;
+    std::ostringstream dn;
+    dn << dirname << "/_" << base_filename << ".bak." << std::hex << std::setw(4) <<
+            std::setfill('0') << new_num;
+    if (::mkdir(dn.str().c_str(), S_IRWXU | S_IRWXG | S_IROTH))
+    {
+        std::ostringstream oss;
+        oss << "dir=\"" << dn.str() << "\"" << FORMAT_SYSERR(errno);
+        throw jexception(jerrno::JERR_JDIR_MKDIR, oss.str(), "jdir", "create_bak_dir");
+    }
+    return dn.str();
+}
+
+// Return true if name refers to a directory. stat() is used, so symbolic links are
+// followed. Throws jexception(JERR_JDIR_STAT) if stat fails for any reason, including a
+// name that does not exist.
+bool
+jdir::is_dir(const char* name)
+{
+    struct stat st;
+    if (::stat(name, &st) != 0)
+    {
+        std::ostringstream oss;
+        oss << "file=\"" << name << "\"" << FORMAT_SYSERR(errno);
+        throw jexception(jerrno::JERR_JDIR_STAT, oss.str(), "jdir", "is_dir");
+    }
+    return S_ISDIR(st.st_mode);
+}
+
+// std::string convenience overload of is_dir(const char*).
+bool
+jdir::is_dir(const std::string& name)
+{
+    const char* const c_name = name.c_str();
+    return is_dir(c_name);
+}
+
+// Return true if stat() succeeds on name, false if it fails with ENOENT.
+// Throws jexception(JERR_JDIR_STAT) if stat fails for any other reason.
+bool
+jdir::exists(const char* name)
+{
+    struct stat st;
+    if (::stat(name, &st) == 0)
+        return true;
+
+    if (errno == ENOENT) // No such dir or file
+        return false;
+
+    // Throw for any other condition
+    std::ostringstream oss;
+    oss << "file=\"" << name << "\"" << FORMAT_SYSERR(errno);
+    throw jexception(jerrno::JERR_JDIR_STAT, oss.str(), "jdir", "exists");
+}
+
+// std::string convenience overload of exists(const char*).
+bool
+jdir::exists(const std::string& name)
+{
+    const char* const c_name = name.c_str();
+    return exists(c_name);
+}
+
+// If err_num is non-zero, close dir and throw jexception(JERR_JDIR_READDIR) reporting
+// dir_name and the system error, attributed to function fn_name. Does nothing when
+// err_num is zero.
+void
+jdir::check_err(const int err_num, DIR* dir, const std::string& dir_name, const std::string& fn_name)
+{
+    if (!err_num)
+        return;
+
+    std::ostringstream oss;
+    oss << "dir=\"" << dir_name << "\"" << FORMAT_SYSERR(err_num);
+    ::closedir(dir); // Try to close, it makes no sense to trap errors here...
+    throw jexception(jerrno::JERR_JDIR_READDIR, oss.str(), "jdir", fn_name);
+}
+
+// Close dir; if closedir() fails, throw jexception(JERR_JDIR_CLOSEDIR) reporting dir_name
+// and the system error, attributed to function fn_name.
+void
+jdir::close_dir(DIR* dir, const std::string& dir_name, const std::string& fn_name)
+{
+    if (::closedir(dir) == 0)
+        return;
+
+    std::ostringstream oss;
+    oss << "dir=\"" << dir_name << "\"" << FORMAT_SYSERR(errno);
+    throw jexception(jerrno::JERR_JDIR_CLOSEDIR, oss.str(), "jdir", fn_name);
+}
+
+// Stream a jdir as its directory name.
+std::ostream&
+operator<<(std::ostream& os, const jdir& jdir)
+{
+    return os << jdir._dirname;
+}
+
+// Stream the directory name of the jdir pointed to. jdirPtr is dereferenced without a
+// null check.
+std::ostream&
+operator<<(std::ostream& os, const jdir* jdirPtr)
+{
+    return os << jdirPtr->_dirname;
+}
+
+} // namespace journal
+} // namespace mrg



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org