You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/10/22 21:09:58 UTC
svn commit: r1534736 [5/8] - in /qpid/trunk/qpid/cpp/src: ./
qpid/linearstore/ qpid/linearstore/jrnl/ qpid/linearstore/jrnl/utils/
tests/linearstore/
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp Tue Oct 22 19:09:56 2013
@@ -0,0 +1,464 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file deq_rec.cpp
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * This file contains the code for the qpid::qls_jrnl::deq_rec (journal dequeue
+ * record) class. See comments in file deq_rec.h for details.
+ *
+ * \author Kim van der Riet
+ */
+
+#include "qpid/linearstore/jrnl/deq_rec.h"
+#include "qpid/linearstore/jrnl/utils/deq_hdr.h"
+#include "qpid/linearstore/jrnl/utils/rec_tail.h"
+
+#include <cassert>
+#include <cerrno>
+#include <cstdlib>
+#include <cstring>
+#include <iomanip>
+#include "qpid/linearstore/jrnl/jerrno.h"
+#include "qpid/linearstore/jrnl/jexception.h"
+#include <sstream>
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+deq_rec::deq_rec(): // Default constructor: an empty record (deq_rec.h: "used for read operations")
+// _deq_hdr is not set in the initialiser list; ::deq_hdr_init() in the body fills it in
+ _xidp(0), // no caller-supplied xid
+ _buff(0) // no receive buffer yet
+// _deq_tail is likewise filled in the body by ::rec_tail_copy()
+{
+ ::deq_hdr_init(&_deq_hdr, QLS_DEQ_MAGIC, QLS_JRNL_VERSION, 0, 0, 0, 0); // all remaining header arguments are zero
+ ::rec_tail_copy(&_deq_tail, &_deq_hdr._rhdr, 0); // tail is derived from the record header just initialised
+}
+
+deq_rec::deq_rec(const uint64_t rid, const uint64_t drid, const void* const xidp, // Constructor for a record that is to be written
+ const std::size_t xidlen, const bool txn_coml_commit):
+// _deq_hdr is not set in the initialiser list; ::deq_hdr_init() in the body fills it in
+ _xidp(xidp), // the xid is referenced, not copied
+ _buff(0) // the receive buffer is not used by this constructor
+// _deq_tail is likewise filled in the body by ::rec_tail_copy()
+{
+ ::deq_hdr_init(&_deq_hdr, QLS_DEQ_MAGIC, QLS_JRNL_VERSION, 0, rid, drid, xidlen);
+ ::rec_tail_copy(&_deq_tail, &_deq_hdr._rhdr, 0); // tail is derived from the record header just initialised
+ ::set_txn_coml_commit(&_deq_hdr, txn_coml_commit); // applied after the header has been initialised
+}
+
+deq_rec::~deq_rec() // Destructor: delegates all cleanup to clean()
+{
+ clean();
+}
+
+void
+deq_rec::reset()
+{
+    // Return the record to its empty state; _buff is only forgotten here, not freed.
+    _buff = 0;
+    _xidp = 0;
+    _deq_tail._rid = 0;
+    _deq_tail._checksum = 0;
+    _deq_hdr._rhdr._rid = 0;
+    ::set_txn_coml_commit(&_deq_hdr, false);
+    _deq_hdr._xidsize = 0;
+    _deq_hdr._deq_rid = 0;
+}
+
+void
+deq_rec::reset(const uint64_t rid, const uint64_t drid, const void* const xidp,
+        const std::size_t xidlen, const bool txn_coml_commit)
+{
+    _buff = 0; // the receive buffer pointer is dropped, not freed
+    _xidp = xidp; // the xid is referenced, not copied
+    _deq_tail._rid = rid; // tail carries the same rid as the header
+    _deq_hdr._rhdr._rid = rid;
+    ::set_txn_coml_commit(&_deq_hdr, txn_coml_commit);
+    _deq_hdr._xidsize = xidlen;
+    _deq_hdr._deq_rid = drid;
+}
+
+uint32_t
+deq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) // Write header, xid and tail to wptr, resuming at rec_offs_dblks; returns size_dblks() of the bytes written
+{
+ assert(wptr != 0);
+ assert(max_size_dblks > 0);
+ if (_xidp == 0)
+ assert(_deq_hdr._xidsize == 0); // a non-zero xid size requires an xid pointer
+
+ std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES; // byte offset into the record at which this call resumes
+ std::size_t rem = max_size_dblks * QLS_DBLK_SIZE_BYTES; // bytes of space available at wptr
+ std::size_t wr_cnt = 0; // bytes written by this call
+ if (rec_offs_dblks) // Continuation of split dequeue record (over 2 or more pages)
+ {
+ if (size_dblks(rec_size()) - rec_offs_dblks > max_size_dblks) // Further split required
+ {
+ rec_offs -= sizeof(_deq_hdr); // now an offset relative to the start of the xid
+ std::size_t wsize = _deq_hdr._xidsize > rec_offs ? _deq_hdr._xidsize - rec_offs : 0; // xid bytes still to be written
+ std::size_t wsize2 = wsize; // remembered before wsize is clamped to rem
+ if (wsize)
+ {
+ if (wsize > rem)
+ wsize = rem; // clamp to the space available
+ std::memcpy(wptr, (const char*)_xidp + rec_offs, wsize);
+ wr_cnt += wsize;
+ rem -= wsize;
+ }
+ rec_offs -= _deq_hdr._xidsize - wsize2; // now an offset into the tail (0 if the tail has not been started)
+ if (rem)
+ {
+ wsize = sizeof(_deq_tail) > rec_offs ? sizeof(_deq_tail) - rec_offs : 0; // tail bytes still to be written
+ wsize2 = wsize;
+ if (wsize)
+ {
+ if (wsize > rem)
+ wsize = rem; // clamp to the space available
+ std::memcpy((char*)wptr + wr_cnt, (char*)&_deq_tail + rec_offs, wsize);
+ wr_cnt += wsize;
+ rem -= wsize;
+ }
+ rec_offs -= sizeof(_deq_tail) - wsize2;
+ }
+ assert(rem == 0); // a further split implies all available space was used
+ assert(rec_offs == 0);
+ }
+ else // No further split required
+ {
+ rec_offs -= sizeof(_deq_hdr); // now an offset relative to the start of the xid
+ std::size_t wsize = _deq_hdr._xidsize > rec_offs ? _deq_hdr._xidsize - rec_offs : 0; // xid bytes still to be written
+ if (wsize)
+ {
+ std::memcpy(wptr, (const char*)_xidp + rec_offs, wsize);
+ wr_cnt += wsize;
+ }
+ rec_offs -= _deq_hdr._xidsize - wsize; // now an offset into the tail
+ wsize = sizeof(_deq_tail) > rec_offs ? sizeof(_deq_tail) - rec_offs : 0; // tail bytes still to be written
+ if (wsize)
+ {
+ std::memcpy((char*)wptr + wr_cnt, (char*)&_deq_tail + rec_offs, wsize);
+ wr_cnt += wsize;
+#ifdef QLS_CLEAN
+ std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES; // shadows the outer rec_offs inside this block only
+ std::size_t dblk_rec_size = size_dblks(rec_size() - rec_offs) * QLS_DBLK_SIZE_BYTES;
+ std::memset((char*)wptr + wr_cnt, QLS_CLEAN_CHAR, dblk_rec_size - wr_cnt); // fill the rest of the last dblk
+#endif
+ }
+ rec_offs -= sizeof(_deq_tail) - wsize;
+ assert(rec_offs == 0);
+ }
+ }
+ else // Start at beginning of data record
+ {
+ // Assumption: the header will always fit into the first dblk
+ std::memcpy(wptr, (void*)&_deq_hdr, sizeof(_deq_hdr));
+ wr_cnt = sizeof(_deq_hdr);
+ if (size_dblks(rec_size()) > max_size_dblks) // Split required - can only occur with xid
+ {
+ std::size_t wsize;
+ rem -= sizeof(_deq_hdr); // space left after the header
+ if (rem)
+ {
+ wsize = rem >= _deq_hdr._xidsize ? _deq_hdr._xidsize : rem; // as much of the xid as fits
+ std::memcpy((char*)wptr + wr_cnt, _xidp, wsize);
+ wr_cnt += wsize;
+ rem -= wsize;
+ }
+ if (rem)
+ {
+ wsize = rem >= sizeof(_deq_tail) ? sizeof(_deq_tail) : rem; // as much of the tail as fits
+ std::memcpy((char*)wptr + wr_cnt, (void*)&_deq_tail, wsize);
+ wr_cnt += wsize;
+ rem -= wsize;
+ }
+ assert(rem == 0); // a split implies all available space was used
+ }
+ else // No split required
+ {
+ if (_deq_hdr._xidsize) // xid and tail are written only when an xid is present
+ {
+ std::memcpy((char*)wptr + wr_cnt, _xidp, _deq_hdr._xidsize);
+ wr_cnt += _deq_hdr._xidsize;
+ std::memcpy((char*)wptr + wr_cnt, (void*)&_deq_tail, sizeof(_deq_tail));
+ wr_cnt += sizeof(_deq_tail);
+ }
+#ifdef QLS_CLEAN
+ std::size_t dblk_rec_size = size_dblks(rec_size()) * QLS_DBLK_SIZE_BYTES;
+ std::memset((char*)wptr + wr_cnt, QLS_CLEAN_CHAR, dblk_rec_size - wr_cnt); // fill the rest of the last dblk
+#endif
+ }
+ }
+ return size_dblks(wr_cnt);
+}
+
+uint32_t
+deq_rec::decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) // Read this record from rptr, resuming at rec_offs_dblks; returns size_dblks() of the bytes consumed
+{
+ assert(rptr != 0);
+ assert(max_size_dblks > 0);
+
+ std::size_t rd_cnt = 0; // bytes consumed by this call
+ if (rec_offs_dblks) // Continuation of record on new page
+ {
+ const uint32_t hdr_xid_dblks = size_dblks(sizeof(deq_hdr_t) + _deq_hdr._xidsize); // dblks for header + xid
+ const uint32_t hdr_xid_tail_dblks = size_dblks(sizeof(deq_hdr_t) + _deq_hdr._xidsize +
+ sizeof(rec_tail_t)); // dblks for header + xid + tail
+ const std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES; // byte offset into the record at which this call resumes
+
+ if (hdr_xid_tail_dblks - rec_offs_dblks <= max_size_dblks)
+ {
+ // Remainder of xid fits within this page
+ if (rec_offs - sizeof(deq_hdr_t) < _deq_hdr._xidsize)
+ {
+ // Part of xid still outstanding, copy remainder of xid and tail
+ const std::size_t xid_offs = rec_offs - sizeof(deq_hdr_t);
+ const std::size_t xid_rem = _deq_hdr._xidsize - xid_offs;
+ std::memcpy((char*)_buff + xid_offs, rptr, xid_rem); // _buff was allocated when the record start was decoded
+ rd_cnt = xid_rem;
+ std::memcpy((void*)&_deq_tail, ((char*)rptr + rd_cnt), sizeof(_deq_tail));
+ chk_tail(); // the tail is now complete, so it can be validated
+ rd_cnt += sizeof(_deq_tail);
+ }
+ else
+ {
+ // Tail or part of tail only outstanding, complete tail
+ const std::size_t tail_offs = rec_offs - sizeof(deq_hdr_t) - _deq_hdr._xidsize;
+ const std::size_t tail_rem = sizeof(rec_tail_t) - tail_offs;
+ std::memcpy((char*)&_deq_tail + tail_offs, rptr, tail_rem);
+ chk_tail(); // the tail is now complete, so it can be validated
+ rd_cnt = tail_rem;
+ }
+ }
+ else if (hdr_xid_dblks - rec_offs_dblks <= max_size_dblks)
+ {
+ // Remainder of xid fits within this page, tail split
+ const std::size_t xid_offs = rec_offs - sizeof(deq_hdr_t);
+ const std::size_t xid_rem = _deq_hdr._xidsize - xid_offs;
+ std::memcpy((char*)_buff + xid_offs, rptr, xid_rem);
+ rd_cnt += xid_rem;
+ const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt; // whatever is left of the page holds the first part of the tail
+ if (tail_rem)
+ {
+ std::memcpy((void*)&_deq_tail, ((char*)rptr + xid_rem), tail_rem); // partial tail; not validated on this call
+ rd_cnt += tail_rem;
+ }
+ }
+ else
+ {
+ // Remainder of xid split
+ const std::size_t xid_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES); // the whole page is xid data
+ std::memcpy((char*)_buff + rec_offs - sizeof(deq_hdr_t), rptr, xid_cp_size);
+ rd_cnt += xid_cp_size;
+ }
+ }
+ else // Start of record
+ {
+ // Get and check header
+ // The generic record header h is copied in; the remaining header fields are read from rptr
+ ::rec_hdr_copy(&_deq_hdr._rhdr, &h);
+ rd_cnt = sizeof(rec_hdr_t);
+ _deq_hdr._deq_rid = *(uint64_t*)((char*)rptr + rd_cnt);
+ rd_cnt += sizeof(uint64_t);
+ _deq_hdr._xidsize = *(std::size_t*)((char*)rptr + rd_cnt);
+ rd_cnt = sizeof(deq_hdr_t); // assignment, not increment: jump to the end of the whole dequeue header
+ chk_hdr();
+ if (_deq_hdr._xidsize) // without an xid nothing follows the header (no tail either)
+ {
+ _buff = std::malloc(_deq_hdr._xidsize); // NOTE(review): no matching free() in this file; get_xid() hands the pointer out — confirm the caller owns it
+ MALLOC_CHK(_buff, "_buff", "deq_rec", "decode");
+ const uint32_t hdr_xid_dblks = size_dblks(sizeof(deq_hdr_t) + _deq_hdr._xidsize); // dblks for header + xid
+ const uint32_t hdr_xid_tail_dblks = size_dblks(sizeof(deq_hdr_t) + _deq_hdr._xidsize +
+ sizeof(rec_tail_t)); // dblks for header + xid + tail
+
+ // Check if record (header + xid + tail) fits within this page, we can check the
+ // tail before the expense of copying data to memory
+ if (hdr_xid_tail_dblks <= max_size_dblks)
+ {
+ // Entire header, xid and tail fits within this page
+ std::memcpy(_buff, (char*)rptr + rd_cnt, _deq_hdr._xidsize);
+ rd_cnt += _deq_hdr._xidsize;
+ std::memcpy((void*)&_deq_tail, (char*)rptr + rd_cnt, sizeof(_deq_tail));
+ rd_cnt += sizeof(_deq_tail);
+ chk_tail();
+ }
+ else if (hdr_xid_dblks <= max_size_dblks)
+ {
+ // Entire header and xid fit within this page, tail split
+ std::memcpy(_buff, (char*)rptr + rd_cnt, _deq_hdr._xidsize);
+ rd_cnt += _deq_hdr._xidsize;
+ const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt; // whatever is left of the page holds the first part of the tail
+ if (tail_rem)
+ {
+ std::memcpy((void*)&_deq_tail, (char*)rptr + rd_cnt, tail_rem); // partial tail; not validated on this call
+ rd_cnt += tail_rem;
+ }
+ }
+ else
+ {
+ // Header fits within this page, xid split
+ const std::size_t xid_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt; // rest of the page is xid data
+ std::memcpy(_buff, (char*)rptr + rd_cnt, xid_cp_size);
+ rd_cnt += xid_cp_size;
+ }
+ }
+ }
+ return size_dblks(rd_cnt);
+}
+
+bool
+deq_rec::rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs) // Recovery decode from a stream; true = record complete, false = EOF hit mid-record (rec_offs records progress)
+{
+    if (rec_offs == 0)
+    {
+        // Start of record: adopt the already-read record header h, then read the rest of the dequeue header
+        ::rec_hdr_copy(&_deq_hdr._rhdr, &h);
+        ifsp->read((char*)&_deq_hdr._deq_rid, sizeof(uint64_t));
+        ifsp->read((char*)&_deq_hdr._xidsize, sizeof(std::size_t));
+#if defined(JRNL_32_BIT)
+        ifsp->ignore(sizeof(uint32_t)); // _filler0
+#endif
+        rec_offs = sizeof(_deq_hdr);
+        // Header has been read; allocate (if req'd) for xid
+        if (_deq_hdr._xidsize)
+        {
+            _buff = std::malloc(_deq_hdr._xidsize);
+            MALLOC_CHK(_buff, "_buff", "deq_rec", "rcv_decode"); // report this class, not enq_rec
+        }
+    }
+    if (rec_offs < sizeof(_deq_hdr) + _deq_hdr._xidsize)
+    {
+        // Read xid (or continue reading xid)
+        std::size_t offs = rec_offs - sizeof(_deq_hdr);
+        ifsp->read((char*)_buff + offs, _deq_hdr._xidsize - offs);
+        std::size_t size_read = ifsp->gcount();
+        rec_offs += size_read; // progress is recorded even for a short read
+        if (size_read < _deq_hdr._xidsize - offs)
+        {
+            assert(ifsp->eof());
+            // As we may have read past eof, turn off fail bit
+            ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit));
+            assert(!ifsp->fail() && !ifsp->bad());
+            return false;
+        }
+    }
+    if (rec_offs < sizeof(_deq_hdr) +
+            (_deq_hdr._xidsize ? _deq_hdr._xidsize + sizeof(rec_tail_t) : 0)) // a tail exists only when an xid is present
+    {
+        // Read tail (or continue reading tail)
+        std::size_t offs = rec_offs - sizeof(_deq_hdr) - _deq_hdr._xidsize;
+        ifsp->read((char*)&_deq_tail + offs, sizeof(rec_tail_t) - offs);
+        std::size_t size_read = ifsp->gcount();
+        rec_offs += size_read; // progress is recorded even for a short read
+        if (size_read < sizeof(rec_tail_t) - offs)
+        {
+            assert(ifsp->eof());
+            // As we may have read past eof, turn off fail bit
+            ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit));
+            assert(!ifsp->fail() && !ifsp->bad());
+            return false;
+        }
+    }
+    ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size()); // skip the padding up to the next dblk boundary
+    if (_deq_hdr._xidsize)
+        chk_tail(); // Throws if tail invalid or record incomplete
+    assert(!ifsp->fail() && !ifsp->bad());
+    return true;
+}
+
+std::size_t
+deq_rec::get_xid(void** const xidpp)
+{
+    // Report the xid held in the receive buffer. _buff stays null until a
+    // decode call allocates it; in that case the caller gets a null pointer
+    // and a length of 0, whatever the header's xid size says.
+    void* const xid = _buff;
+    *xidpp = xid;
+    const std::size_t len = xid ? _deq_hdr._xidsize : 0;
+    return len;
+}
+
+std::string&
+deq_rec::str(std::string& str) const
+{
+    // Appends a one-line summary of the record to str and returns str.
+    std::ostringstream text;
+    text << "deq_rec: m=" << _deq_hdr._rhdr._magic
+         << " v=" << (int)_deq_hdr._rhdr._version
+         << " rid=" << _deq_hdr._rhdr._rid
+         << " drid=" << _deq_hdr._deq_rid;
+    if (_xidp != 0) // NOTE(review): _xidp is a const void*, so the pointer value is inserted, not the xid bytes
+        text << " xid=\"" << _xidp << "\"";
+    return str.append(text.str());
+}
+
+std::size_t
+deq_rec::xid_size() const // Returns the xid size recorded in the dequeue header
+{
+ return _deq_hdr._xidsize;
+}
+
+std::size_t
+deq_rec::rec_size() const // Returns the record size in bytes, without any padding
+{
+ return sizeof(deq_hdr_t) + (_deq_hdr._xidsize ? _deq_hdr._xidsize + sizeof(rec_tail_t) : 0); // header only when there is no xid; otherwise header + xid + tail
+}
+
+void
+deq_rec::chk_hdr() const // Validate the header; throws jexception(JERR_JREC_BADRECHDR) if the magic is not QLS_DEQ_MAGIC
+{
+    jrec::chk_hdr(_deq_hdr._rhdr); // generic record-header checks from the base class come first
+    if (_deq_hdr._rhdr._magic != QLS_DEQ_MAGIC)
+    {
+        std::ostringstream oss;
+        oss << std::hex << std::setfill('0');
+        oss << "deq magic: rid=0x" << std::setw(16) << _deq_hdr._rhdr._rid;
+        oss << ": expected=0x" << std::setw(8) << QLS_DEQ_MAGIC;
+        oss << " read=0x" << std::setw(8) << (int)_deq_hdr._rhdr._magic; // padded to the same width as "expected"
+        throw jexception(jerrno::JERR_JREC_BADRECHDR, oss.str(), "deq_rec", "chk_hdr");
+    }
+}
+
+void
+deq_rec::chk_hdr(uint64_t rid) const // Validate the header and additionally check it against the given rid
+{
+ chk_hdr(); // header and magic checks first
+ jrec::chk_rid(_deq_hdr._rhdr, rid); // then the base-class rid check against the supplied rid
+}
+
+void
+deq_rec::chk_tail() const // Validate the record tail against this record's header
+{
+ jrec::chk_tail(_deq_tail, _deq_hdr._rhdr); // base-class check, given both the tail and the record header
+}
+
+void
+deq_rec::clean() // Cleanup hook invoked by the destructor
+{
+ // Currently a no-op. NOTE(review): _buff from std::malloc() in decode()/rcv_decode() is not freed here - confirm the get_xid() caller owns it
+}
+
+} // namespace qls_jrnl
+} // namespace qpid
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,91 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_DEQ_REQ_H
+#define QPID_LEGACYSTORE_JRNL_DEQ_REQ_H
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+class deq_rec;
+}}
+
+#include <cstddef>
+#include "qpid/linearstore/jrnl/utils/deq_hdr.h"
+#include "qpid/linearstore/jrnl/jrec.h"
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+ /**
+ * \class deq_rec
+ * \brief Class to handle a single journal dequeue record (header, optional xid, and a tail only when an xid is present).
+ */
+ class deq_rec : public jrec
+ {
+ private:
+ deq_hdr_t _deq_hdr; ///< Dequeue header
+ const void* _xidp; ///< xid pointer for encoding (writing to disk)
+ void* _buff; ///< Pointer to buffer to receive data read from disk
+ rec_tail_t _deq_tail; ///< Record tail, only encoded if XID is present
+
+ public:
+ // Constructor used for read operations; memory for the xid is allocated later, when a record with an xid is decoded
+ deq_rec();
+ // Constructor used for write operations, where the xid already exists (the pointer is stored, the xid is not copied)
+ deq_rec(const uint64_t rid, const uint64_t drid, const void* const xidp,
+ const std::size_t xidlen, const bool txn_coml_commit);
+ virtual ~deq_rec();
+
+ // Prepare instance for use in reading data from journal
+ void reset();
+ // Prepare instance for use in writing data to journal
+ void reset(const uint64_t rid, const uint64_t drid, const void* const xidp,
+ const std::size_t xidlen, const bool txn_coml_commit);
+ uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks); // write (part of) the record to wptr; returns dblks written
+ uint32_t decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks,
+ uint32_t max_size_dblks); // read (part of) the record from rptr; returns dblks read
+ // Decode used for recover; returns false when the stream ends before the record is complete
+ bool rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs);
+
+ inline bool is_txn_coml_commit() const { return ::is_txn_coml_commit(&_deq_hdr); } // flag is stored in the dequeue header
+ inline uint64_t rid() const { return _deq_hdr._rhdr._rid; } // record id of this dequeue record
+ inline uint64_t deq_rid() const { return _deq_hdr._deq_rid; } // the deq_rid field of the header
+ std::size_t get_xid(void** const xidpp); // sets *xidpp to the receive buffer (null if none) and returns the xid size
+ std::string& str(std::string& str) const; // appends a textual summary to str
+ inline std::size_t data_size() const { return 0; } // This record never carries data
+ std::size_t xid_size() const;
+ std::size_t rec_size() const; // size in bytes: header, plus xid and tail when an xid is present
+
+ private:
+ virtual void chk_hdr() const; // throws jexception if the magic is not QLS_DEQ_MAGIC
+ virtual void chk_hdr(uint64_t rid) const; // as chk_hdr(), plus a check against rid
+ virtual void chk_tail() const;
+ virtual void clean(); // called from the destructor
+ }; // class deq_rec
+
+} // namespace qls_jrnl
+} // namespace qpid
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_DEQ_REQ_H
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp Tue Oct 22 19:09:56 2013
@@ -0,0 +1,186 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/linearstore/jrnl/enq_map.h"
+
+#include <iomanip>
+#include "qpid/linearstore/jrnl/jerrno.h"
+#include "qpid/linearstore/jrnl/slock.h"
+#include <sstream>
+
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+// static return/error codes
+int16_t enq_map::EMAP_DUP_RID = -3;
+int16_t enq_map::EMAP_LOCKED = -2;
+int16_t enq_map::EMAP_RID_NOT_FOUND = -1;
+int16_t enq_map::EMAP_OK = 0;
+int16_t enq_map::EMAP_FALSE = 0;
+int16_t enq_map::EMAP_TRUE = 1;
+
+enq_map::enq_map(): // Constructs an empty map
+ _map(){}
+
+enq_map::~enq_map() {} // Empty body; members are destroyed implicitly
+
+
+short
+enq_map::insert_pfid(const uint64_t rid, const uint64_t pfid, const std::streampos file_posn) // Convenience overload: inserts the entry unlocked
+{
+ return insert_pfid(rid, pfid, file_posn, false); // locked = false
+}
+
+short
+enq_map::insert_pfid(const uint64_t rid, const uint64_t pfid, const std::streampos file_posn, const bool locked)
+{
+    // Build the entry before taking the mutex so that only the map insertion
+    // itself runs under the lock.
+    const emap_param entry(rid, emap_data_struct_t(pfid, file_posn, locked));
+    bool inserted = false;
+    {
+        slock guard(_mutex);
+        inserted = _map.insert(entry).second;
+    }
+    return inserted ? EMAP_OK : EMAP_DUP_RID; // an existing rid is left unchanged
+}
+
+short
+enq_map::get_pfid(const uint64_t rid, uint64_t& pfid)
+{
+    // Looks up the pfid for rid. The status is the return value; pfid is
+    // written only when the entry exists and is not locked.
+    slock guard(_mutex);
+    const emap_itr pos = _map.find(rid);
+    if (pos == _map.end() || pos->second._lock)
+        return pos == _map.end() ? EMAP_RID_NOT_FOUND : EMAP_LOCKED;
+    pfid = pos->second._pfid;
+    return EMAP_OK;
+}
+
+short
+enq_map::get_remove_pfid(const uint64_t rid, uint64_t& pfid, const bool txn_flag)
+{
+    slock guard(_mutex);
+    const emap_itr pos = _map.find(rid);
+    if (pos == _map.end())
+        return EMAP_RID_NOT_FOUND;
+    if (!txn_flag && pos->second._lock) // a locked entry is removed only for a commit/abort
+        return EMAP_LOCKED;
+    pfid = pos->second._pfid; // read before the entry is erased
+    _map.erase(pos);
+    return EMAP_OK;
+}
+
+short
+enq_map::get_file_posn(const uint64_t rid, std::streampos& file_posn) {
+    slock guard(_mutex);
+    const emap_itr pos = _map.find(rid);
+    const bool found = (pos != _map.end());
+    if (found && !pos->second._lock) {
+        file_posn = pos->second._file_posn; // written only on success
+        return EMAP_OK;
+    }
+    return found ? EMAP_LOCKED : EMAP_RID_NOT_FOUND;
+}
+
+short
+enq_map::get_data(const uint64_t rid, emap_data_struct_t& eds) {
+    slock guard(_mutex);
+    const emap_itr pos = _map.find(rid);
+    if (pos == _map.end())
+        return EMAP_RID_NOT_FOUND; // eds is left untouched
+    // The lock flag is reported rather than enforced: a locked entry is still
+    // copied out in full (pfid, file position and lock state).
+    eds = pos->second;
+    return EMAP_OK;
+}
+
+bool
+enq_map::is_enqueued(const uint64_t rid, bool ignore_lock)
+{
+    slock guard(_mutex);
+    const emap_itr pos = _map.find(rid);
+    // Enqueued means: present in the map and, unless the caller asked to
+    // ignore locks, not currently locked.
+    if (pos == _map.end())
+        return false;
+    return ignore_lock || !pos->second._lock;
+}
+
+short
+enq_map::lock(const uint64_t rid)
+{
+    slock guard(_mutex);
+    const emap_itr pos = _map.find(rid);
+    if (pos != _map.end())
+        pos->second._lock = true; // set regardless of the previous lock state
+    // The status reflects only whether the rid was present
+    return pos != _map.end() ? EMAP_OK : EMAP_RID_NOT_FOUND;
+}
+
+short
+enq_map::unlock(const uint64_t rid)
+{
+    slock guard(_mutex);
+    const emap_itr pos = _map.find(rid);
+    const bool found = (pos != _map.end());
+    if (found)
+        pos->second._lock = false; // cleared regardless of the previous lock state
+    return found ? EMAP_OK : EMAP_RID_NOT_FOUND;
+}
+
+short
+enq_map::is_locked(const uint64_t rid)
+{
+    slock guard(_mutex);
+    const emap_itr pos = _map.find(rid);
+    if (pos != _map.end()) // three-way result: a found entry maps to TRUE or FALSE
+        return pos->second._lock ? EMAP_TRUE : EMAP_FALSE;
+    return EMAP_RID_NOT_FOUND;
+}
+
+void
+enq_map::rid_list(std::vector<uint64_t>& rv)
+{
+    // Replaces the content of rv with every rid currently in the map.
+    // std::map iterates in key order, so rv comes out sorted by rid.
+    rv.clear();
+    slock guard(_mutex);
+    for (emap_itr pos = _map.begin(); pos != _map.end(); ++pos)
+        rv.push_back(pos->first);
+}
+
+void
+enq_map::pfid_list(std::vector<uint64_t>& fv)
+{
+    // Replaces the content of fv with one pfid per map entry, in rid (key)
+    // order; a pfid is repeated for every record that maps to it.
+    fv.clear();
+    slock guard(_mutex);
+    for (emap_itr pos = _map.begin(); pos != _map.end(); ++pos)
+        fv.push_back(pos->second._pfid);
+}
+
+}}
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,112 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_ENQ_MAP_H
+#define QPID_LEGACYSTORE_JRNL_ENQ_MAP_H
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+class enq_map;
+}}
+
+#include "qpid/linearstore/jrnl/jexception.h"
+#include "qpid/linearstore/jrnl/smutex.h"
+#include <map>
+#include <pthread.h>
+#include <vector>
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+ /**
+ * \class enq_map
+ * \brief Class for storing the physical file id (pfid), file position and a transaction locked flag for each enqueued
+ * data block using the record id (rid) as a key. This is the primary mechanism for
+ * determining the enqueue low water mark: if a pfid exists in this map, then there is
+ * at least one still-enqueued record in that file. (The transaction map must also be
+ * clear, however.)
+ *
+ * Map rids against pfid, file position and lock status. As records are enqueued, they are added to this
+ * map, and as they are dequeued, they are removed. An enqueue is locked when a transactional
+ * dequeue is pending that has been neither committed nor aborted.
+ * <pre>
+ * key data
+ *
+ * rid1 --- [ pfid, file_posn, txn_lock ]
+ * rid2 --- [ pfid, file_posn, txn_lock ]
+ * rid3 --- [ pfid, file_posn, txn_lock ]
+ * ...
+ * </pre>
+ */
+ class enq_map
+ {
+ public:
+ // return/error codes (values are assigned in enq_map.cpp)
+ static short EMAP_DUP_RID; // -3
+ static short EMAP_LOCKED; // -2
+ static short EMAP_RID_NOT_FOUND; // -1
+ static short EMAP_OK; // 0
+ static short EMAP_FALSE; // 0
+ static short EMAP_TRUE; // 1
+
+ typedef struct emap_data_struct_t {
+ uint64_t _pfid; // physical file id
+ std::streampos _file_posn; // file position stored with the entry
+ bool _lock; // transaction lock flag
+ emap_data_struct_t() : _pfid(0), _file_posn(0), _lock(false) {}
+ emap_data_struct_t(const uint64_t pfid, const std::streampos file_posn, const bool lock) : _pfid(pfid), _file_posn(file_posn), _lock(lock) {}
+ } emqp_data_struct_t; // NOTE(review): alias is spelled "emqp" (sic); the struct tag emap_data_struct_t is what the rest of the class uses
+ typedef std::pair<uint64_t, emap_data_struct_t> emap_param;
+ typedef std::map<uint64_t, emap_data_struct_t> emap;
+ typedef emap::iterator emap_itr;
+
+ private:
+ emap _map; // rid -> entry
+ smutex _mutex; // taken (via slock) by the member functions defined in enq_map.cpp
+
+ public:
+ enq_map();
+ virtual ~enq_map();
+
+ short insert_pfid(const uint64_t rid, const uint64_t pfid, const std::streampos file_posn); // 0=ok; -3=duplicate rid; entry is inserted unlocked
+ short insert_pfid(const uint64_t rid, const uint64_t pfid, const std::streampos file_posn, const bool locked); // 0=ok; -3=duplicate rid;
+ short get_pfid(const uint64_t rid, uint64_t& pfid); // 0=ok (pfid set); -1=rid not found; -2=locked
+ short get_remove_pfid(const uint64_t rid, uint64_t& pfid, const bool txn_flag = false); // 0=ok (pfid set, entry removed); -1=rid not found; -2=locked and txn_flag not set
+ short get_file_posn(const uint64_t rid, std::streampos& file_posn); // 0=ok (file_posn set); -1=rid not found; -2=locked
+ short get_data(const uint64_t rid, emap_data_struct_t& eds); // 0=ok (eds set, even if the entry is locked); -1=rid not found
+ bool is_enqueued(const uint64_t rid, bool ignore_lock = false); // true if rid is present and (ignore_lock or not locked)
+ short lock(const uint64_t rid); // 0=ok; -1=rid not found
+ short unlock(const uint64_t rid); // 0=ok; -1=rid not found
+ short is_locked(const uint64_t rid); // 1=true; 0=false; -1=rid not found
+ inline void clear() { _map.clear(); } // NOTE: accesses _map without taking _mutex
+ inline bool empty() const { return _map.empty(); } // NOTE: accesses _map without taking _mutex
+ inline uint32_t size() const { return uint32_t(_map.size()); } // NOTE: accesses _map without taking _mutex; the size is narrowed to 32 bits
+ void rid_list(std::vector<uint64_t>& rv); // replaces rv with all rids in the map
+ void pfid_list(std::vector<uint64_t>& fv); // replaces fv with the pfid of every entry (duplicates kept)
+ };
+
+}}
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_ENQ_MAP_H
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp Tue Oct 22 19:09:56 2013
@@ -0,0 +1,621 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/linearstore/jrnl/enq_rec.h"
+
+#include <cassert>
+#include <cerrno>
+#include <cstdlib>
+#include <cstring>
+#include <iomanip>
+#include "qpid/linearstore/jrnl/jerrno.h"
+#include "qpid/linearstore/jrnl/jexception.h"
+#include <sstream>
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+/**
+ * Default constructor, used when the record is to be populated by reading from the journal
+ * (see decode() and rcv_decode()). The xid, data and receive-buffer pointers start out null.
+ * The header is initialised with the enqueue magic and journal version and with zero for all
+ * remaining arguments; the tail is then derived from the header.
+ */
+enq_rec::enq_rec():
+        jrec(),     // superclass
+        _xidp(0),
+        _data(0),
+        _buff(0)
+{
+    // The final argument is the data size (compare the 7-argument call in the write
+    // constructor), so pass a plain 0 rather than the bool literal false.
+    ::enq_hdr_init(&_enq_hdr, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, 0, 0, 0, 0);
+    ::rec_tail_copy(&_enq_tail, &_enq_hdr._rhdr, 0);
+}
+
+/**
+ * Constructor used for (transactional) write operations. The caller's data buffer dbuf and
+ * xid buffer xidp are referenced, not copied. The header is initialised for record id rid
+ * with the given xid and data lengths, the tail is derived from the header, and finally the
+ * transient flag is applied to the header.
+ */
+enq_rec::enq_rec(const uint64_t rid,
+                 const void* const dbuf,
+                 const std::size_t dlen,
+                 const void* const xidp,
+                 const std::size_t xidlen,
+                 const bool transient):
+        jrec(),         // superclass
+        _xidp(xidp),
+        _data(dbuf),
+        _buff(0)
+{
+    ::enq_hdr_init(&_enq_hdr, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, 0, rid, xidlen, dlen);
+    ::rec_tail_copy(&_enq_tail, &_enq_hdr._rhdr, 0);
+    ::set_enq_transient(&_enq_hdr, transient);
+}
+
+/** Destructor: hands over to clean() for any release of resources. */
+enq_rec::~enq_rec()
+{
+    this->clean();
+}
+
+/**
+ * Prepare this instance for reading a record from the journal. The record id (header and
+ * tail), the transient and external flags, both size fields and all three buffer pointers
+ * are cleared.
+ *
+ * _buff is only set to null here, it is not freed.
+ * NOTE(review): presumably ownership of a previously decoded buffer has already passed to the
+ * caller through get_xid()/get_data() - confirm against the callers.
+ */
+void
+enq_rec::reset()
+{
+    _enq_hdr._rhdr._rid = 0;
+    ::set_enq_transient(&_enq_hdr, false);
+    // Also drop an external flag left behind by an earlier reset(..., external = true), so
+    // that is_external() and rec_size() do not report stale state after this call.
+    ::set_enq_external(&_enq_hdr, false);
+    _enq_hdr._xidsize = 0;
+    _enq_hdr._dsize = 0;
+    _xidp = 0;
+    _data = 0;
+    _buff = 0;
+    _enq_tail._rid = 0;
+}
+
+/**
+ * Prepare this instance for writing a record to the journal. dbuf (dlen bytes) and xidp
+ * (xidlen bytes) are referenced, not copied; rid is stored in both header and tail, and the
+ * transient and external flags are set on the header as requested.
+ */
+void
+enq_rec::reset(const uint64_t rid,
+               const void* const dbuf,
+               const std::size_t dlen,
+               const void* const xidp,
+               const std::size_t xidlen,
+               const bool transient,
+               const bool external)
+{
+    // Source buffers supplied by the caller; no receive buffer is needed for writing
+    _xidp = xidp;
+    _data = dbuf;
+    _buff = 0;
+
+    // Header: record id, sizes and flags
+    _enq_hdr._rhdr._rid = rid;
+    _enq_hdr._xidsize = xidlen;
+    _enq_hdr._dsize = dlen;
+    ::set_enq_transient(&_enq_hdr, transient);
+    ::set_enq_external(&_enq_hdr, external);
+
+    // Tail carries the same record id as the header
+    _enq_tail._rid = rid;
+}
+/**
+ * Encode this enqueue record (header, xid, data, tail - in that order) into the write buffer
+ * at wptr. A record that does not fit into the available space is written piecewise over
+ * several calls.
+ *
+ * wptr:           destination buffer; asserted non-null.
+ * rec_offs_dblks: size in dblks of the part of this record written by earlier calls;
+ *                 0 means start with the header.
+ * max_size_dblks: space available at wptr in dblks; asserted > 0.
+ * Returns size_dblks() of the number of bytes copied by this call.
+ *
+ * Four cases are handled: continuation with or without a further split, and start of record
+ * with or without a split. The data section is not copied when the header's external flag is
+ * set. When QLS_CLEAN is defined and the record ends in this call, the space between the end
+ * of the tail and the end of the record's last dblk is filled with QLS_CLEAN_CHAR.
+ *
+ * NOTE(review): the two continuation branches treat external records differently - the
+ * "further split" branch skips the _dsize adjustment of rec_offs for external records, while
+ * the "no further split" branch applies it unconditionally. Confirm which is intended.
+ */
+uint32_t
+enq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks)
+{
+ assert(wptr != 0);
+ assert(max_size_dblks > 0);
+ if (_xidp == 0)
+ assert(_enq_hdr._xidsize == 0);
+
+ std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES; // Bytes of this record written by earlier calls
+ std::size_t rem = max_size_dblks * QLS_DBLK_SIZE_BYTES; // Bytes still available at wptr
+ std::size_t wr_cnt = 0; // Bytes copied to wptr by this call
+ if (rec_offs_dblks) // Continuation of split data record (over 2 or more pages)
+ {
+ if (size_dblks(rec_size()) - rec_offs_dblks > max_size_dblks) // Further split required
+ {
+ rec_offs -= sizeof(_enq_hdr); // rec_offs is now relative to the start of the xid
+ std::size_t wsize = _enq_hdr._xidsize > rec_offs ? _enq_hdr._xidsize - rec_offs : 0; // xid bytes outstanding
+ std::size_t wsize2 = wsize; // Unclamped amount, needed for the offset adjustment below
+ if (wsize)
+ {
+ if (wsize > rem)
+ wsize = rem;
+ std::memcpy(wptr, (const char*)_xidp + rec_offs, wsize);
+ wr_cnt = wsize;
+ rem -= wsize;
+ }
+ rec_offs -= _enq_hdr._xidsize - wsize2; // rec_offs is now relative to the start of the data (0 if we were inside the xid)
+ if (rem && !::is_enq_external(&_enq_hdr))
+ {
+ wsize = _enq_hdr._dsize > rec_offs ? _enq_hdr._dsize - rec_offs : 0; // data bytes outstanding
+ wsize2 = wsize;
+ if (wsize)
+ {
+ if (wsize > rem)
+ wsize = rem;
+ std::memcpy((char*)wptr + wr_cnt, (const char*)_data + rec_offs, wsize);
+ wr_cnt += wsize;
+ rem -= wsize;
+ }
+ rec_offs -= _enq_hdr._dsize - wsize2; // rec_offs is now relative to the start of the tail
+ }
+ if (rem)
+ {
+ wsize = sizeof(_enq_tail) > rec_offs ? sizeof(_enq_tail) - rec_offs : 0; // tail bytes outstanding
+ wsize2 = wsize;
+ if (wsize)
+ {
+ if (wsize > rem)
+ wsize = rem;
+ std::memcpy((char*)wptr + wr_cnt, (char*)&_enq_tail + rec_offs, wsize);
+ wr_cnt += wsize;
+ rem -= wsize;
+ }
+ rec_offs -= sizeof(_enq_tail) - wsize2;
+ }
+ assert(rem == 0); // A further split implies the available space was used up completely
+ assert(rec_offs == 0);
+ }
+ else // No further split required
+ {
+ rec_offs -= sizeof(_enq_hdr); // rec_offs is now relative to the start of the xid
+ std::size_t wsize = _enq_hdr._xidsize > rec_offs ? _enq_hdr._xidsize - rec_offs : 0;
+ if (wsize)
+ {
+ std::memcpy(wptr, (const char*)_xidp + rec_offs, wsize);
+ wr_cnt += wsize;
+ }
+ rec_offs -= _enq_hdr._xidsize - wsize;
+ wsize = _enq_hdr._dsize > rec_offs ? _enq_hdr._dsize - rec_offs : 0;
+ if (wsize && !::is_enq_external(&_enq_hdr))
+ {
+ std::memcpy((char*)wptr + wr_cnt, (const char*)_data + rec_offs, wsize);
+ wr_cnt += wsize;
+ }
+ rec_offs -= _enq_hdr._dsize - wsize; // NOTE(review): applied even when the record is external - confirm
+ wsize = sizeof(_enq_tail) > rec_offs ? sizeof(_enq_tail) - rec_offs : 0;
+ if (wsize)
+ {
+ std::memcpy((char*)wptr + wr_cnt, (char*)&_enq_tail + rec_offs, wsize);
+ wr_cnt += wsize;
+#ifdef QLS_CLEAN
+ std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES; // Shadows the outer rec_offs: the unadjusted byte offset is needed here
+ std::size_t dblk_rec_size = size_dblks(rec_size() - rec_offs) * QLS_DBLK_SIZE_BYTES;
+ std::memset((char*)wptr + wr_cnt, QLS_CLEAN_CHAR, dblk_rec_size - wr_cnt);
+#endif
+ }
+ rec_offs -= sizeof(_enq_tail) - wsize;
+ assert(rec_offs == 0);
+ }
+ }
+ else // Start at beginning of data record
+ {
+ // Assumption: the header will always fit into the first dblk
+ std::memcpy(wptr, (void*)&_enq_hdr, sizeof(_enq_hdr));
+ wr_cnt = sizeof(_enq_hdr);
+ if (size_dblks(rec_size()) > max_size_dblks) // Split required
+ {
+ std::size_t wsize;
+ rem -= sizeof(_enq_hdr);
+ if (rem)
+ {
+ wsize = rem >= _enq_hdr._xidsize ? _enq_hdr._xidsize : rem; // As much of the xid as fits
+ std::memcpy((char*)wptr + wr_cnt, _xidp, wsize);
+ wr_cnt += wsize;
+ rem -= wsize;
+ }
+ if (rem && !::is_enq_external(&_enq_hdr))
+ {
+ wsize = rem >= _enq_hdr._dsize ? _enq_hdr._dsize : rem; // As much of the data as fits
+ std::memcpy((char*)wptr + wr_cnt, _data, wsize);
+ wr_cnt += wsize;
+ rem -= wsize;
+ }
+ if (rem)
+ {
+ wsize = rem >= sizeof(_enq_tail) ? sizeof(_enq_tail) : rem; // As much of the tail as fits
+ std::memcpy((char*)wptr + wr_cnt, (void*)&_enq_tail, wsize);
+ wr_cnt += wsize;
+ rem -= wsize;
+ }
+ assert(rem == 0); // A split implies the available space was used up completely
+ }
+ else // No split required
+ {
+ if (_enq_hdr._xidsize)
+ {
+ std::memcpy((char*)wptr + wr_cnt, _xidp, _enq_hdr._xidsize);
+ wr_cnt += _enq_hdr._xidsize;
+ }
+ if (!::is_enq_external(&_enq_hdr))
+ {
+ std::memcpy((char*)wptr + wr_cnt, _data, _enq_hdr._dsize);
+ wr_cnt += _enq_hdr._dsize;
+ }
+ std::memcpy((char*)wptr + wr_cnt, (void*)&_enq_tail, sizeof(_enq_tail));
+ wr_cnt += sizeof(_enq_tail);
+#ifdef QLS_CLEAN
+ std::size_t dblk_rec_size = size_dblks(rec_size()) * QLS_DBLK_SIZE_BYTES;
+ std::memset((char*)wptr + wr_cnt, QLS_CLEAN_CHAR, dblk_rec_size - wr_cnt);
+#endif
+ }
+ }
+ return size_dblks(wr_cnt);
+}
+/**
+ * Decode an enqueue record from the read page data at rptr into this object. A record that
+ * spans several pages is decoded piecewise over several calls.
+ *
+ * h:              record header supplied by the caller; copied into _enq_hdr._rhdr when
+ *                 rec_offs_dblks is 0 and not used on continuation calls.
+ * rptr:           page data for this call; asserted non-null. At the start of a record it
+ *                 points at the record header, since the xid and data sizes are read from
+ *                 rptr + sizeof(rec_hdr_t) onwards.
+ * rec_offs_dblks: size in dblks of the part of this record consumed by earlier calls
+ *                 (0 = start of record).
+ * max_size_dblks: dblks available at rptr; asserted > 0.
+ * Returns size_dblks() of the number of bytes consumed by this call.
+ *
+ * At the start of a record the header is checked with chk_hdr() (which throws jexception on a
+ * bad magic) and, if there is any xid or non-external data, _buff is malloc'd to hold the xid
+ * immediately followed by the data. The tail is checked with chk_tail() in the branches where
+ * it is read completely within one call.
+ *
+ * NOTE(review): when both _xidsize and the effective data size are zero, the start-of-record
+ * branch copies nothing further and the tail is neither read nor checked in this call.
+ * NOTE(review): several continuation-branch calculations use _dsize without testing the
+ * external flag, although _buff is sized without _dsize for external records - see the
+ * line-level notes below and confirm.
+ */
+uint32_t
+enq_rec::decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks)
+{
+ assert(rptr != 0);
+ assert(max_size_dblks > 0);
+
+ std::size_t rd_cnt = 0; // Bytes consumed from rptr by this call
+ if (rec_offs_dblks) // Continuation of record on new page
+ {
+ const uint32_t hdr_xid_data_size = sizeof(enq_hdr_t) + _enq_hdr._xidsize +
+ (::is_enq_external(&_enq_hdr) ? 0 : _enq_hdr._dsize);
+ const uint32_t hdr_xid_data_tail_size = hdr_xid_data_size + sizeof(rec_tail_t);
+ const uint32_t hdr_data_dblks = size_dblks(hdr_xid_data_size);
+ const uint32_t hdr_tail_dblks = size_dblks(hdr_xid_data_tail_size);
+ const std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES;
+ const std::size_t offs = rec_offs - sizeof(enq_hdr_t); // Offset into _buff (xid followed by data) at which this page continues
+
+ if (hdr_tail_dblks - rec_offs_dblks <= max_size_dblks)
+ {
+ // Remainder of record fits within this page
+ if (offs < _enq_hdr._xidsize)
+ {
+ // some XID still outstanding, copy remainder of XID, data and tail
+ const std::size_t rem = _enq_hdr._xidsize + _enq_hdr._dsize - offs; // NOTE(review): includes _dsize even for external records - confirm
+ std::memcpy((char*)_buff + offs, rptr, rem);
+ rd_cnt += rem;
+ std::memcpy((void*)&_enq_tail, ((char*)rptr + rd_cnt), sizeof(_enq_tail));
+ chk_tail();
+ rd_cnt += sizeof(_enq_tail);
+ }
+ else if (offs < _enq_hdr._xidsize + _enq_hdr._dsize && !::is_enq_external(&_enq_hdr))
+ {
+ // some data still outstanding, copy remainder of data and tail
+ const std::size_t data_offs = offs - _enq_hdr._xidsize;
+ const std::size_t data_rem = _enq_hdr._dsize - data_offs;
+ std::memcpy((char*)_buff + offs, rptr, data_rem);
+ rd_cnt += data_rem;
+ std::memcpy((void*)&_enq_tail, ((char*)rptr + rd_cnt), sizeof(_enq_tail));
+ chk_tail();
+ rd_cnt += sizeof(_enq_tail);
+ }
+ else
+ {
+ // Tail or part of tail only outstanding, complete tail
+ const std::size_t tail_offs = rec_offs - sizeof(enq_hdr_t) - _enq_hdr._xidsize -
+ _enq_hdr._dsize; // NOTE(review): subtracts _dsize even for external records - confirm
+ const std::size_t tail_rem = sizeof(rec_tail_t) - tail_offs;
+ std::memcpy((char*)&_enq_tail + tail_offs, rptr, tail_rem);
+ chk_tail();
+ rd_cnt = tail_rem;
+ }
+ }
+ else if (hdr_data_dblks - rec_offs_dblks <= max_size_dblks)
+ {
+ // Remainder of xid & data fits within this page; tail split
+
+ /*
+ * TODO: This section needs revision. Since it is known that the end of the page falls within the
+ * tail record, it is only necessary to write from the current offset to the end of the page under
+ * all circumstances. The multiple if/else combinations may be eliminated, as well as one memcpy()
+ * operation.
+ *
+ * Also note that Coverity has detected a possible memory overwrite in this block. It occurs if
+ * both the following two if() stmsts (numbered) are false. With rd_cnt = 0, this would result in
+ * the value of tail_rem > sizeof(tail_rec). Practically, this could only happen if the start and
+ * end of a page both fall within the same tail record, in which case the tail would have to be
+ * (much!) larger. However, the logic here does not account for this possibility.
+ *
+ * If the optimization above is undertaken, this code would probably be removed.
+ */
+ if (offs < _enq_hdr._xidsize) // 1
+ {
+ // some XID still outstanding, copy remainder of XID and data
+ const std::size_t rem = _enq_hdr._xidsize + _enq_hdr._dsize - offs; // NOTE(review): includes _dsize even for external records - confirm
+ std::memcpy((char*)_buff + offs, rptr, rem);
+ rd_cnt += rem;
+ }
+ else if (offs < _enq_hdr._xidsize + _enq_hdr._dsize && !::is_enq_external(&_enq_hdr)) // 2
+ {
+ // some data still outstanding, copy remainder of data
+ const std::size_t data_offs = offs - _enq_hdr._xidsize;
+ const std::size_t data_rem = _enq_hdr._dsize - data_offs;
+ std::memcpy((char*)_buff + offs, rptr, data_rem);
+ rd_cnt += data_rem;
+ }
+ const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt; // Whatever is left of the page holds the first part of the tail
+ if (tail_rem)
+ {
+ std::memcpy((void*)&_enq_tail, ((char*)rptr + rd_cnt), tail_rem);
+ rd_cnt += tail_rem;
+ }
+ }
+ else
+ {
+ // Since xid and data are contiguous, both fit within current page - copy whole page
+ const std::size_t data_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES);
+ std::memcpy((char*)_buff + offs, rptr, data_cp_size);
+ rd_cnt += data_cp_size;
+ }
+ }
+ else // Start of record
+ {
+ // Get and check header
+ //_enq_hdr.hdr_copy(h);
+ ::rec_hdr_copy(&_enq_hdr._rhdr, &h);
+ rd_cnt = sizeof(rec_hdr_t);
+ _enq_hdr._xidsize = *(std::size_t*)((char*)rptr + rd_cnt); // xid size follows the record header in the page
+ rd_cnt += sizeof(std::size_t);
+#if defined(JRNL_32_BIT)
+ rd_cnt += sizeof(uint32_t); // Filler 0
+#endif
+ _enq_hdr._dsize = *(std::size_t*)((char*)rptr + rd_cnt);
+ rd_cnt = sizeof(enq_hdr_t); // Whole enqueue header consumed, whatever its padding
+ chk_hdr();
+ if (_enq_hdr._xidsize + (::is_enq_external(&_enq_hdr) ? 0 : _enq_hdr._dsize))
+ {
+ _buff = std::malloc(_enq_hdr._xidsize + (::is_enq_external(&_enq_hdr) ? 0 : _enq_hdr._dsize)); // One buffer: xid first, then (non-external) data
+ MALLOC_CHK(_buff, "_buff", "enq_rec", "decode");
+
+ const uint32_t hdr_xid_size = sizeof(enq_hdr_t) + _enq_hdr._xidsize;
+ const uint32_t hdr_xid_data_size = hdr_xid_size + (::is_enq_external(&_enq_hdr) ? 0 : _enq_hdr._dsize);
+ const uint32_t hdr_xid_data_tail_size = hdr_xid_data_size + sizeof(rec_tail_t);
+ const uint32_t hdr_xid_dblks = size_dblks(hdr_xid_size);
+ const uint32_t hdr_data_dblks = size_dblks(hdr_xid_data_size);
+ const uint32_t hdr_tail_dblks = size_dblks(hdr_xid_data_tail_size);
+ // Check if record (header + data + tail) fits within this page, we can check the
+ // tail before the expense of copying data to memory
+ if (hdr_tail_dblks <= max_size_dblks)
+ {
+ // Header, xid, data and tail fits within this page
+ if (_enq_hdr._xidsize)
+ {
+ std::memcpy(_buff, (char*)rptr + rd_cnt, _enq_hdr._xidsize);
+ rd_cnt += _enq_hdr._xidsize;
+ }
+ if (_enq_hdr._dsize && !::is_enq_external(&_enq_hdr))
+ {
+ std::memcpy((char*)_buff + _enq_hdr._xidsize, (char*)rptr + rd_cnt,
+ _enq_hdr._dsize);
+ rd_cnt += _enq_hdr._dsize;
+ }
+ std::memcpy((void*)&_enq_tail, (char*)rptr + rd_cnt, sizeof(_enq_tail));
+ chk_tail();
+ rd_cnt += sizeof(_enq_tail);
+ }
+ else if (hdr_data_dblks <= max_size_dblks)
+ {
+ // Header, xid and data fit within this page, tail split or separated
+ if (_enq_hdr._xidsize)
+ {
+ std::memcpy(_buff, (char*)rptr + rd_cnt, _enq_hdr._xidsize);
+ rd_cnt += _enq_hdr._xidsize;
+ }
+ if (_enq_hdr._dsize && !::is_enq_external(&_enq_hdr))
+ {
+ std::memcpy((char*)_buff + _enq_hdr._xidsize, (char*)rptr + rd_cnt,
+ _enq_hdr._dsize);
+ rd_cnt += _enq_hdr._dsize;
+ }
+ const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt; // Part of the tail that lies on this page (may be 0)
+ if (tail_rem)
+ {
+ std::memcpy((void*)&_enq_tail, (char*)rptr + rd_cnt, tail_rem);
+ rd_cnt += tail_rem;
+ }
+ }
+ else if (hdr_xid_dblks <= max_size_dblks)
+ {
+ // Header and xid fits within this page, data split or separated
+ if (_enq_hdr._xidsize)
+ {
+ std::memcpy(_buff, (char*)rptr + rd_cnt, _enq_hdr._xidsize);
+ rd_cnt += _enq_hdr._xidsize;
+ }
+ if (_enq_hdr._dsize && !::is_enq_external(&_enq_hdr))
+ {
+ const std::size_t data_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt; // Rest of the page is data
+ std::memcpy((char*)_buff + _enq_hdr._xidsize, (char*)rptr + rd_cnt, data_cp_size);
+ rd_cnt += data_cp_size;
+ }
+ }
+ else
+ {
+ // Header fits within this page, xid split or separated
+ const std::size_t data_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt; // Rest of the page is xid
+ std::memcpy(_buff, (char*)rptr + rd_cnt, data_cp_size);
+ rd_cnt += data_cp_size;
+ }
+ }
+ }
+ return size_dblks(rd_cnt);
+}
+/**
+ * Decode an enqueue record from a journal file stream during recovery. Only the xid is kept
+ * (in a malloc'd _buff); the data section is skipped with ignore().
+ *
+ * h:        record header already read by the caller (passed by value); copied into
+ *           _enq_hdr._rhdr when rec_offs is 0.
+ * ifsp:     stream positioned just after the record header when rec_offs is 0, otherwise at
+ *           the point where the record continues.
+ * rec_offs: in/out byte offset into the record. 0 starts a new record; on return it has been
+ *           advanced by the number of bytes read or skipped, so that a later call can resume
+ *           where this one stopped.
+ * Returns true when the whole record including tail has been read and chk_tail() passed;
+ * returns false when the stream ran out part-way through the xid, data or tail. In that case
+ * the stream's failbit is cleared before returning.
+ */
+bool
+enq_rec::rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs)
+{
+ if (rec_offs == 0)
+ {
+ // Read header, allocate (if req'd) for xid
+ //_enq_hdr.hdr_copy(h);
+ ::rec_hdr_copy(&_enq_hdr._rhdr, &h);
+ ifsp->read((char*)&_enq_hdr._xidsize, sizeof(std::size_t));
+#if defined(JRNL_32_BIT)
+ ifsp->ignore(sizeof(uint32_t)); // _filler0
+#endif
+ ifsp->read((char*)&_enq_hdr._dsize, sizeof(std::size_t));
+#if defined(JRNL_32_BIT)
+ ifsp->ignore(sizeof(uint32_t)); // _filler1
+#endif
+ rec_offs = sizeof(_enq_hdr); // Whole enqueue header is now consumed
+ if (_enq_hdr._xidsize)
+ {
+ _buff = std::malloc(_enq_hdr._xidsize); // Buffer holds the xid only; data is not retained here
+ MALLOC_CHK(_buff, "_buff", "enq_rec", "rcv_decode");
+ }
+ }
+ if (rec_offs < sizeof(_enq_hdr) + _enq_hdr._xidsize)
+ {
+ // Read xid (or continue reading xid)
+ std::size_t offs = rec_offs - sizeof(_enq_hdr); // xid bytes already read by earlier calls
+ ifsp->read((char*)_buff + offs, _enq_hdr._xidsize - offs);
+ std::size_t size_read = ifsp->gcount();
+ rec_offs += size_read;
+ if (size_read < _enq_hdr._xidsize - offs)
+ {
+ assert(ifsp->eof());
+ // As we may have read past eof, turn off fail bit
+ ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit));
+ assert(!ifsp->fail() && !ifsp->bad());
+ return false;
+ }
+ }
+ if (!::is_enq_external(&_enq_hdr))
+ {
+ if (rec_offs < sizeof(_enq_hdr) + _enq_hdr._xidsize + _enq_hdr._dsize)
+ {
+ // Ignore data (or continue ignoring data)
+ std::size_t offs = rec_offs - sizeof(_enq_hdr) - _enq_hdr._xidsize; // data bytes already skipped by earlier calls
+ ifsp->ignore(_enq_hdr._dsize - offs);
+ std::size_t size_read = ifsp->gcount(); // ignore() is unformatted input, so gcount() reports the bytes skipped
+ rec_offs += size_read;
+ if (size_read < _enq_hdr._dsize - offs)
+ {
+ assert(ifsp->eof());
+ // As we may have read past eof, turn off fail bit
+ ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit));
+ assert(!ifsp->fail() && !ifsp->bad());
+ return false;
+ }
+ }
+ }
+ if (rec_offs < sizeof(_enq_hdr) + _enq_hdr._xidsize +
+ (::is_enq_external(&_enq_hdr) ? 0 : _enq_hdr._dsize) + sizeof(rec_tail_t))
+ {
+ // Read tail (or continue reading tail)
+ std::size_t offs = rec_offs - sizeof(_enq_hdr) - _enq_hdr._xidsize;
+ if (!::is_enq_external(&_enq_hdr))
+ offs -= _enq_hdr._dsize; // offs is now the number of tail bytes already read
+ ifsp->read((char*)&_enq_tail + offs, sizeof(rec_tail_t) - offs);
+ std::size_t size_read = ifsp->gcount();
+ rec_offs += size_read;
+ if (size_read < sizeof(rec_tail_t) - offs)
+ {
+ assert(ifsp->eof());
+ // As we may have read past eof, turn off fail bit
+ ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit));
+ assert(!ifsp->fail() && !ifsp->bad());
+ return false;
+ }
+ }
+ ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size()); // Skip the padding up to the next dblk boundary
+ chk_tail(); // Throws if tail invalid or record incomplete
+ assert(!ifsp->fail() && !ifsp->bad());
+ return true;
+}
+
+/**
+ * Hand out the xid held in the receive buffer.
+ *
+ * xidpp: receives a pointer to the start of _buff, or null when there is no buffer or the
+ *        xid size is zero.
+ * Returns the xid size in bytes, or 0 when *xidpp is set to null.
+ */
+std::size_t
+enq_rec::get_xid(void** const xidpp)
+{
+    if (_buff != 0 && _enq_hdr._xidsize != 0)
+    {
+        // The xid occupies the front of the receive buffer
+        *xidpp = _buff;
+        return _enq_hdr._xidsize;
+    }
+    *xidpp = 0;
+    return 0;
+}
+
+/**
+ * Hand out the data held in the receive buffer.
+ *
+ * datapp: receives a pointer to the data, which starts _xidsize bytes into _buff; set to
+ *         null when there is no buffer or the record is flagged external.
+ * Returns 0 when there is no buffer, otherwise the header's data size (also for external
+ * records, where *datapp is null).
+ */
+std::size_t
+enq_rec::get_data(void** const datapp)
+{
+    if (_buff == 0)
+    {
+        *datapp = 0;
+        return 0;
+    }
+
+    void* datap = 0;
+    if (!::is_enq_external(&_enq_hdr))
+        datap = (void*)((char*)_buff + _enq_hdr._xidsize);  // data follows the xid
+    *datapp = datap;
+    return _enq_hdr._dsize;
+}
+
+/**
+ * Append a one-line description of this record (magic, version, record id, xid pointer if
+ * set, data length) to str and return str. Note that _xidp is a const void*, so it is the
+ * pointer value that is streamed, not the xid content.
+ */
+std::string&
+enq_rec::str(std::string& str) const
+{
+    std::ostringstream text;
+    text << "enq_rec: m=" << _enq_hdr._rhdr._magic
+         << " v=" << (int)_enq_hdr._rhdr._version
+         << " rid=" << _enq_hdr._rhdr._rid;
+    if (_xidp != 0)
+        text << " xid=\"" << _xidp << "\"";
+    text << " len=" << _enq_hdr._dsize;
+    return str.append(text.str());
+}
+
+/** Size in bytes of this record as described by its header (sizes and external flag). */
+std::size_t
+enq_rec::rec_size() const
+{
+    const bool external = ::is_enq_external(&_enq_hdr);
+    return rec_size(_enq_hdr._xidsize, _enq_hdr._dsize, external);
+}
+
+/**
+ * Size in bytes of an enqueue record: header + xid + data + tail. For an external record
+ * the data is not part of the record, so dsize is left out.
+ */
+std::size_t
+enq_rec::rec_size(const std::size_t xidsize, const std::size_t dsize, const bool external)
+{
+    const std::size_t payload = external ? xidsize : xidsize + dsize;
+    return sizeof(enq_hdr_t) + payload + sizeof(rec_tail_t);
+}
+
+/** Set the record id in both tail and header so that the two stay in step. */
+void
+enq_rec::set_rid(const uint64_t rid)
+{
+    _enq_tail._rid = rid;
+    _enq_hdr._rhdr._rid = rid;
+}
+
+/**
+ * Validate the record header: first the generic jrec::chk_hdr() check, then that the magic
+ * is the enqueue-record magic.
+ *
+ * Throws jexception (jerrno::JERR_JREC_BADRECHDR) when the magic is not QLS_ENQ_MAGIC. The
+ * message shows the record id and the expected and read magic values in zero-padded hex.
+ */
+void
+enq_rec::chk_hdr() const
+{
+    jrec::chk_hdr(_enq_hdr._rhdr);
+    if (_enq_hdr._rhdr._magic == QLS_ENQ_MAGIC)
+        return;
+
+    std::ostringstream oss;
+    oss << std::hex << std::setfill('0');
+    oss << "enq magic: rid=0x" << std::setw(16) << _enq_hdr._rhdr._rid;
+    oss << ": expected=0x" << std::setw(8) << QLS_ENQ_MAGIC;
+    // Print the magic that was read with the same width as the expected one, and as an
+    // unsigned value so that a corrupt magic with the top bit set is not passed through int.
+    oss << " read=0x" << std::setw(8) << static_cast<uint32_t>(_enq_hdr._rhdr._magic);
+    throw jexception(jerrno::JERR_JREC_BADRECHDR, oss.str(), "enq_rec", "chk_hdr");
+}
+
+/** Validate the header as chk_hdr() does, then check its record id against rid. */
+void
+enq_rec::chk_hdr(uint64_t rid) const
+{
+    this->chk_hdr();                        // header and magic first
+    jrec::chk_rid(_enq_hdr._rhdr, rid);     // then the record id
+}
+
+/** Check this record's tail against its header by delegating to jrec::chk_tail(). */
+void
+enq_rec::chk_tail() const
+{
+    jrec::chk_tail(_enq_tail,
+                   _enq_hdr._rhdr);
+}
+
+void
+enq_rec::clean()
+{
+ /*
+ * Cleanup hook, called from the destructor. This is the intended place to clean up
+ * allocated memory, but it is currently a no-op: in particular the buffer malloc'd into
+ * _buff by decode()/rcv_decode() is not freed here.
+ * NOTE(review): presumably the caller that obtains the buffer through get_xid()/get_data()
+ * is responsible for freeing it - confirm.
+ */
+}
+
+}}
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_ENQ_REC_H
+#define QPID_LEGACYSTORE_JRNL_ENQ_REC_H
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+class enq_rec;
+}}
+
+#include <cstddef>
+#include "qpid/linearstore/jrnl/utils/enq_hdr.h"
+#include "qpid/linearstore/jrnl/jrec.h"
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+ /**
+ * \class enq_rec
+ * \brief Class to handle a single journal enqueue record.
+ *
+ * A record consists of an enqueue header, an optional xid, the data (omitted from the
+ * record when the external flag is set) and a record tail. For writing, the instance
+ * references the caller's xid and data buffers and encode() copies them into a write
+ * buffer. For reading, decode() and rcv_decode() malloc a receive buffer which is handed
+ * out through get_xid() and get_data().
+ */
+ class enq_rec : public jrec
+ {
+ private:
+ enq_hdr_t _enq_hdr; ///< Enqueue record header (record header, xid size, data size, flags)
+ const void* _xidp; ///< xid pointer for encoding (for writing to disk)
+ const void* _data; ///< Pointer to data to be written to disk
+ void* _buff; ///< Pointer to buffer to receive data read from disk
+ rec_tail_t _enq_tail; ///< Record tail, derived from and checked against the header
+
+ public:
+ /**
+ * \brief Constructor used for read operations.
+ */
+ enq_rec();
+
+ /**
+ * \brief Constructor used for write operations, where dbuf contains data to be written.
+ */
+ enq_rec(const uint64_t rid, const void* const dbuf, const std::size_t dlen,
+ const void* const xidp, const std::size_t xidlen, const bool transient);
+
+ /**
+ * \brief Destructor
+ */
+ virtual ~enq_rec();
+
+ // Prepare instance for use in reading data from journal (the receive buffer is allocated later, by decode)
+ void reset();
+ // Prepare instance for use in writing data to journal
+ void reset(const uint64_t rid, const void* const dbuf, const std::size_t dlen,
+ const void* const xidp, const std::size_t xidlen, const bool transient,
+ const bool external);
+
+ uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks); ///< Write record (or next part of it) to wptr; returns dblks written
+ uint32_t decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks); ///< Read record (or next part of it) from rptr; returns dblks read
+ // Decode used for recover
+ bool rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs);
+
+ std::size_t get_xid(void** const xidpp); ///< Sets *xidpp to the xid in the receive buffer (or null); returns xid size
+ std::size_t get_data(void** const datapp); ///< Sets *datapp to the data in the receive buffer (or null); returns data size
+ inline bool is_transient() const { return ::is_enq_transient(&_enq_hdr); }
+ inline bool is_external() const { return ::is_enq_external(&_enq_hdr); }
+ std::string& str(std::string& str) const; ///< Appends a textual description of this record to str
+ inline std::size_t data_size() const { return _enq_hdr._dsize; }
+ inline std::size_t xid_size() const { return _enq_hdr._xidsize; }
+ std::size_t rec_size() const; ///< Record size in bytes according to this record's header
+ static std::size_t rec_size(const std::size_t xidsize, const std::size_t dsize, const bool external); ///< Record size in bytes for the given sizes; dsize is not counted when external
+ inline uint64_t rid() const { return _enq_hdr._rhdr._rid; }
+ void set_rid(const uint64_t rid); ///< Sets the record id in both header and tail
+
+ private:
+ void chk_hdr() const; ///< Checks header and enqueue magic; throws jexception on bad magic
+ void chk_hdr(uint64_t rid) const; ///< As chk_hdr(), and also checks the record id against rid
+ void chk_tail() const; ///< Checks the tail against the header
+ virtual void clean(); ///< Cleanup hook called from the destructor
+ }; // class enq_rec
+
+}}
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_ENQ_REC_H
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enums.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enums.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enums.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enums.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LINEARSTORE_JRNL_ENUMS_H
+#define QPID_LINEARSTORE_JRNL_ENUMS_H
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+ // TODO: Change this to flags, as multiple of these conditions may exist simultaneously
+ /**
+ * \brief Enumeration of possible return states from journal read and write operations.
+ *
+ * The commented-out enumerators are currently disabled; iores_str() has matching
+ * commented-out cases.
+ */
+ enum _iores
+ {
+ RHM_IORES_SUCCESS = 0, ///< Success: IO operation completed normally.
+ RHM_IORES_PAGE_AIOWAIT, ///< IO operation suspended - next page is waiting for AIO.
+ RHM_IORES_FILE_AIOWAIT, ///< IO operation suspended - next file is waiting for AIO.
+ RHM_IORES_EMPTY, ///< During read operations, nothing further is available to read.
+// RHM_IORES_RCINVALID, ///< Read page cache is invalid (ie obsolete or uninitialized)
+// RHM_IORES_ENQCAPTHRESH, ///< Enqueue capacity threshold (limit) reached.
+// RHM_IORES_FULL, ///< During write operations, the journal files are full.
+// RHM_IORES_BUSY, ///< Another blocking operation is in progress.
+ RHM_IORES_TXPENDING ///< Operation blocked by pending transaction.
+// RHM_IORES_NOTIMPL ///< Function is not implemented.
+ };
+ typedef _iores iores;
+
+    /**
+     * \brief Return the symbolic name of an iores value as a string literal.
+     *
+     * Any value without an active case (including the currently disabled codes RCINVALID,
+     * ENQCAPTHRESH, FULL, BUSY and NOTIMPL) yields "<iores unknown>".
+     */
+    static inline const char* iores_str(iores res)
+    {
+        const char* name = "<iores unknown>";
+        switch (res)
+        {
+            case RHM_IORES_SUCCESS:      name = "RHM_IORES_SUCCESS";      break;
+            case RHM_IORES_PAGE_AIOWAIT: name = "RHM_IORES_PAGE_AIOWAIT"; break;
+            case RHM_IORES_FILE_AIOWAIT: name = "RHM_IORES_FILE_AIOWAIT"; break;
+            case RHM_IORES_EMPTY:        name = "RHM_IORES_EMPTY";        break;
+            case RHM_IORES_TXPENDING:    name = "RHM_IORES_TXPENDING";    break;
+        }
+        return name;
+    }
+
+/*
+ enum _log_level
+ {
+ LOG_TRACE = 0,
+ LOG_DEBUG,
+ LOG_INFO,
+ LOG_NOTICE,
+ LOG_WARN,
+ LOG_ERROR,
+ LOG_CRITICAL
+ };
+ typedef _log_level log_level_t;
+
+ static inline const char* log_level_str(log_level_t ll)
+ {
+ switch (ll)
+ {
+ case LOG_TRACE: return "TRACE";
+ case LOG_DEBUG: return "DEBUG";
+ case LOG_INFO: return "INFO";
+ case LOG_NOTICE: return "NOTICE";
+ case LOG_WARN: return "WARN";
+ case LOG_ERROR: return "ERROR";
+ case LOG_CRITICAL: return "CRITICAL";
+ }
+ return "<log level unknown>";
+ }
+*/
+
+
+}}
+
+#endif // ifndef QPID_LINEARSTORE_JRNL_ENUMS_H
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_JCFG_H
+#define QPID_LEGACYSTORE_JRNL_JCFG_H
+
+#define QLS_SBLK_SIZE_BYTES 4096 /**< Disk softblock size in bytes, should match size used on disk media */
+#define QLS_AIO_ALIGN_BOUNDARY_BYTES QLS_SBLK_SIZE_BYTES /**< Memory alignment boundary used for DMA */
+/**
+* <b>Rule:</b> Data block size (QLS_DBLK_SIZE_BYTES) MUST be a power of 2 AND
+* a power of 2 factor of the disk softblock size (QLS_SBLK_SIZE_BYTES):
+* <pre>
+* n * QLS_DBLK_SIZE_BYTES == QLS_SBLK_SIZE_BYTES (n = 1,2,4,8...)
+* </pre>
+*/
+#define QLS_DBLK_SIZE_BYTES 128 /**< Data block size in bytes (CANNOT BE LESS THAN 32!) */
+#define QLS_SBLK_SIZE_DBLKS (QLS_SBLK_SIZE_BYTES / QLS_DBLK_SIZE_BYTES) /**< Disk softblock size in multiples of QLS_DBLK_SIZE_BYTES */
+#define QLS_SBLK_SIZE_KIB (QLS_SBLK_SIZE_BYTES / 1024) /**< Disk softblock size in KiB */
+
+#define QLS_WMGR_DEF_PAGE_SIZE_KIB 32 /**< Journal write page size in KiB (default) */
+#define QLS_WMGR_DEF_PAGE_SIZE_SBLKS (QLS_WMGR_DEF_PAGE_SIZE_KIB / QLS_SBLK_SIZE_KIB) /**< Journal write page size in softblocks (default) */
+#define QLS_WMGR_DEF_PAGES 32 /**< Number of pages to use in wmgr (default) */
+
+#define QLS_WMGR_MAXDTOKPP 1024 /**< Max. dtoks (data blocks) per page in wmgr */
+#define QLS_WMGR_MAXWAITUS 100 /**< Max. wait time (us) before submitting AIO */
+
+#define QLS_JRNL_FILE_EXTENSION ".jrnl" /**< Extension for journal data files */
+#define QLS_TXA_MAGIC 0x61534c51 /**< ("QLSa" in little endian) Magic for dtx abort hdrs */
+#define QLS_TXC_MAGIC 0x63534c51 /**< ("QLSc" in little endian) Magic for dtx commit hdrs */
+#define QLS_DEQ_MAGIC 0x64534c51 /**< ("QLSd" in little endian) Magic for deq rec hdrs */
+#define QLS_ENQ_MAGIC 0x65534c51 /**< ("QLSe" in little endian) Magic for enq rec hdrs */
+#define QLS_FILE_MAGIC 0x66534c51 /**< ("QLSf" in little endian) Magic for file hdrs */
+#define QLS_EMPTY_MAGIC 0x78534c51 /**< ("QLSx" in little endian) Magic for empty dblk */
+#define QLS_JRNL_VERSION 2 /**< Version (of file layout) */
+#define QLS_JRNL_FHDR_RES_SIZE_SBLKS 1 /**< Journal file header reserved size in sblks (as defined by QLS_SBLK_SIZE_BYTES) */
+
+#define QLS_CLEAN /**< If defined, writes QLS_CLEAN_CHAR to all filled areas on disk */
+#define QLS_CLEAN_CHAR 0xff /**< Char used to clear empty space on disk */
+
+#endif /* ifndef QPID_LEGACYSTORE_JRNL_JCFG_H */
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp Tue Oct 22 19:09:56 2013
@@ -0,0 +1,549 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/linearstore/jrnl/jcntl.h"
+
+#include <algorithm>
+#include <cassert>
+#include <cerrno>
+#include <cstdlib>
+#include <cstring>
+#include <fstream>
+#include <iomanip>
+#include <iostream>
+#include <qpid/linearstore/jrnl/EmptyFilePool.h>
+#include <qpid/linearstore/jrnl/EmptyFilePoolManager.h>
+#include "qpid/linearstore/jrnl/jerrno.h"
+#include "qpid/linearstore/jrnl/JournalLog.h"
+#include "qpid/linearstore/jrnl/utils/enq_hdr.h"
+#include <limits>
+#include <sstream>
+#include <unistd.h>
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+// Timeout values (seconds / nanoseconds) loaded into the static timespec members below.
+#define AIO_CMPL_TIMEOUT_SEC 5
+#define AIO_CMPL_TIMEOUT_NSEC 0
+#define FINAL_AIO_CMPL_TIMEOUT_SEC 15
+#define FINAL_AIO_CMPL_TIMEOUT_NSEC 0
+
+// Static members. The two timespec objects are defined before _init so that they
+// exist by the time init_statics() runs as _init's initializer.
+timespec jcntl::_aio_cmpl_timeout; ///< Timeout for blocking libaio returns
+timespec jcntl::_final_aio_cmpl_timeout; ///< Timeout for blocking libaio returns when stopping or finalizing
+bool jcntl::_init = init_statics();
+
+/**
+ * Fill in the static AIO completion timeouts from the macros above.
+ *
+ * \return Always true; the value is stored in jcntl::_init.
+ */
+bool jcntl::init_statics()
+{
+    _final_aio_cmpl_timeout.tv_sec = FINAL_AIO_CMPL_TIMEOUT_SEC;
+    _final_aio_cmpl_timeout.tv_nsec = FINAL_AIO_CMPL_TIMEOUT_NSEC;
+
+    _aio_cmpl_timeout.tv_sec = AIO_CMPL_TIMEOUT_SEC;
+    _aio_cmpl_timeout.tv_nsec = AIO_CMPL_TIMEOUT_NSEC;
+
+    return true;
+}
+
+
+// Functions
+
+/**
+ * Construct a journal controller.
+ *
+ * All state flags start out false and no empty file pool is attached. The write
+ * manager and the recovery manager are both given references to this object's
+ * enqueue map and transaction map.
+ *
+ * \param jid Journal identifier, stored in _jid.
+ * \param jdir Journal directory, used to construct _jdir.
+ * \param jrnl_log Journal log that this object (and the recovery manager) reports to.
+ */
+jcntl::jcntl(const std::string& jid,
+             const std::string& jdir,
+             JournalLog& jrnl_log)
+    : _jid(jid)
+    , _jdir(jdir)
+    , _init_flag(false)
+    , _stop_flag(false)
+    , _readonly_flag(false)
+    , _jrnl_log(jrnl_log)
+    , _linearFileController(*this)
+    , _emptyFilePoolPtr(0)
+    , _emap()
+    , _tmap()
+    , _wmgr(this, _emap, _tmap, _linearFileController)
+    , _recoveryManager(_jdir.dirname(), _jid, _emap, _tmap, jrnl_log)
+{
+}
+
+/**
+ * Destructor.
+ *
+ * If the journal was initialized and has not yet been stopped, stop it (blocking
+ * until AIO completes). A jexception raised while stopping is written to std::cerr
+ * instead of being propagated. The linear file controller is finalized in all cases.
+ */
+jcntl::~jcntl()
+{
+    const bool needs_stop = _init_flag && !_stop_flag;
+    if (needs_stop)
+    {
+        try
+        {
+            stop(true);
+        }
+        catch (const jexception& e)
+        {
+            std::cerr << e << std::endl;
+        }
+    }
+    _linearFileController.finalize();
+}
+
+/**
+ * Initialize this journal as a new, empty journal.
+ *
+ * Resets the state flags and both maps, finalizes any previous linear file
+ * controller state, clears the journal directory, then brings up the linear file
+ * controller (pulling a first empty file from the pool) and the write manager.
+ * _init_flag is set only after all of these steps have completed.
+ *
+ * \param efpp Empty file pool handed to the linear file controller.
+ * \param wcache_num_pages Number of write cache pages, forwarded to the write manager.
+ * \param wcache_pgsize_sblks Write cache page size in softblocks, forwarded to the write manager.
+ * \param cbp AIO callback object, forwarded to the write manager.
+ */
+void
+jcntl::initialize(EmptyFilePool* efpp,
+                  const uint16_t wcache_num_pages,
+                  const uint32_t wcache_pgsize_sblks,
+                  aio_callback* const cbp)
+{
+    // Back to the "not initialized" state while the journal is rebuilt.
+    _init_flag = false;
+    _stop_flag = false;
+    _readonly_flag = false;
+
+    // Drop any previous in-memory state.
+    _emap.clear();
+    _tmap.clear();
+    _linearFileController.finalize();
+
+    // Clear any existing journal files.
+    _jdir.clear_dir();
+
+    // Start the file controller at file number 0 and give it a first file to write to.
+    _linearFileController.initialize(_jdir.dirname(), efpp, 0ULL);
+    _linearFileController.pullEmptyFileFromEfp();
+
+    _wmgr.initialize(cbp,
+                     wcache_pgsize_sblks,
+                     wcache_num_pages,
+                     QLS_WMGR_MAXDTOKPP,
+                     QLS_WMGR_MAXWAITUS);
+
+    _init_flag = true;
+}
+
+/**
+ * Recover this journal from the files already present in the journal directory.
+ *
+ * Resets the state flags and both maps, verifies the journal directory, lets the
+ * recovery manager analyze the journal files, and then initializes the linear file
+ * controller and the write manager from what was found. On return the journal is
+ * initialized but read-only; recover_complete() clears the read-only flag.
+ *
+ * \param efpmp Empty file pool manager passed to the recovery manager's analysis.
+ * \param wcache_num_pages Number of write cache pages, forwarded to the write manager.
+ * \param wcache_pgsize_sblks Write cache page size in softblocks, forwarded to the write manager.
+ * \param cbp AIO callback object, forwarded to the write manager.
+ * \param prep_txn_list_ptr List of transaction ids passed to the recovery manager's analysis
+ *        (presumably the prepared transactions - confirm against RecoveryManager).
+ * \param highest_rid Output: highest record id reported by the recovery manager.
+ */
+void
+jcntl::recover(EmptyFilePoolManager* efpmp,
+               const uint16_t wcache_num_pages,
+               const uint32_t wcache_pgsize_sblks,
+               aio_callback* const cbp,
+               const std::vector<std::string>* prep_txn_list_ptr,
+               uint64_t& highest_rid)
+{
+    _init_flag = false;
+    _stop_flag = false;
+    _readonly_flag = false;
+
+    _emap.clear();
+    _tmap.clear();
+
+    _linearFileController.finalize();
+
+    // Verify journal dir and journal files
+    _jdir.verify_dir();
+
+    // The analysis also sets _emptyFilePoolPtr, which the file controller needs below.
+    _recoveryManager.analyzeJournals(prep_txn_list_ptr, efpmp, &_emptyFilePoolPtr);
+
+    highest_rid = _recoveryManager.getHighestRecordId();
+    _jrnl_log.log(/*LOG_DEBUG*/JournalLog::LOG_INFO, _jid, _recoveryManager.toString(_jid, true));
+
+    // Continue file numbering after the highest file number found during analysis,
+    // then register the recovered journal files with the file controller.
+    _linearFileController.initialize(_jdir.dirname(), _emptyFilePoolPtr, _recoveryManager.getHighestFileNumber());
+    _recoveryManager.setLinearFileControllerJournals(&qpid::qls_jrnl::LinearFileController::addJournalFile, &_linearFileController);
+
+    // The last argument is 0 when the last file is full, otherwise the recovered end offset.
+    _wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, QLS_WMGR_MAXDTOKPP, QLS_WMGR_MAXWAITUS,
+                     (_recoveryManager.isLastFileFull() ? 0 : _recoveryManager.getEndOffset()));
+
+    // Stay read-only until recover_complete() is called.
+    _readonly_flag = true;
+    _init_flag = true;
+}
+
+/**
+ * End the read-only phase entered by recover().
+ *
+ * \throws jexception (JERR_JCNTL_NOTRECOVERED) if the journal is not currently read-only.
+ */
+void
+jcntl::recover_complete()
+{
+    if (_readonly_flag)
+    {
+        _readonly_flag = false;
+        return;
+    }
+    throw jexception(jerrno::JERR_JCNTL_NOTRECOVERED, "jcntl", "recover_complete");
+}
+
+/**
+ * Stop the journal and remove it from disk.
+ *
+ * The journal is stopped first (blocking until AIO completes), its files are then
+ * purged back to the empty file pool, and finally the journal directory is deleted.
+ */
+void
+jcntl::delete_jrnl_files()
+{
+    // Wait for AIO to complete before touching the files.
+    stop(true);
+
+    _linearFileController.purgeFilesToEfp();
+
+    _jdir.delete_dir();
+}
+
+
+/**
+ * Enqueue a non-transactional data record.
+ *
+ * The write is attempted under _wr_mutex and repeated for as long as
+ * handle_aio_wait() asks for a retry. A null xid (0, 0) is passed to the write
+ * manager, and its final flag argument is false.
+ *
+ * \param data_buff Pointer to the data to be written.
+ * \param tot_data_len Total data length, forwarded to the write manager.
+ * \param this_data_len Length of data for this call, forwarded to the write manager.
+ * \param dtokp Data token for this record.
+ * \param transient Transient flag, forwarded to the write manager.
+ * \return The result stored by the last handle_aio_wait() call.
+ * \throws jexception if check_wstatus() rejects the current journal state.
+ */
+iores
+jcntl::enqueue_data_record(const void* const data_buff,
+                           const std::size_t tot_data_len,
+                           const std::size_t this_data_len,
+                           data_tok* dtokp,
+                           const bool transient)
+{
+    iores result;
+    check_wstatus("enqueue_data_record");
+
+    slock write_lock(_wr_mutex); // held until return
+    bool retry;
+    do
+    {
+        const iores write_res = _wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, 0, 0, transient, false);
+        retry = handle_aio_wait(write_res, result, dtokp);
+    }
+    while (retry);
+    return result;
+}
+
+/**
+ * Enqueue a non-transactional record whose data is not passed to the journal.
+ *
+ * No data buffer is supplied (null pointer, zero length for this call) and no xid;
+ * the final flag argument of the write manager's enqueue() is true. The write is
+ * attempted under _wr_mutex and repeated while handle_aio_wait() asks for a retry.
+ *
+ * \param tot_data_len Total data length, forwarded to the write manager.
+ * \param dtokp Data token for this record.
+ * \param transient Transient flag, forwarded to the write manager.
+ * \return The result stored by the last handle_aio_wait() call.
+ * \throws jexception if check_wstatus() rejects the current journal state.
+ */
+iores
+jcntl::enqueue_extern_data_record(const std::size_t tot_data_len,
+                                  data_tok* dtokp,
+                                  const bool transient)
+{
+    iores result;
+    check_wstatus("enqueue_extern_data_record");
+
+    slock write_lock(_wr_mutex); // held until return
+    bool retry;
+    do
+    {
+        const iores write_res = _wmgr.enqueue(0, tot_data_len, 0, dtokp, 0, 0, transient, true);
+        retry = handle_aio_wait(write_res, result, dtokp);
+    }
+    while (retry);
+    return result;
+}
+
+/**
+ * Enqueue a data record as part of the transaction identified by \a xid.
+ *
+ * The write is attempted under _wr_mutex and repeated for as long as
+ * handle_aio_wait() asks for a retry.
+ *
+ * \param data_buff Pointer to the data to be written.
+ * \param tot_data_len Total data length, forwarded to the write manager.
+ * \param this_data_len Length of data for this call, forwarded to the write manager.
+ * \param dtokp Data token for this record.
+ * \param xid Transaction id; its bytes and size are forwarded to the write manager.
+ * \param transient Transient flag, forwarded to the write manager.
+ * \return The result stored by the last handle_aio_wait() call.
+ * \throws jexception if check_wstatus() rejects the current journal state.
+ */
+iores
+jcntl::enqueue_txn_data_record(const void* const data_buff,
+                               const std::size_t tot_data_len,
+                               const std::size_t this_data_len,
+                               data_tok* dtokp,
+                               const std::string& xid,
+                               const bool transient)
+{
+    iores r;
+    // Report this function's real name in any status exception.
+    check_wstatus("enqueue_txn_data_record");
+    {
+        slock s(_wr_mutex);
+        while (handle_aio_wait(_wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, xid.data(), xid.size(),
+                                             transient, false), r, dtokp)) ;
+    }
+    return r;
+}
+
+/**
+ * Enqueue, as part of transaction \a xid, a record whose data is not passed to the journal.
+ *
+ * No data buffer is supplied (null pointer, zero length for this call); the final
+ * flag argument of the write manager's enqueue() is true. The write is attempted
+ * under _wr_mutex and repeated while handle_aio_wait() asks for a retry.
+ *
+ * \param tot_data_len Total data length, forwarded to the write manager.
+ * \param dtokp Data token for this record.
+ * \param xid Transaction id; its bytes and size are forwarded to the write manager.
+ * \param transient Transient flag, forwarded to the write manager.
+ * \return The result stored by the last handle_aio_wait() call.
+ * \throws jexception if check_wstatus() rejects the current journal state.
+ */
+iores
+jcntl::enqueue_extern_txn_data_record(const std::size_t tot_data_len,
+                                      data_tok* dtokp,
+                                      const std::string& xid,
+                                      const bool transient)
+{
+    iores result;
+    check_wstatus("enqueue_extern_txn_data_record");
+
+    slock write_lock(_wr_mutex); // held until return
+    bool retry;
+    do
+    {
+        const iores write_res = _wmgr.enqueue(0, tot_data_len, 0, dtokp, xid.data(), xid.size(), transient, true);
+        retry = handle_aio_wait(write_res, result, dtokp);
+    }
+    while (retry);
+    return result;
+}
+
+/**
+ * Read the next remaining record through the recovery manager.
+ *
+ * All arguments are forwarded unchanged to
+ * RecoveryManager::readNextRemainingRecord(). Two earlier commented-out
+ * implementations (a direct std::ifstream read driven by _emap, and an _rmgr-based
+ * read with synchronize) and the unreachable trailing return that followed them
+ * have been removed.
+ *
+ * \param datapp Receives the record data pointer.
+ * \param dsize Receives the data size.
+ * \param xidpp Receives the xid pointer.
+ * \param xidsize Receives the xid size.
+ * \param transient Receives the transient flag.
+ * \param external Receives the external flag.
+ * \param dtokp Data token for the record being read.
+ * \param ignore_pending_txns Forwarded to the recovery manager.
+ * \return RHM_IORES_SUCCESS if the recovery manager produced a record, otherwise RHM_IORES_EMPTY.
+ * \throws jexception if check_rstatus() rejects the current journal state.
+ */
+iores
+jcntl::read_data_record(void** const datapp,
+                        std::size_t& dsize,
+                        void** const xidpp,
+                        std::size_t& xidsize,
+                        bool& transient,
+                        bool& external,
+                        data_tok* const dtokp,
+                        bool ignore_pending_txns)
+{
+    // Report this function's real name in any status exception.
+    check_rstatus("read_data_record");
+    if (_recoveryManager.readNextRemainingRecord(datapp, dsize, xidpp, xidsize, transient, external, dtokp, ignore_pending_txns))
+        return RHM_IORES_SUCCESS;
+    return RHM_IORES_EMPTY;
+}
+
+/**
+ * Dequeue a record outside of any transaction.
+ *
+ * A null xid (0, 0) is passed to the write manager. The write is attempted under
+ * _wr_mutex and repeated for as long as handle_aio_wait() asks for a retry.
+ *
+ * \param dtokp Data token of the record to dequeue.
+ * \param txn_coml_commit Forwarded to the write manager's dequeue().
+ * \return The result stored by the last handle_aio_wait() call.
+ * \throws jexception if check_wstatus() rejects the current journal state.
+ */
+iores
+jcntl::dequeue_data_record(data_tok* const dtokp,
+                           const bool txn_coml_commit)
+{
+    iores r;
+    // Report this function's real name so it can be told apart from the txn variant.
+    check_wstatus("dequeue_data_record");
+    {
+        slock s(_wr_mutex);
+        while (handle_aio_wait(_wmgr.dequeue(dtokp, 0, 0, txn_coml_commit), r, dtokp)) ;
+    }
+    return r;
+}
+
+/**
+ * Dequeue a record as part of the transaction identified by \a xid.
+ *
+ * The write is attempted under _wr_mutex and repeated for as long as
+ * handle_aio_wait() asks for a retry.
+ *
+ * \param dtokp Data token of the record to dequeue.
+ * \param xid Transaction id; its bytes and size are forwarded to the write manager.
+ * \param txn_coml_commit Forwarded to the write manager's dequeue().
+ * \return The result stored by the last handle_aio_wait() call.
+ * \throws jexception if check_wstatus() rejects the current journal state.
+ */
+iores
+jcntl::dequeue_txn_data_record(data_tok* const dtokp,
+                               const std::string& xid,
+                               const bool txn_coml_commit)
+{
+    iores r;
+    // Report this function's real name so it can be told apart from the non-txn variant.
+    check_wstatus("dequeue_txn_data_record");
+    {
+        slock s(_wr_mutex);
+        while (handle_aio_wait(_wmgr.dequeue(dtokp, xid.data(), xid.size(), txn_coml_commit), r, dtokp)) ;
+    }
+    return r;
+}
+
+/**
+ * Write an abort for the transaction identified by \a xid.
+ *
+ * The write is attempted under _wr_mutex and repeated for as long as
+ * handle_aio_wait() asks for a retry.
+ *
+ * \param dtokp Data token for the abort.
+ * \param xid Transaction id; its bytes and size are forwarded to the write manager.
+ * \return The result stored by the last handle_aio_wait() call.
+ * \throws jexception if check_wstatus() rejects the current journal state.
+ */
+iores
+jcntl::txn_abort(data_tok* const dtokp,
+                 const std::string& xid)
+{
+    iores result;
+    check_wstatus("txn_abort");
+
+    slock write_lock(_wr_mutex); // held until return
+    bool retry;
+    do
+    {
+        const iores write_res = _wmgr.abort(dtokp, xid.data(), xid.size());
+        retry = handle_aio_wait(write_res, result, dtokp);
+    }
+    while (retry);
+    return result;
+}
+
+/**
+ * Write a commit for the transaction identified by \a xid.
+ *
+ * The write is attempted under _wr_mutex and repeated for as long as
+ * handle_aio_wait() asks for a retry.
+ *
+ * \param dtokp Data token for the commit.
+ * \param xid Transaction id; its bytes and size are forwarded to the write manager.
+ * \return The result stored by the last handle_aio_wait() call.
+ * \throws jexception if check_wstatus() rejects the current journal state.
+ */
+iores
+jcntl::txn_commit(data_tok* const dtokp,
+                  const std::string& xid)
+{
+    iores result;
+    check_wstatus("txn_commit");
+
+    slock write_lock(_wr_mutex); // held until return
+    bool retry;
+    do
+    {
+        const iores write_res = _wmgr.commit(dtokp, xid.data(), xid.size());
+        retry = handle_aio_wait(write_res, result, dtokp);
+    }
+    while (retry);
+    return result;
+}
+
+/**
+ * Ask the write manager, under _wr_mutex, whether transaction \a xid is synced.
+ *
+ * \param xid Transaction id to query.
+ * \return The write manager's answer.
+ */
+bool
+jcntl::is_txn_synced(const std::string& xid)
+{
+    slock write_lock(_wr_mutex);
+    return _wmgr.is_txn_synced(xid);
+}
+
+/**
+ * Collect write events from the write manager if _wr_mutex can be acquired.
+ *
+ * \param timeout Timeout forwarded to the write manager's get_events().
+ * \return jerrno::LOCK_TAKEN if the lock was not obtained, otherwise the value
+ *         returned by the write manager's get_events().
+ */
+int32_t
+jcntl::get_wr_events(timespec* const timeout)
+{
+    stlock try_lock(_wr_mutex);
+    if (try_lock.locked())
+    {
+        return _wmgr.get_events(timeout, false);
+    }
+    return jerrno::LOCK_TAKEN;
+}
+
+/**
+ * Stop the journal.
+ *
+ * In read-only mode only the read status is checked and nothing is flushed.
+ * Otherwise the write status is checked and the write manager is flushed after the
+ * stop flag has been set. The linear file controller is finalized in both cases.
+ *
+ * \param block_till_aio_cmpl Passed to flush(); when true, flush() waits for
+ *        outstanding AIO to complete.
+ * \throws jexception if the status check rejects the current journal state.
+ */
+void
+jcntl::stop(const bool block_till_aio_cmpl)
+{
+    if (_readonly_flag)
+    {
+        check_rstatus("stop");
+        _stop_flag = true;
+    }
+    else
+    {
+        check_wstatus("stop");
+        _stop_flag = true;
+        flush(block_till_aio_cmpl);
+    }
+    _linearFileController.finalize();
+}
+
+/**
+ * \return A reference to this journal's linear file controller.
+ */
+LinearFileController&
+jcntl::getLinearFileControllerRef()
+{
+    return _linearFileController;
+}
+
+/**
+ * Flush the write manager.
+ *
+ * Does nothing and reports success if the journal has not been initialized.
+ *
+ * \param block_till_aio_cmpl When true, wait for outstanding AIO to complete after flushing.
+ * \return RHM_IORES_SUCCESS if not initialized, otherwise the write manager's flush() result.
+ * \throws jexception (JERR_JCNTL_READONLY) if the journal is read-only.
+ */
+iores
+jcntl::flush(const bool block_till_aio_cmpl)
+{
+    if (!_init_flag)
+    {
+        return RHM_IORES_SUCCESS;
+    }
+    if (_readonly_flag)
+    {
+        throw jexception(jerrno::JERR_JCNTL_READONLY, "jcntl", "flush");
+    }
+
+    iores flush_result;
+    {
+        slock write_lock(_wr_mutex);
+        flush_result = _wmgr.flush();
+    } // lock released here; aio_cmpl_wait() takes _wr_mutex itself
+
+    if (block_till_aio_cmpl)
+    {
+        aio_cmpl_wait();
+    }
+    return flush_result;
+}
+
+// Protected/Private functions
+
+/**
+ * Verify that the journal may be written to.
+ *
+ * The checks run in this order, and the first one that fails decides the exception:
+ * not initialized, read-only, stopped.
+ *
+ * \param fn_name Name of the calling function, recorded in the exception.
+ * \throws jexception with JERR__NINIT, JERR_JCNTL_READONLY or JERR_JCNTL_STOPPED.
+ */
+void
+jcntl::check_wstatus(const char* fn_name) const
+{
+    if (!_init_flag)
+    {
+        throw jexception(jerrno::JERR__NINIT, "jcntl", fn_name);
+    }
+    else if (_readonly_flag)
+    {
+        throw jexception(jerrno::JERR_JCNTL_READONLY, "jcntl", fn_name);
+    }
+    else if (_stop_flag)
+    {
+        throw jexception(jerrno::JERR_JCNTL_STOPPED, "jcntl", fn_name);
+    }
+}
+
+/**
+ * Verify that the journal may be read from.
+ *
+ * The checks run in this order, and the first one that fails decides the exception:
+ * not initialized, stopped.
+ *
+ * \param fn_name Name of the calling function, recorded in the exception.
+ * \throws jexception with JERR__NINIT or JERR_JCNTL_STOPPED.
+ */
+void
+jcntl::check_rstatus(const char* fn_name) const
+{
+    if (!_init_flag)
+    {
+        throw jexception(jerrno::JERR__NINIT, "jcntl", fn_name);
+    }
+    else if (_stop_flag)
+    {
+        throw jexception(jerrno::JERR_JCNTL_STOPPED, "jcntl", fn_name);
+    }
+}
+
+
+/**
+ * Wait until the write manager reports no remaining AIO events.
+ *
+ * The remaining-event count is read under _wr_mutex; the lock is released before
+ * get_wr_events() is called, which tries to take the lock itself.
+ *
+ * \throws jexception (JERR_JCNTL_AIOCMPLWAIT) if get_wr_events() reports jerrno::AIO_TIMEOUT.
+ */
+void
+jcntl::aio_cmpl_wait()
+{
+    for (;;)
+    {
+        uint32_t events_remaining;
+        {
+            slock write_lock(_wr_mutex);
+            events_remaining = _wmgr.get_aio_evt_rem();
+        }
+        if (events_remaining == 0)
+        {
+            return; // no events left
+        }
+        if (get_wr_events(&_aio_cmpl_timeout) == jerrno::AIO_TIMEOUT)
+        {
+            throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl", "aio_cmpl_wait");
+        }
+    }
+}
+
+
+/**
+ * Post-process the result of a write manager operation and decide whether the
+ * caller should repeat that operation.
+ *
+ * - RHM_IORES_PAGE_AIOWAIT: collect AIO events until the current page is no longer
+ *   blocked, then ask for a retry.
+ * - RHM_IORES_FILE_AIOWAIT: no waiting is done here; the result is rewritten to
+ *   RHM_IORES_SUCCESS and a retry is requested only while the data token is in one
+ *   of the *_PART write states.
+ * - Anything else: passed through unchanged, no retry.
+ *
+ * \param res Result returned by the write manager operation.
+ * \param resout Receives the result to be reported to the caller.
+ * \param dtp Data token of the operation; only dereferenced for RHM_IORES_FILE_AIOWAIT.
+ * \return true if the caller should repeat the operation, false otherwise.
+ * \throws jexception if the current page is blocked with no AIO events remaining, or
+ *         (JERR_JCNTL_AIOCMPLWAIT) if collecting events reports jerrno::AIO_TIMEOUT.
+ */
+bool
+jcntl::handle_aio_wait(const iores res, iores& resout, const data_tok* dtp)
+{
+    resout = res;
+    if (res == RHM_IORES_PAGE_AIOWAIT)
+    {
+        while (_wmgr.curr_pg_blocked())
+        {
+            if (_wmgr.get_aio_evt_rem() == 0)
+            {
+                // A blocked page with nothing outstanding can never unblock. Record the
+                // write manager state in the journal log (not on stdout) before failing.
+                std::ostringstream oss;
+                oss << "_wmgr.curr_pg_blocked() with no events remaining; wmgr_status: " << _wmgr.status_str();
+                _jrnl_log.log(JournalLog::LOG_CRITICAL, _jid, oss.str());
+                throw jexception("_wmgr.curr_pg_blocked() with no events remaining"); // TODO - complete exception
+            }
+            if (_wmgr.get_events(&_aio_cmpl_timeout, false) == jerrno::AIO_TIMEOUT)
+            {
+                std::ostringstream oss;
+                oss << "get_events() returned JERR_JCNTL_AIOCMPLWAIT; wmgr_status: " << _wmgr.status_str();
+                _jrnl_log.log(JournalLog::LOG_CRITICAL, _jid, oss.str());
+                throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl", "handle_aio_wait");
+            }
+        }
+        return true;
+    }
+    else if (res == RHM_IORES_FILE_AIOWAIT)
+    {
+        resout = RHM_IORES_SUCCESS;
+        // Only a partially written record needs another pass through the write manager.
+        data_tok::write_state ws = dtp->wstate();
+        return ws == data_tok::ENQ_PART || ws == data_tok::DEQ_PART || ws == data_tok::ABORT_PART ||
+               ws == data_tok::COMMIT_PART;
+    }
+    return false;
+}
+
+}}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org