You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/10/22 21:09:58 UTC
svn commit: r1534736 [7/8] - in /qpid/trunk/qpid/cpp/src: ./
qpid/linearstore/ qpid/linearstore/jrnl/ qpid/linearstore/jrnl/utils/
tests/linearstore/
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jexception.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jexception.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jexception.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jexception.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,126 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_JEXCEPTION_H
+#define QPID_LEGACYSTORE_JRNL_JEXCEPTION_H
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+class jexception;
+}}
+
+#include <cerrno>
+#include <cstdio>
+#include <cstdlib>
+#include <cstring>
+#include <exception>
+#include "qpid/linearstore/jrnl/jerrno.h"
+#include <sstream>
+#include <string>
+
+// Macro for formatting commom system errors
+#define FORMAT_SYSERR(errno) " errno=" << errno << " (" << std::strerror(errno) << ")"
+
+#define MALLOC_CHK(ptr, var, cls, fn) if(ptr == 0) { \
+ clean(); \
+ std::ostringstream oss; \
+ oss << var << ": malloc() failed: " << FORMAT_SYSERR(errno); \
+ throw jexception(jerrno::JERR__MALLOC, oss.str(), cls, fn); \
+ }
+
+// TODO: The following is a temporary bug-tracking aid which forces a core.
+// Replace with the commented out version below when BZ484048 is resolved.
+#define PTHREAD_CHK(err, pfn, cls, fn) if(err != 0) { \
+ std::ostringstream oss; \
+ oss << cls << "::" << fn << "(): " << pfn; \
+ errno = err; \
+ ::perror(oss.str().c_str()); \
+ ::abort(); \
+ }
+/*
+#define PTHREAD_CHK(err, pfn, cls, fn) if(err != 0) { \
+ std::ostringstream oss; \
+ oss << pfn << " failed: " << FORMAT_SYSERR(err); \
+ throw jexception(jerrno::JERR__PTHREAD, oss.str(), cls, fn); \
+ }
+*/
+
+#define ASSERT(cond, msg) if(cond == 0) { \
+ std::cerr << msg << std::endl; \
+ ::abort(); \
+ }
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+ /**
+ * \class jexception
+ * \brief Generic journal exception class
+ */
+ class jexception : public std::exception
+ {
+ private:
+ uint32_t _err_code;
+ std::string _additional_info;
+ std::string _throwing_class;
+ std::string _throwing_fn;
+ std::string _what;
+ void format();
+
+ public:
+ jexception() throw ();
+
+ jexception(const uint32_t err_code) throw ();
+
+ jexception(const char* additional_info) throw ();
+ jexception(const std::string& additional_info) throw ();
+
+ jexception(const uint32_t err_code, const char* additional_info) throw ();
+ jexception(const uint32_t err_code, const std::string& additional_info) throw ();
+
+ jexception(const uint32_t err_code, const char* throwing_class, const char* throwing_fn)
+ throw ();
+ jexception(const uint32_t err_code, const std::string& throwing_class,
+ const std::string& throwing_fn) throw ();
+
+ jexception(const uint32_t err_code, const char* additional_info,
+ const char* throwing_class, const char* throwing_fn) throw ();
+ jexception(const uint32_t err_code, const std::string& additional_info,
+ const std::string& throwing_class, const std::string& throwing_fn) throw ();
+
+ virtual ~jexception() throw ();
+ virtual const char* what() const throw (); // override std::exception::what()
+
+ inline uint32_t err_code() const throw () { return _err_code; }
+ inline const std::string additional_info() const throw () { return _additional_info; }
+ inline const std::string throwing_class() const throw () { return _throwing_class; }
+ inline const std::string throwing_fn() const throw () { return _throwing_fn; }
+
+ friend std::ostream& operator<<(std::ostream& os, const jexception& je);
+ friend std::ostream& operator<<(std::ostream& os, const jexception* jePtr);
+ }; // class jexception
+
+}}
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_JEXCEPTION_H
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jrec.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jrec.cpp?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jrec.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jrec.cpp Tue Oct 22 19:09:56 2013
@@ -0,0 +1,107 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/linearstore/jrnl/jrec.h"
+
+#include <iomanip>
+#include "qpid/linearstore/jrnl/jerrno.h"
+#include "qpid/linearstore/jrnl/jexception.h"
+#include <sstream>
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+jrec::jrec() {}
+jrec::~jrec() {}
+
+void
+jrec::chk_hdr(const rec_hdr_t& hdr)
+{
+ if (hdr._magic == 0)
+ {
+ std::ostringstream oss;
+ oss << std::hex << std::setfill('0');
+ oss << "enq magic NULL: rid=0x" << hdr._rid;
+ throw jexception(jerrno::JERR_JREC_BADRECHDR, oss.str(), "jrec", "chk_hdr");
+ }
+ if (hdr._version != QLS_JRNL_VERSION)
+ {
+ std::ostringstream oss;
+ oss << std::hex << std::setfill('0');
+ oss << "version: rid=0x" << hdr._rid;
+ oss << ": expected=0x" << std::setw(2) << (int)QLS_JRNL_VERSION;
+ oss << " read=0x" << std::setw(2) << (int)hdr._version;
+ throw jexception(jerrno::JERR_JREC_BADRECHDR, oss.str(), "jrec", "chk_hdr");
+ }
+//#if defined (JRNL_LITTLE_ENDIAN)
+// uint8_t endian_flag = RHM_LENDIAN_FLAG;
+//#else
+// uint8_t endian_flag = RHM_BENDIAN_FLAG;
+//#endif
+// if (hdr._eflag != endian_flag)
+// {
+// std::ostringstream oss;
+// oss << std::hex << std::setfill('0');
+// oss << "endian_flag: rid=" << hdr._rid;
+// oss << ": expected=0x" << std::setw(2) << (int)endian_flag;
+// oss << " read=0x" << std::setw(2) << (int)hdr._eflag;
+// throw jexception(jerrno::JERR_JREC_BADRECHDR, oss.str(), "jrec", "chk_hdr");
+// }
+}
+
+void
+jrec::chk_rid(const rec_hdr_t& hdr, const uint64_t rid)
+{
+ if (hdr._rid != rid)
+ {
+ std::ostringstream oss;
+ oss << std::hex << std::setfill('0');
+ oss << "rid mismatch: expected=0x" << rid;
+ oss << " read=0x" << hdr._rid;
+ throw jexception(jerrno::JERR_JREC_BADRECHDR, oss.str(), "jrec", "chk_hdr");
+ }
+}
+
+void
+jrec::chk_tail(const rec_tail_t& tail, const rec_hdr_t& hdr)
+{
+ if (tail._xmagic != ~hdr._magic)
+ {
+ std::ostringstream oss;
+ oss << std::hex << std::setfill('0');
+ oss << "magic: rid=0x" << hdr._rid;
+ oss << ": expected=0x" << ~hdr._magic;
+ oss << " read=0x" << tail._xmagic;
+ throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "jrec", "chk_tail");
+ }
+ if (tail._rid != hdr._rid)
+ {
+ std::ostringstream oss;
+ oss << std::hex << std::setfill('0');
+ oss << "rid: rid=0x" << hdr._rid;
+ oss << ": read=0x" << tail._rid;
+ throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "jrec", "chk_tail");
+ }
+}
+
+}}
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,170 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_JREC_H
+#define QPID_LEGACYSTORE_JRNL_JREC_H
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+class jrec;
+}}
+
+#include <cstddef>
+#include <fstream>
+#include "qpid/linearstore/jrnl/jcfg.h"
+#include "qpid/linearstore/jrnl/utils/rec_hdr.h"
+#include "qpid/linearstore/jrnl/utils/rec_tail.h"
+#include <string>
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+ /**
+ * \class jrec
+ * \brief Abstract class for all file jrecords, both data and log. This class establishes
+ * the common data format and structure for these jrecords.
+ */
+ class jrec
+ {
+ public:
+ jrec();
+ virtual ~jrec();
+
+ /**
+ * \brief Encode this instance of jrec into the write buffer at the disk-block-aligned
+ * pointer wptr starting at position rec_offs_dblks in the encoded record to a
+ * maximum size of max_size_dblks.
+ *
+ * This call encodes the content of the data contianed in this instance of jrec into a
+ * disk-softblock-aligned (defined by JRNL_SBLK_SIZE) buffer pointed to by parameter
+ * wptr. No more than paramter max_size_dblks data-blocks may be written to the buffer.
+ * The parameter rec_offs_dblks is the offset in data-blocks within the fully encoded
+ * data block this instance represents at which to start encoding.
+ *
+ * Encoding entails writing the record header (struct enq_hdr), the data and the record tail
+ * (struct enq_tail). The record must be data-block-aligned (defined by JRNL_DBLK_SIZE),
+ * thus any remaining space in the final data-block is ignored; the returned value is the
+ * number of data-blocks consumed from the page by the encode action. Provided the initial
+ * alignment requirements are met, records may be of arbitrary size and may span multiple
+ * data-blocks, disk-blocks and/or pages.
+ *
+ * Since the record size in data-blocks is known, the general usage pattern is to call
+ * encode() as many times as is needed to fully encode the data. Each call to encode()
+ * will encode as much of the record as it can to what remains of the current page cache,
+ * and will return the number of data-blocks actually encoded.
+ *
+ * <b>Example:</b> Assume that record r1 was previously written to page 0, and that this
+ * is an instance representing record r2. Being larger than the page size ps, r2 would span
+ * multiple pages as follows:
+ * <pre>
+ * |<---ps--->|
+ * +----------+----------+----------+----...
+ * | |r2a| r2b | r2c | |
+ * |<-r1-><----------r2----------> |
+ * +----------+----------+----------+----...
+ * page: p0 p1 p2
+ * </pre>
+ * Encoding record r2 will require multiple calls to encode; one for each page which
+ * is involved. Record r2 is divided logically into sections r2a, r2b and r2c at the
+ * points where the page boundaries intersect with the record. Assuming a page size
+ * of ps, the page boundary pointers are represented by their names p0, p1... and the
+ * sizes of the record segments are represented by their names r1, r2a, r2b..., the calls
+ * should be as follows:
+ * <pre>
+ * encode(p0+r1, 0, ps-r1); (returns r2a data-blocks)
+ * encode(p1, r2a, ps); (returns r2b data-blocks which equals ps)
+ * encode(p2, r2a+r2b, ps); (returns r2c data-blocks)
+ * </pre>
+ *
+ * \param wptr Data-block-aligned pointer to position in page buffer where encoding is to
+ * take place.
+ * \param rec_offs_dblks Offset in data-blocks within record from which to start encoding.
+ * \param max_size_dblks Maximum number of data-blocks to write to pointer wptr.
+ * \returns Number of data-blocks encoded.
+ */
+ virtual uint32_t encode(void* wptr, uint32_t rec_offs_dblks,
+ uint32_t max_size_dblks) = 0;
+
+ /**
+ * \brief Decode into this instance of jrec from the read buffer at the disk-block-aligned
+ * pointer rptr starting at position jrec_offs_dblks in the encoded record to a
+ * maximum size of max_size_blks.
+ *
+ * This call decodes a record in the page buffer pointed to by the data-block-aligned
+ * (defined by JRNL_DBLK_SIZE) parameter rptr into this instance of jrec. No more than
+ * paramter max_size_dblks data-blocks may be read from the buffer. The parameter
+ * jrec_offs_dblks is the offset in data-blocks within the encoded record at which to start
+ * decoding.
+ *
+ * Decoding entails reading the record header, the data and the tail. The record is
+ * data-block-aligned (defined by JRNL_DBLK_SIZE); the returned value is the number of
+ * data-blocks read from the buffer by the decode action. As the record data size is only
+ * known once the header is read, the number of calls required to complete reading the
+ * record will depend on the vlaues within this instance which are set when the
+ * header is decoded.
+ *
+ * A non-zero value for jrec_offs_dblks implies that this is not the first call to
+ * decode and the record data will be appended at this offset.
+ *
+ * \param h Reference to instance of struct hdr, already read from page buffer and used
+ * to determine record type
+ * \param rptr Data-block-aligned pointer to position in page buffer where decoding is to
+ * begin.
+ * \param rec_offs_dblks Offset within record from which to start appending the decoded
+ * record.
+ * \param max_size_dblks Maximum number of data-blocks to read from pointer rptr.
+ * \returns Number of data-blocks read (consumed).
+ */
+ virtual uint32_t decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks,
+ uint32_t max_size_dblks) = 0;
+
+ virtual bool rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs) = 0;
+
+ virtual std::string& str(std::string& str) const = 0;
+ virtual std::size_t data_size() const = 0;
+ virtual std::size_t xid_size() const = 0;
+ virtual std::size_t rec_size() const = 0;
+ inline virtual uint32_t rec_size_dblks() const { return size_dblks(rec_size()); }
+ static inline uint32_t size_dblks(const std::size_t size)
+ { return size_blks(size, QLS_DBLK_SIZE_BYTES); }
+ static inline uint32_t size_sblks(const std::size_t size)
+ { return size_blks(size, QLS_SBLK_SIZE_BYTES); }
+ static inline uint32_t size_blks(const std::size_t size, const std::size_t blksize)
+ { return (size + blksize - 1)/blksize; }
+ virtual uint64_t rid() const = 0;
+
+ protected:
+ virtual void chk_hdr() const = 0;
+ virtual void chk_hdr(uint64_t rid) const = 0;
+ virtual void chk_tail() const = 0;
+ static void chk_hdr(const rec_hdr_t& hdr);
+ static void chk_rid(const rec_hdr_t& hdr, uint64_t rid);
+ static void chk_tail(const rec_tail_t& tail, const rec_hdr_t& hdr);
+ virtual void clean() = 0;
+ }; // class jrec
+
+}}
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_JREC_H
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp Tue Oct 22 19:09:56 2013
@@ -0,0 +1,202 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/linearstore/jrnl/pmgr.h"
+
+#include <cerrno>
+#include <cstdlib>
+#include <cstring>
+#include "qpid/linearstore/jrnl/jcfg.h"
+#include "qpid/linearstore/jrnl/jcntl.h"
+#include "qpid/linearstore/jrnl/jerrno.h"
+#include <sstream>
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+pmgr::page_cb::page_cb(uint16_t index):
+ _index(index),
+ _state(UNUSED),
+ _frid(0),
+ _wdblks(0),
+ _rdblks(0),
+ _pdtokl(0),
+ _jfp(0),
+ _pbuff(0)
+{}
+
+// TODO: almost identical to pmgr::page_state_str() below - resolve
+const char*
+pmgr::page_cb::state_str() const
+{
+ switch(_state)
+ {
+ case UNUSED:
+ return "UNUSED";
+ case IN_USE:
+ return "IN_USE";
+ case AIO_PENDING:
+ return "AIO_PENDING";
+ }
+ return "<unknown>";
+}
+
+// static
+const uint32_t pmgr::_sblkSizeBytes = QLS_SBLK_SIZE_BYTES;
+
+pmgr::pmgr(jcntl* jc, enq_map& emap, txn_map& tmap):
+ _cache_pgsize_sblks(0),
+ _cache_num_pages(0),
+ _jc(jc),
+ _emap(emap),
+ _tmap(tmap),
+ _page_base_ptr(0),
+ _page_ptr_arr(0),
+ _page_cb_arr(0),
+ _aio_cb_arr(0),
+ _aio_event_arr(0),
+ _ioctx(0),
+ _pg_index(0),
+ _pg_cntr(0),
+ _pg_offset_dblks(0),
+ _aio_evt_rem(0),
+ _cbp(0),
+ _enq_rec(),
+ _deq_rec(),
+ _txn_rec()
+{}
+
+pmgr::~pmgr()
+{
+ pmgr::clean();
+}
+
+void
+pmgr::initialize(aio_callback* const cbp, const uint32_t cache_pgsize_sblks, const uint16_t cache_num_pages)
+{
+ // As static use of this class keeps old values around, clean up first...
+ pmgr::clean();
+ _pg_index = 0;
+ _pg_cntr = 0;
+ _pg_offset_dblks = 0;
+ _aio_evt_rem = 0;
+ _cache_pgsize_sblks = cache_pgsize_sblks;
+ _cache_num_pages = cache_num_pages;
+ _cbp = cbp;
+
+ // 1. Allocate page memory (as a single block)
+ std::size_t cache_pgsize = _cache_num_pages * _cache_pgsize_sblks * _sblkSizeBytes;
+ if (::posix_memalign(&_page_base_ptr, QLS_AIO_ALIGN_BOUNDARY_BYTES, cache_pgsize))
+ {
+ clean();
+ std::ostringstream oss;
+ oss << "posix_memalign(): alignment=" << QLS_AIO_ALIGN_BOUNDARY_BYTES << " size=" << cache_pgsize;
+ oss << FORMAT_SYSERR(errno);
+ throw jexception(jerrno::JERR__MALLOC, oss.str(), "pmgr", "initialize");
+ }
+
+ // 2. Allocate array of page pointers
+ _page_ptr_arr = (void**)std::malloc(_cache_num_pages * sizeof(void*));
+ MALLOC_CHK(_page_ptr_arr, "_page_ptr_arr", "pmgr", "initialize");
+
+ // 3. Allocate and initialize page control block (page_cb) array
+ _page_cb_arr = (page_cb*)std::malloc(_cache_num_pages * sizeof(page_cb));
+ MALLOC_CHK(_page_cb_arr, "_page_cb_arr", "pmgr", "initialize");
+ std::memset(_page_cb_arr, 0, _cache_num_pages * sizeof(page_cb));
+
+ // 4. Allocate IO control block (iocb) array
+ _aio_cb_arr = (aio_cb*)std::malloc(_cache_num_pages * sizeof(aio_cb));
+ MALLOC_CHK(_aio_cb_arr, "_aio_cb_arr", "pmgr", "initialize");
+
+ // 5. Set page pointers in _page_ptr_arr, _page_cb_arr and iocbs to pages within page block
+ for (uint16_t i=0; i<_cache_num_pages; i++)
+ {
+ _page_ptr_arr[i] = (void*)((char*)_page_base_ptr + _cache_pgsize_sblks * _sblkSizeBytes * i);
+ _page_cb_arr[i]._index = i;
+ _page_cb_arr[i]._state = UNUSED;
+ _page_cb_arr[i]._pbuff = _page_ptr_arr[i];
+ _page_cb_arr[i]._pdtokl = new std::deque<data_tok*>;
+ _page_cb_arr[i]._pdtokl->clear();
+ _aio_cb_arr[i].data = (void*)&_page_cb_arr[i];
+ }
+
+ // 6. Allocate io_event array, max one event per cache page plus one for each file
+ const uint16_t max_aio_evts = _cache_num_pages + 1; // One additional event for file header writes
+ _aio_event_arr = (aio_event*)std::malloc(max_aio_evts * sizeof(aio_event));
+ MALLOC_CHK(_aio_event_arr, "_aio_event_arr", "pmgr", "initialize");
+
+ // 7. Initialize AIO context
+ if (int ret = aio::queue_init(max_aio_evts, &_ioctx))
+ {
+ std::ostringstream oss;
+ oss << "io_queue_init() failed: " << FORMAT_SYSERR(-ret);
+ throw jexception(jerrno::JERR__AIO, oss.str(), "pmgr", "initialize");
+ }
+}
+
+void
+pmgr::clean()
+{
+ // Clean up allocated memory here
+
+ if (_ioctx)
+ aio::queue_release(_ioctx);
+
+ std::free(_page_base_ptr);
+ _page_base_ptr = 0;
+
+ if (_page_cb_arr)
+ {
+ for (int i=0; i<_cache_num_pages; i++)
+ delete _page_cb_arr[i]._pdtokl;
+ std::free(_page_ptr_arr);
+ _page_ptr_arr = 0;
+ }
+
+ std::free(_page_cb_arr);
+ _page_cb_arr = 0;
+
+ std::free(_aio_cb_arr);
+ _aio_cb_arr = 0;
+
+ std::free(_aio_event_arr);
+ _aio_event_arr = 0;
+}
+
+// TODO: almost identical to pmgr::page_cb::state_str() above - resolve
+const char*
+pmgr::page_state_str(page_state ps)
+{
+ switch (ps)
+ {
+ case UNUSED:
+ return "UNUSED";
+ case IN_USE:
+ return "IN_USE";
+ case AIO_PENDING:
+ return "AIO_PENDING";
+ }
+ return "<page_state unknown>";
+}
+
+}}
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,127 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_PMGR_H
+#define QPID_LEGACYSTORE_JRNL_PMGR_H
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+ class pmgr;
+ class jcntl;
+}}
+
+#include <deque>
+#include "qpid/linearstore/jrnl/aio.h"
+#include "qpid/linearstore/jrnl/aio_callback.h"
+#include "qpid/linearstore/jrnl/data_tok.h"
+#include "qpid/linearstore/jrnl/deq_rec.h"
+#include "qpid/linearstore/jrnl/enq_map.h"
+#include "qpid/linearstore/jrnl/enq_rec.h"
+#include "qpid/linearstore/jrnl/txn_map.h"
+#include "qpid/linearstore/jrnl/txn_rec.h"
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+class JournalFile;
+
+ /**
+ * \brief Abstract class for managing either read or write page cache of arbitrary size and
+ * number of cache_num_pages.
+ */
+ class pmgr
+ {
+ public:
+ /**
+ * \brief Enumeration of possible stats of a page within a page cache.
+ */
+ enum page_state
+ {
+ UNUSED, ///< A page is uninitialized, contains no data.
+ IN_USE, ///< Page is in use.
+ AIO_PENDING ///< An AIO request outstanding.
+ };
+
+ /**
+ * \brief Page control block, carries control and state information for each page in the
+ * cache.
+ */
+ struct page_cb
+ {
+ uint16_t _index; ///< Index of this page
+ page_state _state; ///< Status of page
+ uint64_t _frid; ///< First rid in page (used for fhdr init)
+ uint32_t _wdblks; ///< Total number of dblks in page so far
+ uint32_t _rdblks; ///< Total number of dblks in page
+ std::deque<data_tok*>* _pdtokl; ///< Page message tokens list
+ JournalFile* _jfp; ///< Journal file for incrementing compl counts
+ void* _pbuff; ///< Page buffer
+
+ page_cb(uint16_t index); ///< Convenience constructor
+ const char* state_str() const; ///< Return state as string for this pcb
+ };
+
+ protected:
+ static const uint32_t _sblkSizeBytes; ///< Disk softblock size
+ uint32_t _cache_pgsize_sblks; ///< Size of page cache cache_num_pages
+ uint16_t _cache_num_pages; ///< Number of page cache cache_num_pages
+ jcntl* _jc; ///< Pointer to journal controller
+ enq_map& _emap; ///< Ref to enqueue map
+ txn_map& _tmap; ///< Ref to transaction map
+ void* _page_base_ptr; ///< Base pointer to page memory
+ void** _page_ptr_arr; ///< Array of pointers to cache_num_pages in page memory
+ page_cb* _page_cb_arr; ///< Array of page_cb structs
+ aio_cb* _aio_cb_arr; ///< Array of iocb structs
+ aio_event* _aio_event_arr; ///< Array of io_events
+ io_context_t _ioctx; ///< AIO context for read/write operations
+ uint16_t _pg_index; ///< Index of current page being used
+ uint32_t _pg_cntr; ///< Page counter; determines if file rotation req'd
+ uint32_t _pg_offset_dblks; ///< Page offset (used so far) in data blocks
+ uint32_t _aio_evt_rem; ///< Remaining AIO events
+ aio_callback* _cbp; ///< Pointer to callback object
+
+ enq_rec _enq_rec; ///< Enqueue record used for encoding/decoding
+ deq_rec _deq_rec; ///< Dequeue record used for encoding/decoding
+ txn_rec _txn_rec; ///< Transaction record used for encoding/decoding
+
+ public:
+ pmgr(jcntl* jc, enq_map& emap, txn_map& tmap);
+ virtual ~pmgr();
+
+ virtual int32_t get_events(timespec* const timeout, bool flush) = 0;
+ inline uint32_t get_aio_evt_rem() const { return _aio_evt_rem; }
+ static const char* page_state_str(page_state ps);
+ inline uint32_t cache_pgsize_sblks() const { return _cache_pgsize_sblks; }
+ inline uint16_t cache_num_pages() const { return _cache_num_pages; }
+
+ protected:
+ virtual void initialize(aio_callback* const cbp, const uint32_t cache_pgsize_sblks,
+ const uint16_t cache_num_pages);
+ virtual void rotate_page() = 0;
+ virtual void clean();
+ };
+
+}}
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_PMGR_H
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/slock.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/slock.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/slock.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/slock.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_SLOCK_H
+#define QPID_LEGACYSTORE_JRNL_SLOCK_H
+
+#include "qpid/linearstore/jrnl/jexception.h"
+#include "qpid/linearstore/jrnl/smutex.h"
+#include <pthread.h>
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+ // Ultra-simple scoped lock class, auto-releases mutex when it goes out-of-scope
+ class slock
+ {
+ protected:
+ const smutex& _sm;
+ public:
+ inline slock(const smutex& sm) : _sm(sm)
+ {
+ PTHREAD_CHK(::pthread_mutex_lock(_sm.get()), "::pthread_mutex_lock", "slock", "slock");
+ }
+ inline ~slock()
+ {
+ PTHREAD_CHK(::pthread_mutex_unlock(_sm.get()), "::pthread_mutex_unlock", "slock", "~slock");
+ }
+ };
+
+ // Ultra-simple scoped try-lock class, auto-releases mutex when it goes out-of-scope
+ class stlock
+ {
+ protected:
+ const smutex& _sm;
+ bool _locked;
+ public:
+ inline stlock(const smutex& sm) : _sm(sm), _locked(false)
+ {
+ int ret = ::pthread_mutex_trylock(_sm.get());
+ _locked = (ret == 0); // check if lock obtained
+ if (!_locked && ret != EBUSY) PTHREAD_CHK(ret, "::pthread_mutex_trylock", "stlock", "stlock");
+ }
+ inline ~stlock()
+ {
+ if (_locked)
+ PTHREAD_CHK(::pthread_mutex_unlock(_sm.get()), "::pthread_mutex_unlock", "stlock", "~stlock");
+ }
+ inline bool locked() const { return _locked; }
+ };
+
+}}
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_SLOCK_H
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/smutex.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/smutex.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/smutex.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/smutex.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_SMUTEX_H
+#define QPID_LEGACYSTORE_JRNL_SMUTEX_H
+
+#include "qpid/linearstore/jrnl/jexception.h"
+#include <pthread.h>
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+ // Ultra-simple scoped mutex class that allows a posix mutex to be initialized and destroyed with error checks
+ class smutex
+ {
+ protected:
+ mutable pthread_mutex_t _m;
+ public:
+ inline smutex()
+ {
+ PTHREAD_CHK(::pthread_mutex_init(&_m, 0), "::pthread_mutex_init", "smutex", "smutex");
+ }
+ inline virtual ~smutex()
+ {
+ PTHREAD_CHK(::pthread_mutex_destroy(&_m), "::pthread_mutex_destroy", "smutex", "~smutex");
+ }
+ inline pthread_mutex_t* get() const { return &_m; }
+ };
+
+}}
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_SMUTEX_H
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/time_ns.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/time_ns.cpp?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/time_ns.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/time_ns.cpp Tue Oct 22 19:09:56 2013
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/linearstore/jrnl/time_ns.h"
+
+#include <sstream>
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+const std::string
+time_ns::str(int precision) const
+{
+ const double t = tv_sec + (tv_nsec/1e9);
+ std::ostringstream oss;
+ oss.setf(std::ios::fixed, std::ios::floatfield);
+ oss.precision(precision);
+ oss << t;
+ return oss.str();
+}
+
+
+}}
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/time_ns.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/time_ns.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/time_ns.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/time_ns.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,93 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_TIME_NS_H
+#define QPID_LEGACYSTORE_JRNL_TIME_NS_H
+
+#include <cerrno>
+#include <ctime>
+#include <string>
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+struct time_ns : public timespec
+{
+ inline time_ns() { tv_sec = 0; tv_nsec = 0; }
+ inline time_ns(const std::time_t sec, const long nsec = 0) { tv_sec = sec; tv_nsec = nsec; }
+ inline time_ns(const time_ns& t) { tv_sec = t.tv_sec; tv_nsec = t.tv_nsec; }
+
+ inline void set_zero() { tv_sec = 0; tv_nsec = 0; }
+ inline bool is_zero() const { return tv_sec == 0 && tv_nsec == 0; }
+ inline int now() { if(::clock_gettime(CLOCK_REALTIME, this)) return errno; return 0; }
+ const std::string str(int precision = 6) const;
+
+ inline time_ns& operator=(const time_ns& rhs)
+ { tv_sec = rhs.tv_sec; tv_nsec = rhs.tv_nsec; return *this; }
+ inline time_ns& operator+=(const time_ns& rhs)
+ {
+ tv_nsec += rhs.tv_nsec;
+ if (tv_nsec >= 1000000000L) { tv_sec++; tv_nsec -= 1000000000L; }
+ tv_sec += rhs.tv_sec;
+ return *this;
+ }
+ inline time_ns& operator+=(const long ns)
+ {
+ tv_nsec += ns;
+ if (tv_nsec >= 1000000000L) { tv_sec++; tv_nsec -= 1000000000L; }
+ return *this;
+ }
+ inline time_ns& operator-=(const long ns)
+ {
+ tv_nsec -= ns;
+ if (tv_nsec < 0) { tv_sec--; tv_nsec += 1000000000L; }
+ return *this;
+ }
+ inline time_ns& operator-=(const time_ns& rhs)
+ {
+ tv_nsec -= rhs.tv_nsec;
+ if (tv_nsec < 0) { tv_sec--; tv_nsec += 1000000000L; }
+ tv_sec -= rhs.tv_sec;
+ return *this;
+ }
+ inline const time_ns operator+(const time_ns& rhs)
+ { time_ns t(*this); t += rhs; return t; }
+ inline const time_ns operator-(const time_ns& rhs)
+ { time_ns t(*this); t -= rhs; return t; }
+ inline bool operator==(const time_ns& rhs)
+ { return tv_sec == rhs.tv_sec && tv_nsec == rhs.tv_nsec; }
+ inline bool operator!=(const time_ns& rhs)
+ { return tv_sec != rhs.tv_sec || tv_nsec != rhs.tv_nsec; }
+ inline bool operator>(const time_ns& rhs)
+ { if(tv_sec == rhs.tv_sec) return tv_nsec > rhs.tv_nsec; return tv_sec > rhs.tv_sec; }
+ inline bool operator>=(const time_ns& rhs)
+ { if(tv_sec == rhs.tv_sec) return tv_nsec >= rhs.tv_nsec; return tv_sec >= rhs.tv_sec; }
+ inline bool operator<(const time_ns& rhs)
+ { if(tv_sec == rhs.tv_sec) return tv_nsec < rhs.tv_nsec; return tv_sec < rhs.tv_sec; }
+ inline bool operator<=(const time_ns& rhs)
+ { if(tv_sec == rhs.tv_sec) return tv_nsec <= rhs.tv_nsec; return tv_sec <= rhs.tv_sec; }
+};
+
+}}
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_TIME_NS_H
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp Tue Oct 22 19:09:56 2013
@@ -0,0 +1,248 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/linearstore/jrnl/txn_map.h"
+
+#include <iomanip>
+#include "qpid/linearstore/jrnl/jerrno.h"
+#include "qpid/linearstore/jrnl/jexception.h"
+#include "qpid/linearstore/jrnl/slock.h"
+#include <sstream>
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+// return/error codes
+int16_t txn_map::TMAP_RID_NOT_FOUND = -2;
+int16_t txn_map::TMAP_XID_NOT_FOUND = -1;
+int16_t txn_map::TMAP_OK = 0;
+int16_t txn_map::TMAP_NOT_SYNCED = 0;
+int16_t txn_map::TMAP_SYNCED = 1;
+
+txn_data_struct::txn_data_struct(const uint64_t rid, const uint64_t drid, const uint16_t pfid,
+ const bool enq_flag, const bool commit_flag):
+ _rid(rid),
+ _drid(drid),
+ _pfid(pfid),
+ _enq_flag(enq_flag),
+ _commit_flag(commit_flag),
+ _aio_compl(false)
+{}
+
+txn_map::txn_map():
+ _map()/*,
+ _pfid_txn_cnt()*/
+{}
+
+txn_map::~txn_map() {}
+
+/*
+void
+txn_map::set_num_jfiles(const uint16_t num_jfiles)
+{
+ _pfid_txn_cnt.resize(num_jfiles, 0);
+}
+*/
+
+/*
+uint32_t
+txn_map::get_txn_pfid_cnt(const uint16_t pfid) const
+{
+ return _pfid_txn_cnt.at(pfid);
+}
+*/
+
+bool
+txn_map::insert_txn_data(const std::string& xid, const txn_data& td)
+{
+ bool ok = true;
+ slock s(_mutex);
+ xmap_itr itr = _map.find(xid);
+ if (itr == _map.end()) // not found in map
+ {
+ txn_data_list list;
+ list.push_back(td);
+ std::pair<xmap_itr, bool> ret = _map.insert(xmap_param(xid, list));
+ if (!ret.second) // duplicate
+ ok = false;
+ }
+ else
+ itr->second.push_back(td);
+// _pfid_txn_cnt.at(td._pfid)++;
+ return ok;
+}
+
+const txn_data_list
+txn_map::get_tdata_list(const std::string& xid)
+{
+ slock s(_mutex);
+ return get_tdata_list_nolock(xid);
+}
+
+const txn_data_list
+txn_map::get_tdata_list_nolock(const std::string& xid)
+{
+ xmap_itr itr = _map.find(xid);
+ if (itr == _map.end()) // not found in map
+ return _empty_data_list;
+ return itr->second;
+}
+
+const txn_data_list
+txn_map::get_remove_tdata_list(const std::string& xid)
+{
+ slock s(_mutex);
+ xmap_itr itr = _map.find(xid);
+ if (itr == _map.end()) // not found in map
+ return _empty_data_list;
+ txn_data_list list = itr->second;
+ _map.erase(itr);
+// for (tdl_itr i=list.begin(); i!=list.end(); i++)
+// _pfid_txn_cnt.at(i->_pfid)--;
+ return list;
+}
+
+bool
+txn_map::in_map(const std::string& xid)
+{
+ slock s(_mutex);
+ xmap_itr itr= _map.find(xid);
+ return itr != _map.end();
+}
+
+uint32_t
+txn_map::enq_cnt()
+{
+ return cnt(true);
+}
+
+uint32_t
+txn_map::deq_cnt()
+{
+ return cnt(true);
+}
+
+uint32_t
+txn_map::cnt(const bool enq_flag)
+{
+ slock s(_mutex);
+ uint32_t c = 0;
+ for (xmap_itr i = _map.begin(); i != _map.end(); i++)
+ {
+ for (tdl_itr j = i->second.begin(); j < i->second.end(); j++)
+ {
+ if (j->_enq_flag == enq_flag)
+ c++;
+ }
+ }
+ return c;
+}
+
+int16_t
+txn_map::is_txn_synced(const std::string& xid)
+{
+ slock s(_mutex);
+ xmap_itr itr = _map.find(xid);
+ if (itr == _map.end()) // not found in map
+ return TMAP_XID_NOT_FOUND;
+ bool is_synced = true;
+ for (tdl_itr litr = itr->second.begin(); litr < itr->second.end(); litr++)
+ {
+ if (!litr->_aio_compl)
+ {
+ is_synced = false;
+ break;
+ }
+ }
+ return is_synced ? TMAP_SYNCED : TMAP_NOT_SYNCED;
+}
+
+int16_t
+txn_map::set_aio_compl(const std::string& xid, const uint64_t rid)
+{
+ slock s(_mutex);
+ xmap_itr itr = _map.find(xid);
+ if (itr == _map.end()) // xid not found in map
+ return TMAP_XID_NOT_FOUND;
+ for (tdl_itr litr = itr->second.begin(); litr < itr->second.end(); litr++)
+ {
+ if (litr->_rid == rid)
+ {
+ litr->_aio_compl = true;
+ return TMAP_OK; // rid found
+ }
+ }
+ // xid present, but rid not found
+ return TMAP_RID_NOT_FOUND;
+}
+
+bool
+txn_map::data_exists(const std::string& xid, const uint64_t rid)
+{
+ bool found = false;
+ {
+ slock s(_mutex);
+ txn_data_list tdl = get_tdata_list_nolock(xid);
+ tdl_itr itr = tdl.begin();
+ while (itr != tdl.end() && !found)
+ {
+ found = itr->_rid == rid;
+ itr++;
+ }
+ }
+ return found;
+}
+
+bool
+txn_map::is_enq(const uint64_t rid)
+{
+ bool found = false;
+ {
+ slock s(_mutex);
+ for (xmap_itr i = _map.begin(); i != _map.end() && !found; i++)
+ {
+ txn_data_list list = i->second;
+ for (tdl_itr j = list.begin(); j < list.end() && !found; j++)
+ {
+ if (j->_enq_flag)
+ found = j->_rid == rid;
+ else
+ found = j->_drid == rid;
+ }
+ }
+ }
+ return found;
+}
+
+void
+txn_map::xid_list(std::vector<std::string>& xv)
+{
+ xv.clear();
+ {
+ slock s(_mutex);
+ for (xmap_itr itr = _map.begin(); itr != _map.end(); itr++)
+ xv.push_back(itr->first);
+ }
+}
+
+}}
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,145 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_TXN_MAP_H
+#define QPID_LEGACYSTORE_JRNL_TXN_MAP_H
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+ class txn_map;
+}}
+
+#include "qpid/linearstore/jrnl/smutex.h"
+#include <map>
+#include <pthread.h>
+#include <string>
+#include <vector>
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+ /**
+ * \struct txn_data_struct
+ * \brief Struct encapsulating transaction data necessary for processing a transaction
+ * in the journal once it is closed with either a commit or abort.
+ */
+ struct txn_data_struct
+ {
+ uint64_t _rid; ///< Record id for this operation
+ uint64_t _drid; ///< Dequeue record id for this operation
+ uint16_t _pfid; ///< Physical file id, to be used when transferring to emap on commit
+ bool _enq_flag; ///< If true, enq op, otherwise deq op
+ bool _commit_flag; ///< (2PC transactions) Records 2PC complete c/a mode
+ bool _aio_compl; ///< Initially false, set to true when record AIO returns
+ txn_data_struct(const uint64_t rid, const uint64_t drid, const uint16_t pfid,
+ const bool enq_flag, const bool commit_flag = false);
+ };
+ typedef txn_data_struct txn_data;
+ typedef std::vector<txn_data> txn_data_list;
+ typedef txn_data_list::iterator tdl_itr;
+
+ /**
+ * \class txn_map
+ * \brief Class for storing transaction data for each open (ie not committed or aborted)
+ * xid in the store. If aborted, records are discarded; if committed, they are
+ * transferred to the enqueue map.
+ *
+ * The data is encapsulated by struct txn_data_struct. A vector containing the information
+ * for each operation included as part of the same transaction is mapped against the
+ * xid.
+ *
+ * The aio_compl flag is set true as each AIO write operation for the enqueue or dequeue
+ * returns. Checking that all of these flags are true for a given xid is the mechanism
+ * used to determine if the transaction is syncronized (through method is_txn_synced()).
+ *
+ * On transaction commit, then each operation is handled as follows:
+ *
+ * If an enqueue (_enq_flag is true), then the rid and pfid are transferred to the enq_map.
+ * If a dequeue (_enq_flag is false), then the rid stored in the drid field is used to
+ * remove the corresponding record from the enq_map.
+ *
+ * On transaction abort, then each operation is handled as follows:
+ *
+ * If an enqueue (_enq_flag is true), then the data is simply discarded.
+ * If a dequeue (_enq_flag is false), then the lock for the corresponding enqueue in enq_map
+ * (if not a part of the same transaction) is removed, and the data discarded.
+ *
+ * <pre>
+ * key data
+ *
+ * xid1 --- vector< [ rid, drid, pfid, enq_flag, commit_flag, aio_compl ] >
+ * xid2 --- vector< [ rid, drid, pfid, enq_flag, commit_flag, aio_compl ] >
+ * xid3 --- vector< [ rid, drid, pfid, enq_flag, commit_flag, aio_compl ] >
+ * ...
+ * </pre>
+ */
+ class txn_map
+ {
+ public:
+ // return/error codes
+ static int16_t TMAP_RID_NOT_FOUND;
+ static int16_t TMAP_XID_NOT_FOUND;
+ static int16_t TMAP_OK;
+ static int16_t TMAP_NOT_SYNCED;
+ static int16_t TMAP_SYNCED;
+
+ private:
+ typedef std::pair<std::string, txn_data_list> xmap_param;
+ typedef std::map<std::string, txn_data_list> xmap;
+ typedef xmap::iterator xmap_itr;
+
+ xmap _map;
+ smutex _mutex;
+// std::vector<uint32_t> _pfid_txn_cnt;
+ const txn_data_list _empty_data_list;
+
+ public:
+ txn_map();
+ virtual ~txn_map();
+
+// void set_num_jfiles(const uint16_t num_jfiles);
+// uint32_t get_txn_pfid_cnt(const uint16_t pfid) const;
+ bool insert_txn_data(const std::string& xid, const txn_data& td);
+ const txn_data_list get_tdata_list(const std::string& xid);
+ const txn_data_list get_remove_tdata_list(const std::string& xid);
+ bool in_map(const std::string& xid);
+ uint32_t enq_cnt();
+ uint32_t deq_cnt();
+ int16_t is_txn_synced(const std::string& xid); // -1=xid not found; 0=not synced; 1=synced
+ int16_t set_aio_compl(const std::string& xid, const uint64_t rid); // -2=rid not found; -1=xid not found; 0=done
+ bool data_exists(const std::string& xid, const uint64_t rid);
+ bool is_enq(const uint64_t rid);
+ inline void clear() { _map.clear(); }
+ inline bool empty() const { return _map.empty(); }
+ inline size_t size() const { return _map.size(); }
+ void xid_list(std::vector<std::string>& xv);
+ private:
+ uint32_t cnt(const bool enq_flag);
+ const txn_data_list get_tdata_list_nolock(const std::string& xid);
+ };
+
+}}
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_TXN_MAP_H
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp Tue Oct 22 19:09:56 2013
@@ -0,0 +1,440 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/linearstore/jrnl/txn_rec.h"
+
+#include <cassert>
+#include <cerrno>
+#include <cstdlib>
+#include <cstring>
+#include <iomanip>
+#include "qpid/linearstore/jrnl/jerrno.h"
+#include "qpid/linearstore/jrnl/jexception.h"
+#include <sstream>
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+txn_rec::txn_rec():
+// _txn_hdr(),
+ _xidp(0),
+ _buff(0)
+// _txn_tail()
+{
+ ::txn_hdr_init(&_txn_hdr, 0, QLS_JRNL_VERSION, 0, 0, 0);
+ ::rec_tail_init(&_txn_tail, 0, 0, 0);
+}
+
+txn_rec::txn_rec(const uint32_t magic, const uint64_t rid, const void* const xidp,
+ const std::size_t xidlen/*, const bool owi*/):
+// _txn_hdr(magic, RHM_JDAT_VERSION, rid, xidlen, owi),
+ _xidp(xidp),
+ _buff(0)
+// _txn_tail(_txn_hdr)
+{
+ ::txn_hdr_init(&_txn_hdr, magic, QLS_JRNL_VERSION, 0, rid, xidlen);
+ ::rec_tail_copy(&_txn_tail, &_txn_hdr._rhdr, 0);
+}
+
+txn_rec::~txn_rec()
+{
+ clean();
+}
+
+void
+txn_rec::reset(const uint32_t magic)
+{
+ _txn_hdr._rhdr._magic = magic;
+ _txn_hdr._rhdr._rid = 0;
+ _txn_hdr._xidsize = 0;
+ _xidp = 0;
+ _buff = 0;
+ _txn_tail._xmagic = ~magic;
+ _txn_tail._rid = 0;
+}
+
+void
+txn_rec::reset(const uint32_t magic, const uint64_t rid, const void* const xidp,
+ const std::size_t xidlen/*, const bool owi*/)
+{
+ _txn_hdr._rhdr._magic = magic;
+ _txn_hdr._rhdr._rid = rid;
+// _txn_hdr.set_owi(owi);
+ _txn_hdr._xidsize = xidlen;
+ _xidp = xidp;
+ _buff = 0;
+ _txn_tail._xmagic = ~magic;
+ _txn_tail._rid = rid;
+}
+
+uint32_t
+txn_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks)
+{
+ assert(wptr != 0);
+ assert(max_size_dblks > 0);
+ assert(_xidp != 0 && _txn_hdr._xidsize > 0);
+
+ std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES;
+ std::size_t rem = max_size_dblks * QLS_DBLK_SIZE_BYTES;
+ std::size_t wr_cnt = 0;
+ if (rec_offs_dblks) // Continuation of split dequeue record (over 2 or more pages)
+ {
+ if (size_dblks(rec_size()) - rec_offs_dblks > max_size_dblks) // Further split required
+ {
+ rec_offs -= sizeof(txn_hdr_t);
+ std::size_t wsize = _txn_hdr._xidsize > rec_offs ? _txn_hdr._xidsize - rec_offs : 0;
+ std::size_t wsize2 = wsize;
+ if (wsize)
+ {
+ if (wsize > rem)
+ wsize = rem;
+ std::memcpy(wptr, (const char*)_xidp + rec_offs, wsize);
+ wr_cnt += wsize;
+ rem -= wsize;
+ }
+ rec_offs -= _txn_hdr._xidsize - wsize2;
+ if (rem)
+ {
+ wsize = sizeof(_txn_tail) > rec_offs ? sizeof(_txn_tail) - rec_offs : 0;
+ wsize2 = wsize;
+ if (wsize)
+ {
+ if (wsize > rem)
+ wsize = rem;
+ std::memcpy((char*)wptr + wr_cnt, (char*)&_txn_tail + rec_offs, wsize);
+ wr_cnt += wsize;
+ rem -= wsize;
+ }
+ rec_offs -= sizeof(_txn_tail) - wsize2;
+ }
+ assert(rem == 0);
+ assert(rec_offs == 0);
+ }
+ else // No further split required
+ {
+ rec_offs -= sizeof(txn_hdr_t);
+ std::size_t wsize = _txn_hdr._xidsize > rec_offs ? _txn_hdr._xidsize - rec_offs : 0;
+ if (wsize)
+ {
+ std::memcpy(wptr, (const char*)_xidp + rec_offs, wsize);
+ wr_cnt += wsize;
+ }
+ rec_offs -= _txn_hdr._xidsize - wsize;
+ wsize = sizeof(_txn_tail) > rec_offs ? sizeof(_txn_tail) - rec_offs : 0;
+ if (wsize)
+ {
+ std::memcpy((char*)wptr + wr_cnt, (char*)&_txn_tail + rec_offs, wsize);
+ wr_cnt += wsize;
+#ifdef QLS_CLEAN
+ std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES;
+ std::size_t dblk_rec_size = size_dblks(rec_size() - rec_offs) * QLS_DBLK_SIZE_BYTES;
+ std::memset((char*)wptr + wr_cnt, QLS_CLEAN_CHAR, dblk_rec_size - wr_cnt);
+#endif
+ }
+ rec_offs -= sizeof(_txn_tail) - wsize;
+ assert(rec_offs == 0);
+ }
+ }
+ else // Start at beginning of data record
+ {
+ // Assumption: the header will always fit into the first dblk
+ std::memcpy(wptr, (void*)&_txn_hdr, sizeof(txn_hdr_t));
+ wr_cnt = sizeof(txn_hdr_t);
+ if (size_dblks(rec_size()) > max_size_dblks) // Split required
+ {
+ std::size_t wsize;
+ rem -= sizeof(txn_hdr_t);
+ if (rem)
+ {
+ wsize = rem >= _txn_hdr._xidsize ? _txn_hdr._xidsize : rem;
+ std::memcpy((char*)wptr + wr_cnt, _xidp, wsize);
+ wr_cnt += wsize;
+ rem -= wsize;
+ }
+ if (rem)
+ {
+ wsize = rem >= sizeof(_txn_tail) ? sizeof(_txn_tail) : rem;
+ std::memcpy((char*)wptr + wr_cnt, (void*)&_txn_tail, wsize);
+ wr_cnt += wsize;
+ rem -= wsize;
+ }
+ assert(rem == 0);
+ }
+ else // No split required
+ {
+ std::memcpy((char*)wptr + wr_cnt, _xidp, _txn_hdr._xidsize);
+ wr_cnt += _txn_hdr._xidsize;
+ std::memcpy((char*)wptr + wr_cnt, (void*)&_txn_tail, sizeof(_txn_tail));
+ wr_cnt += sizeof(_txn_tail);
+#ifdef QLS_CLEAN
+ std::size_t dblk_rec_size = size_dblks(rec_size()) * QLS_DBLK_SIZE_BYTES;
+ std::memset((char*)wptr + wr_cnt, QLS_CLEAN_CHAR, dblk_rec_size - wr_cnt);
+#endif
+ }
+ }
+ return size_dblks(wr_cnt);
+}
+
+uint32_t
+txn_rec::decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks)
+{
+ assert(rptr != 0);
+ assert(max_size_dblks > 0);
+
+ std::size_t rd_cnt = 0;
+ if (rec_offs_dblks) // Continuation of record on new page
+ {
+ const uint32_t hdr_xid_dblks = size_dblks(sizeof(txn_hdr_t) + _txn_hdr._xidsize);
+ const uint32_t hdr_xid_tail_dblks = size_dblks(sizeof(txn_hdr_t) + _txn_hdr._xidsize + sizeof(rec_tail_t));
+ const std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES;
+
+ if (hdr_xid_tail_dblks - rec_offs_dblks <= max_size_dblks)
+ {
+ // Remainder of xid fits within this page
+ if (rec_offs - sizeof(txn_hdr_t) < _txn_hdr._xidsize)
+ {
+ // Part of xid still outstanding, copy remainder of xid and tail
+ const std::size_t xid_offs = rec_offs - sizeof(txn_hdr_t);
+ const std::size_t xid_rem = _txn_hdr._xidsize - xid_offs;
+ std::memcpy((char*)_buff + xid_offs, rptr, xid_rem);
+ rd_cnt = xid_rem;
+ std::memcpy((void*)&_txn_tail, ((char*)rptr + rd_cnt), sizeof(_txn_tail));
+ chk_tail();
+ rd_cnt += sizeof(_txn_tail);
+ }
+ else
+ {
+ // Tail or part of tail only outstanding, complete tail
+ const std::size_t tail_offs = rec_offs - sizeof(txn_hdr_t) - _txn_hdr._xidsize;
+ const std::size_t tail_rem = sizeof(rec_tail_t) - tail_offs;
+ std::memcpy((char*)&_txn_tail + tail_offs, rptr, tail_rem);
+ chk_tail();
+ rd_cnt = tail_rem;
+ }
+ }
+ else if (hdr_xid_dblks - rec_offs_dblks <= max_size_dblks)
+ {
+ // Remainder of xid fits within this page, tail split
+ const std::size_t xid_offs = rec_offs - sizeof(txn_hdr_t);
+ const std::size_t xid_rem = _txn_hdr._xidsize - xid_offs;
+ std::memcpy((char*)_buff + xid_offs, rptr, xid_rem);
+ rd_cnt += xid_rem;
+ const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
+ if (tail_rem)
+ {
+ std::memcpy((void*)&_txn_tail, ((char*)rptr + xid_rem), tail_rem);
+ rd_cnt += tail_rem;
+ }
+ }
+ else
+ {
+ // Remainder of xid split
+ const std::size_t xid_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES);
+ std::memcpy((char*)_buff + rec_offs - sizeof(txn_hdr_t), rptr, xid_cp_size);
+ rd_cnt += xid_cp_size;
+ }
+ }
+ else // Start of record
+ {
+ // Get and check header
+ //_txn_hdr.hdr_copy(h);
+ ::rec_hdr_copy(&_txn_hdr._rhdr, &h);
+ rd_cnt = sizeof(rec_hdr_t);
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+ rd_cnt += sizeof(uint32_t); // Filler 0
+#endif
+ _txn_hdr._xidsize = *(std::size_t*)((char*)rptr + rd_cnt);
+ rd_cnt = sizeof(txn_hdr_t);
+ chk_hdr();
+ _buff = std::malloc(_txn_hdr._xidsize);
+ MALLOC_CHK(_buff, "_buff", "txn_rec", "decode");
+ const uint32_t hdr_xid_dblks = size_dblks(sizeof(txn_hdr_t) + _txn_hdr._xidsize);
+ const uint32_t hdr_xid_tail_dblks = size_dblks(sizeof(txn_hdr_t) + _txn_hdr._xidsize +
+ sizeof(rec_tail_t));
+
+ // Check if record (header + xid + tail) fits within this page, we can check the
+ // tail before the expense of copying data to memory
+ if (hdr_xid_tail_dblks <= max_size_dblks)
+ {
+ // Entire header, xid and tail fits within this page
+ std::memcpy(_buff, (char*)rptr + rd_cnt, _txn_hdr._xidsize);
+ rd_cnt += _txn_hdr._xidsize;
+ std::memcpy((void*)&_txn_tail, (char*)rptr + rd_cnt, sizeof(_txn_tail));
+ rd_cnt += sizeof(_txn_tail);
+ chk_tail();
+ }
+ else if (hdr_xid_dblks <= max_size_dblks)
+ {
+ // Entire header and xid fit within this page, tail split
+ std::memcpy(_buff, (char*)rptr + rd_cnt, _txn_hdr._xidsize);
+ rd_cnt += _txn_hdr._xidsize;
+ const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
+ if (tail_rem)
+ {
+ std::memcpy((void*)&_txn_tail, (char*)rptr + rd_cnt, tail_rem);
+ rd_cnt += tail_rem;
+ }
+ }
+ else
+ {
+ // Header fits within this page, xid split
+ const std::size_t xid_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt;
+ std::memcpy(_buff, (char*)rptr + rd_cnt, xid_cp_size);
+ rd_cnt += xid_cp_size;
+ }
+ }
+ return size_dblks(rd_cnt);
+}
+
+bool
+txn_rec::rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs)
+{
+ if (rec_offs == 0)
+ {
+ // Read header, allocate for xid
+ //_txn_hdr.hdr_copy(h);
+ ::rec_hdr_copy(&_txn_hdr._rhdr, &h);
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+ ifsp->ignore(sizeof(uint32_t)); // _filler0
+#endif
+ ifsp->read((char*)&_txn_hdr._xidsize, sizeof(std::size_t));
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+ ifsp->ignore(sizeof(uint32_t)); // _filler0
+#endif
+ rec_offs = sizeof(txn_hdr_t);
+ _buff = std::malloc(_txn_hdr._xidsize);
+ MALLOC_CHK(_buff, "_buff", "txn_rec", "rcv_decode");
+ }
+ if (rec_offs < sizeof(txn_hdr_t) + _txn_hdr._xidsize)
+ {
+ // Read xid (or continue reading xid)
+ std::size_t offs = rec_offs - sizeof(txn_hdr_t);
+ ifsp->read((char*)_buff + offs, _txn_hdr._xidsize - offs);
+ std::size_t size_read = ifsp->gcount();
+ rec_offs += size_read;
+ if (size_read < _txn_hdr._xidsize - offs)
+ {
+ assert(ifsp->eof());
+ // As we may have read past eof, turn off fail bit
+ ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit));
+ assert(!ifsp->fail() && !ifsp->bad());
+ return false;
+ }
+ }
+ if (rec_offs < sizeof(txn_hdr_t) + _txn_hdr._xidsize + sizeof(rec_tail_t))
+ {
+ // Read tail (or continue reading tail)
+ std::size_t offs = rec_offs - sizeof(txn_hdr_t) - _txn_hdr._xidsize;
+ ifsp->read((char*)&_txn_tail + offs, sizeof(rec_tail_t) - offs);
+ std::size_t size_read = ifsp->gcount();
+ rec_offs += size_read;
+ if (size_read < sizeof(rec_tail_t) - offs)
+ {
+ assert(ifsp->eof());
+ // As we may have read past eof, turn off fail bit
+ ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit));
+ assert(!ifsp->fail() && !ifsp->bad());
+ return false;
+ }
+ }
+ ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size());
+ chk_tail(); // Throws if tail invalid or record incomplete
+ assert(!ifsp->fail() && !ifsp->bad());
+ return true;
+}
+
+std::size_t
+txn_rec::get_xid(void** const xidpp)
+{
+ if (!_buff)
+ {
+ *xidpp = 0;
+ return 0;
+ }
+ *xidpp = _buff;
+ return _txn_hdr._xidsize;
+}
+
+std::string&
+txn_rec::str(std::string& str) const
+{
+ std::ostringstream oss;
+ if (_txn_hdr._rhdr._magic == QLS_TXA_MAGIC)
+ oss << "dtxa_rec: m=" << _txn_hdr._rhdr._magic;
+ else
+ oss << "dtxc_rec: m=" << _txn_hdr._rhdr._magic;
+ oss << " v=" << (int)_txn_hdr._rhdr._version;
+ oss << " rid=" << _txn_hdr._rhdr._rid;
+ oss << " xid=\"" << _xidp << "\"";
+ str.append(oss.str());
+ return str;
+}
+
+std::size_t
+txn_rec::xid_size() const
+{
+ return _txn_hdr._xidsize;
+}
+
+std::size_t
+txn_rec::rec_size() const
+{
+ return sizeof(txn_hdr_t) + _txn_hdr._xidsize + sizeof(rec_tail_t);
+}
+
+void
+txn_rec::chk_hdr() const
+{
+ jrec::chk_hdr(_txn_hdr._rhdr);
+ if (_txn_hdr._rhdr._magic != QLS_TXA_MAGIC && _txn_hdr._rhdr._magic != QLS_TXC_MAGIC)
+ {
+ std::ostringstream oss;
+ oss << std::hex << std::setfill('0');
+ oss << "dtx magic: rid=0x" << std::setw(16) << _txn_hdr._rhdr._rid;
+ oss << ": expected=(0x" << std::setw(8) << QLS_TXA_MAGIC;
+ oss << " or 0x" << QLS_TXC_MAGIC;
+ oss << ") read=0x" << std::setw(2) << (int)_txn_hdr._rhdr._magic;
+ throw jexception(jerrno::JERR_JREC_BADRECHDR, oss.str(), "txn_rec", "chk_hdr");
+ }
+}
+
+void
+txn_rec::chk_hdr(uint64_t rid) const
+{
+ chk_hdr();
+ jrec::chk_rid(_txn_hdr._rhdr, rid);
+}
+
+void
+txn_rec::chk_tail() const
+{
+ jrec::chk_tail(_txn_tail, _txn_hdr._rhdr);
+}
+
+void
+txn_rec::clean()
+{
+ // clean up allocated memory here
+}
+
+}}
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,88 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_TXN_REC_H
+#define QPID_LEGACYSTORE_JRNL_TXN_REC_H
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+class txn_rec;
+}}
+
+#include <cstddef>
+#include "qpid/linearstore/jrnl/jrec.h"
+#include "qpid/linearstore/jrnl/utils/txn_hdr.h"
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+ /**
+ * \class txn_rec
+ * \brief Class to handle a single journal DTX commit or abort record.
+ */
+ class txn_rec : public jrec
+ {
+ private:
+ txn_hdr_t _txn_hdr; ///< transaction header
+ const void* _xidp; ///< xid pointer for encoding (writing to disk)
+ void* _buff; ///< Pointer to buffer to receive data read from disk
+ rec_tail_t _txn_tail; ///< Record tail
+
+ public:
+ // constructor used for read operations and xid must have memory allocated
+ txn_rec();
+ // constructor used for write operations, where xid already exists
+ txn_rec(const uint32_t magic, const uint64_t rid, const void* const xidp,
+ const std::size_t xidlen/*, const bool owi*/);
+ virtual ~txn_rec();
+
+ // Prepare instance for use in reading data from journal
+ void reset(const uint32_t magic);
+ // Prepare instance for use in writing data to journal
+ void reset(const uint32_t magic, const uint64_t rid, const void* const xidp,
+ const std::size_t xidlen/*, const bool owi*/);
+ uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks);
+ uint32_t decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks,
+ uint32_t max_size_dblks);
+ // Decode used for recover
+ bool rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs);
+
+ std::size_t get_xid(void** const xidpp);
+ std::string& str(std::string& str) const;
+ inline std::size_t data_size() const { return 0; } // This record never carries data
+ std::size_t xid_size() const;
+ std::size_t rec_size() const;
+ inline uint64_t rid() const { return _txn_hdr._rhdr._rid; }
+
+ private:
+ void chk_hdr() const;
+ void chk_hdr(uint64_t rid) const;
+ void chk_tail() const;
+ virtual void clean();
+ }; // class txn_rec
+
+}}
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_TXN_REC_H
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/deq_hdr.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/deq_hdr.c?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/deq_hdr.c (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/deq_hdr.c Tue Oct 22 19:09:56 2013
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "deq_hdr.h"
+
+/*static const uint16_t DEQ_HDR_TXNCMPLCOMMIT_MASK = 0x10;*/
+
+void deq_hdr_init(deq_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag, const uint64_t
+ rid, const uint64_t deq_rid, const uint64_t xidsize) {
+ rec_hdr_init(&dest->_rhdr, magic, version, uflag, rid);
+ dest->_deq_rid = deq_rid;
+ dest->_xidsize = xidsize;
+}
+
+void deq_hdr_copy(deq_hdr_t* dest, const deq_hdr_t* src) {
+ rec_hdr_copy(&dest->_rhdr, &src->_rhdr);
+ dest->_deq_rid = src->_deq_rid;
+ dest->_xidsize = src->_xidsize;
+}
+
+bool is_txn_coml_commit(const deq_hdr_t *dh) {
+ return dh->_rhdr._uflag & DEQ_HDR_TXNCMPLCOMMIT_MASK;
+}
+
+void set_txn_coml_commit(deq_hdr_t *dh, const bool commit) {
+ dh->_rhdr._uflag = commit ? dh->_rhdr._uflag | DEQ_HDR_TXNCMPLCOMMIT_MASK :
+ dh->_rhdr._uflag & (~DEQ_HDR_TXNCMPLCOMMIT_MASK);
+}
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/deq_hdr.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/deq_hdr.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/deq_hdr.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/deq_hdr.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,81 @@
+#ifndef QPID_LINEARSTORE_JRNL_UTILS_DEQ_HDR_H
+#define QPID_LINEARSTORE_JRNL_UTILS_DEQ_HDR_H
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <stdbool.h>
+#include "rec_hdr.h"
+
+#ifdef __cplusplus
+extern "C"{
+#endif
+
+#pragma pack(1)
+
+/**
+ * \brief Struct for dequeue record.
+ *
+ * Struct for dequeue record. If this record has a non-zero xidsize field (i.e., there is a
+ * valid XID), then this header is followed by the XID of xidsize bytes and a rec_tail. If,
+ * on the other hand, this record has a zero xidsize (i.e., there is no XID), then the rec_tail
+ * is absent.
+ *
+ * Note that this record had its own rid distinct from the rid of the record it is dequeueing.
+ * The rid field below is the rid of the dequeue record itself; the deq-rid field is the rid of a
+ * previous enqueue record being dequeued by this record.
+ *
+ * Record header info in binary format (32 bytes):
+ * <pre>
+ * 0 7
+ * +---+---+---+---+---+---+---+---+ -+
+ * | magic | ver | flags | |
+ * +---+---+---+---+---+---+---+---+ | struct rec_hdr_t
+ * | rid | |
+ * +---+---+---+---+---+---+---+---+ -+
+ * | deq-rid |
+ * +---+---+---+---+---+---+---+---+
+ * | xidsize |
+ * +---+---+---+---+---+---+---+---+
+ *
+ * deq-rid = dequeue record ID
+ * </pre>
+ */
+typedef struct deq_hdr_t {
+ rec_hdr_t _rhdr; /**< Common record header struct */
+ uint64_t _deq_rid; /**< Record ID of record being dequeued */
+ uint64_t _xidsize; /**< XID size */
+} deq_hdr_t;
+
+static const uint16_t DEQ_HDR_TXNCMPLCOMMIT_MASK = 0x10;
+
+void deq_hdr_init(deq_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag,
+ const uint64_t rid, const uint64_t deq_rid, const uint64_t xidsize);
+void deq_hdr_copy(deq_hdr_t* dest, const deq_hdr_t* src);
+bool is_txn_coml_commit(const deq_hdr_t *dh);
+void set_txn_coml_commit(deq_hdr_t *dh, const bool commit);
+
+#pragma pack()
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ifndef QPID_LINEARSTORE_JRNL_UTILS_DEQ_HDR_H */
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.c?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.c (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.c Tue Oct 22 19:09:56 2013
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "enq_hdr.h"
+
+//static const uint16_t ENQ_HDR_TRANSIENT_MASK = 0x10;
+//static const uint16_t ENQ_HDR_EXTERNAL_MASK = 0x20;
+
+void enq_hdr_init(enq_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag,
+ const uint64_t rid, const uint64_t xidsize, const uint64_t dsize) {
+ rec_hdr_init(&dest->_rhdr, magic, version, uflag, rid);
+ dest->_xidsize = xidsize;
+ dest->_dsize = dsize;
+}
+
+void enq_hdr_copy(enq_hdr_t* dest, const enq_hdr_t* src) {
+ rec_hdr_copy(&dest->_rhdr, &src->_rhdr);
+ dest->_xidsize = src->_xidsize;
+ dest->_dsize = src->_dsize;
+}
+
+bool is_enq_transient(const enq_hdr_t *eh) {
+ return eh->_rhdr._uflag & ENQ_HDR_TRANSIENT_MASK;
+}
+
+void set_enq_transient(enq_hdr_t *eh, const bool transient) {
+ eh->_rhdr._uflag = transient ? eh->_rhdr._uflag | ENQ_HDR_TRANSIENT_MASK :
+ eh->_rhdr._uflag & (~ENQ_HDR_TRANSIENT_MASK);
+}
+
+bool is_enq_external(const enq_hdr_t *eh) {
+ return eh->_rhdr._uflag & ENQ_HDR_EXTERNAL_MASK;
+}
+
+void set_enq_external(enq_hdr_t *eh, const bool external) {
+ eh->_rhdr._uflag = external ? eh->_rhdr._uflag | ENQ_HDR_EXTERNAL_MASK :
+ eh->_rhdr._uflag & (~ENQ_HDR_EXTERNAL_MASK);
+}
+
+bool validate_enq_hdr(enq_hdr_t *eh, const uint32_t magic, const uint16_t version, const uint64_t rid) {
+ return eh->_rhdr._magic == magic &&
+ eh->_rhdr._version == version &&
+ rid > 0 ? eh->_rhdr._rid == rid /* If rid == 0, don't compare rids */
+ : true;
+}
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,81 @@
+#ifndef QPID_LINEARSTORE_JRNL_UTILS_ENQ_HDR_H
+#define QPID_LINEARSTORE_JRNL_UTILS_ENQ_HDR_H
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <stdbool.h>
+#include "rec_hdr.h"
+
+#ifdef __cplusplus
+extern "C"{
+#endif
+
+#pragma pack(1)
+
+/**
+ * \brief Struct for enqueue record.
+ *
+ * Struct for enqueue record. In addition to the common data, this header includes both the
+ * xid and data blob sizes.
+ *
+ * This header precedes all enqueue data in journal files.
+ *
+ * Record header info in binary format (32 bytes):
+ * <pre>
+ * 0 7
+ * +---+---+---+---+---+---+---+---+ -+
+ * | magic | ver | flags | |
+ * +---+---+---+---+---+---+---+---+ | struct rec_hdr_t
+ * | rid | |
+ * +---+---+---+---+---+---+---+---+ -+
+ * | xidsize |
+ * +---+---+---+---+---+---+---+---+
+ * | dsize |
+ * +---+---+---+---+---+---+---+---+
+ * v = file version (If the format or encoding of this file changes, then this
+ * number should be incremented)
+ * </pre>
+ */
+typedef struct enq_hdr_t {
+ rec_hdr_t _rhdr; /**< Common record header struct */
+ uint64_t _xidsize; /**< XID size in octets */
+ uint64_t _dsize; /**< Record data size in octets */
+} enq_hdr_t;
+
+static const uint16_t ENQ_HDR_TRANSIENT_MASK = 0x10;
+static const uint16_t ENQ_HDR_EXTERNAL_MASK = 0x20;
+
+void enq_hdr_init(enq_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag,
+ const uint64_t rid, const uint64_t xidsize, const uint64_t dsize);
+void enq_hdr_copy(enq_hdr_t* dest, const enq_hdr_t* src);
+bool is_enq_transient(const enq_hdr_t *eh);
+void set_enq_transient(enq_hdr_t *eh, const bool transient);
+bool is_enq_external(const enq_hdr_t *eh);
+void set_enq_external(enq_hdr_t *eh, const bool external);
+bool validate_enq_hdr(enq_hdr_t *eh, const uint32_t magic, const uint16_t version, const uint64_t rid);
+
+#pragma pack()
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ifndef QPID_LINEARSTORE_JRNL_UTILS_ENQ_HDR_H */
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c Tue Oct 22 19:09:56 2013
@@ -0,0 +1,105 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "file_hdr.h"
+#include <string.h>
+
+void file_hdr_create(file_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t fhdr_size_sblks,
+ const uint16_t efp_partition, const uint64_t file_size) {
+ rec_hdr_init(&dest->_rhdr, magic, version, 0, 0);
+ dest->_fhdr_size_sblks = fhdr_size_sblks;
+ dest->_efp_partition = efp_partition;
+ dest->_reserved = 0;
+ dest->_file_size_kib = file_size;
+ dest->_fro = 0;
+ dest->_ts_nsec = 0;
+ dest->_ts_sec = 0;
+ dest->_file_number = 0;
+ dest->_queue_name_len = 0;
+}
+
+int file_hdr_init(void* dest, const uint64_t dest_len, const uint16_t uflag, const uint64_t rid, const uint64_t fro,
+ const uint64_t file_number, const uint16_t queue_name_len, const char* queue_name) {
+ file_hdr_t* fhp = (file_hdr_t*)dest;
+ fhp->_rhdr._uflag = uflag;
+ fhp->_rhdr._rid = rid;
+ fhp->_fro = fro;
+ fhp->_file_number = file_number;
+ if (sizeof(file_hdr_t) + queue_name_len < MAX_FILE_HDR_LEN) {
+ fhp->_queue_name_len = queue_name_len;
+ } else {
+ fhp->_queue_name_len = MAX_FILE_HDR_LEN - sizeof(file_hdr_t);
+ }
+ fhp->_queue_name_len = queue_name_len;
+ memcpy((char*)dest + sizeof(file_hdr_t), queue_name, queue_name_len);
+ memset((char*)dest + sizeof(file_hdr_t) + queue_name_len, 0, dest_len - sizeof(file_hdr_t) - queue_name_len);
+ return set_time_now(dest);
+}
+
+void file_hdr_copy(file_hdr_t* dest, const file_hdr_t* src) {
+ rec_hdr_copy(&dest->_rhdr, &src->_rhdr);
+ dest->_fhdr_size_sblks = src->_fhdr_size_sblks; // Should this be copied?
+ dest->_efp_partition = src->_efp_partition; // Should this be copied?
+ dest->_file_size_kib = src->_file_size_kib;
+ dest->_fro = src->_fro;
+ dest->_ts_sec = src->_ts_sec;
+ dest->_ts_nsec = src->_ts_nsec;
+ dest->_file_number = src->_file_number;
+}
+
+void file_hdr_reset(file_hdr_t* target) {
+ target->_rhdr._uflag = 0;
+ target->_rhdr._rid = 0;
+ target->_fro = 0;
+ target->_ts_sec = 0;
+ target->_ts_nsec = 0;
+ target->_file_number = 0;
+ target->_queue_name_len = 0;
+}
+
+int is_file_hdr_reset(file_hdr_t* target) {
+ return target->_rhdr._uflag == 0 &&
+ target->_rhdr._rid == 0 &&
+ target->_ts_sec == 0 &&
+ target->_ts_nsec == 0 &&
+ target->_file_number == 0 &&
+ target->_queue_name_len == 0;
+}
+
+int set_time_now(file_hdr_t *fh)
+{
+ struct timespec ts;
+ int err = clock_gettime(CLOCK_REALTIME, &ts);
+ if (err)
+ return err;
+ fh->_ts_sec = ts.tv_sec;
+ fh->_ts_nsec = ts.tv_nsec;
+ return 0;
+}
+
+
+void set_time(file_hdr_t *fh, struct timespec *ts)
+{
+ fh->_ts_sec = ts->tv_sec;
+ fh->_ts_nsec = ts->tv_nsec;
+}
+
+
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org