You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/10/22 21:09:58 UTC
svn commit: r1534736 [8/8] - in /qpid/trunk/qpid/cpp/src: ./
qpid/linearstore/ qpid/linearstore/jrnl/ qpid/linearstore/jrnl/utils/
tests/linearstore/
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,106 @@
+#ifndef QPID_LINEARSTORE_JRNL_UTILS_FILE_HDR_H
+#define QPID_LINEARSTORE_JRNL_UTILS_FILE_HDR_H
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <time.h>
+#include "rec_hdr.h"
+
+#ifdef __cplusplus
+extern "C"{
+#endif
+
+#define MAX_FILE_HDR_LEN 4096 // Set to 1 sblk
+
+#pragma pack(1)
+
+/**
+ * \brief Struct for data common to the head of all journal files. In addition to
+ * the common data, this includes the record ID and offset of the first record in
+ * the file.
+ *
+ * This header precedes all data in journal files and occupies the first complete
+ * block in the file. The record ID and offset are updated on each overwrite of the
+ * file.
+ *
+ * File header info in binary format (66 bytes + size of file name in octets):
+ * <pre>
+ * 0 7
+ * +---+---+---+---+---+---+---+---+ -+
+ * | magic | ver | flags | |
+ * +---+---+---+---+---+---+---+---+ | struct rec_hdr_t
+ * | first rid in file | |
+ * +---+---+---+---+---+---+---+---+ -+
+ * | fs | partn | reserved |
+ * +---+---+---+---+---+---+---+---+
+ * | file-size |
+ * +---+---+---+---+---+---+---+---+
+ * | fro |
+ * +---+---+---+---+---+---+---+---+
+ * | timestamp (sec) |
+ * +---+---+---+---+---+---+---+---+
+ * | timestamp (ns) |
+ * +---+---+---+---+---+---+---+---+
+ * | file-number |
+ * +---+---+---+---+---+---+---+---+
+ * | qnl | Queue Name... |
+ * +-------+ |
+ * | |
+ * +---+---+---+---+---+---+---+---+
+ *
+ * ver = Journal version
+ * rid = Record ID
+ * fs = File header size in sblks (defined by JRNL_SBLK_SIZE)
+ * partn = EFP partition from which this file came
+ * fro = First Record Offset
+ * qnl = Length of the queue name in octets.
+ * </pre>
+ */
+typedef struct file_hdr_t {
+ rec_hdr_t _rhdr; /**< Common record header struct, but rid field is used for rid of first compete record in file */
+ uint16_t _fhdr_size_sblks; /**< File header size in sblks (defined by JRNL_SBLK_SIZE) */
+ uint16_t _efp_partition; /**< EFP Partition number from which this file was obtained */
+ uint32_t _reserved;
+ uint64_t _file_size_kib; /**< Size of this file in KiB, excluding header sblk */
+ uint64_t _fro; /**< First Record Offset (FRO) */
+ uint64_t _ts_sec; /**< Time stamp (seconds part) */
+ uint64_t _ts_nsec; /**< Time stamp (nanoseconds part) */
+ uint64_t _file_number; /**< The logical number of this file in a monotonically increasing sequence */
+ uint16_t _queue_name_len; /**< Length of the queue name in octets, which follows this struct in the header */
+} file_hdr_t;
+
+void file_hdr_create(file_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t fhdr_size_sblks,
+ const uint16_t efp_partition, const uint64_t file_size);
+int file_hdr_init(void* dest, const uint64_t dest_len, const uint16_t uflag, const uint64_t rid, const uint64_t fro,
+ const uint64_t file_number, const uint16_t queue_name_len, const char* queue_name);
+void file_hdr_reset(file_hdr_t* target);
+int is_file_hdr_reset(file_hdr_t* target);
+void file_hdr_copy(file_hdr_t* dest, const file_hdr_t* src);
+int set_time_now(file_hdr_t *fh);
+void set_time(file_hdr_t *fh, struct timespec *ts);
+
+#pragma pack()
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ifndef QPID_LINEARSTORE_JRNL_UTILS_FILE_HDR_H */
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_hdr.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_hdr.c?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_hdr.c (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_hdr.c Tue Oct 22 19:09:56 2013
@@ -0,0 +1,15 @@
+#include "rec_hdr.h"
+
+void rec_hdr_init(rec_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag, const uint64_t rid) {
+ dest->_magic = magic;
+ dest->_version = version;
+ dest->_uflag = uflag;
+ dest->_rid = rid;
+}
+
+void rec_hdr_copy(rec_hdr_t* dest, const rec_hdr_t* src) {
+ dest->_magic = src->_magic;
+ dest->_version = src->_version;
+ dest->_uflag = src->_uflag;
+ dest->_rid = src->_rid;
+}
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_hdr.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_hdr.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_hdr.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_hdr.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,67 @@
+#ifndef QPID_LINEARSTORE_JRNL_UTILS_REC_HDR_H
+#define QPID_LINEARSTORE_JRNL_UTILS_REC_HDR_H
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <stdint.h>
+
+#ifdef __cplusplus
+extern "C"{
+#endif
+
+#pragma pack(1)
+
+/**
+ * \brief Struct for data common to the head of all journal files and records.
+ * This includes identification for the file type, the encoding version, endian
+ * indicator and a record ID.
+ *
+ * File header info in binary format (16 bytes):
+ * <pre>
+ * 0 7
+ * +---+---+---+---+---+---+---+---+
+ * | magic | ver | flags |
+ * +---+---+---+---+---+---+---+---+
+ * | rid |
+ * +---+---+---+---+---+---+---+---+
+ *
+ * ver = file version (If the format or encoding of this file changes, then this
+ * number should be incremented)
+ * rid = Record ID
+ * </pre>
+ */
+typedef struct rec_hdr_t {
+ uint32_t _magic; /**< File type identifier (magic number) */
+ uint16_t _version; /**< File encoding version */
+ uint16_t _uflag; /**< User-defined flags */
+ uint64_t _rid; /**< Record ID (rotating 64-bit counter) */
+} rec_hdr_t;
+
+void rec_hdr_init(rec_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag, const uint64_t rid);
+void rec_hdr_copy(rec_hdr_t* dest, const rec_hdr_t* src);
+
+#pragma pack()
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ifndef QPID_LINEARSTORE_JRNL_UTILS_REC_HDR_H */
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_tail.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_tail.c?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_tail.c (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_tail.c Tue Oct 22 19:09:56 2013
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "rec_tail.h"
+
+void rec_tail_init(rec_tail_t* dest, const uint32_t xmagic, const uint32_t checksum, const uint64_t rid) {
+ dest->_xmagic = xmagic;
+ dest->_checksum = checksum;
+ dest->_rid = rid;
+}
+
+void rec_tail_copy(rec_tail_t* dest, const rec_hdr_t* src, const uint32_t checksum) {
+ dest->_xmagic = ~(src->_magic);
+ dest->_checksum = checksum;
+ dest->_rid = src->_rid;
+}
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_tail.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_tail.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_tail.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_tail.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,71 @@
+#ifndef QPID_LEGACYSTORE_JRNL_UTILS_REC_TAIL_H
+#define QPID_LEGACYSTORE_JRNL_UTILS_REC_TAIL_H
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <stdint.h>
+#include "rec_hdr.h"
+
+#ifdef __cplusplus
+extern "C"{
+#endif
+
+#pragma pack(1)
+
+/**
+ * \brief Struct for data common to the tail of all records. The magic number
+ * used here is the binary inverse (1's complement) of the magic used in the
+ * record header; this minimizes possible confusion with other headers that may
+ * be present during recovery. The tail is used with all records that have either
+ * XIDs or data - ie any size-variable content. Currently the only records that
+ * do NOT use the tail are non-transactional dequeues and filler records.
+ *
+ * The checksum is used to verify the xid and/or data portion of the record
+ * on recovery, and excludes the header and tail.
+ *
+ * Record header info in binary format (16 bytes):
+ * <pre>
+ * 0 7
+ * +---+---+---+---+---+---+---+---+
+ * | ~(magic) | checksum |
+ * +---+---+---+---+---+---+---+---+
+ * | rid |
+ * +---+---+---+---+---+---+---+---+
+ *
+ * rid = Record ID
+ * </pre>
+ */
+typedef struct rec_tail_t {
+ uint32_t _xmagic; /**< Binary inverse (1's complement) of hdr magic number */
+ uint32_t _checksum; /**< Checksum of xid and data */
+ uint64_t _rid; /**< ID (rotating 64-bit counter) */
+} rec_tail_t;
+
+void rec_tail_init(rec_tail_t* dest, const uint32_t xmagic, const uint32_t checksum, const uint64_t rid);
+void rec_tail_copy(rec_tail_t* dest, const rec_hdr_t* src, const uint32_t checksum);
+
+#pragma pack()
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ifnedf QPID_LEGACYSTORE_JRNL_UTILS_REC_TAIL_H */
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/txn_hdr.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/txn_hdr.c?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/txn_hdr.c (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/txn_hdr.c Tue Oct 22 19:09:56 2013
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "txn_hdr.h"
+
+void txn_hdr_init(txn_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag,
+ const uint64_t rid, const uint64_t xidsize) {
+ rec_hdr_init(&dest->_rhdr, magic, version, uflag, rid);
+ dest->_xidsize = xidsize;
+}
+
+void txn_hdr_copy(txn_hdr_t* dest, const txn_hdr_t* src) {
+ rec_hdr_copy(&dest->_rhdr, &src->_rhdr);
+ dest->_xidsize = src->_xidsize;
+}
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/txn_hdr.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/txn_hdr.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/txn_hdr.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/txn_hdr.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,70 @@
+#ifndef QPID_LINEARSTORE_JRNL_UTILS_TXN_HDR_H
+#define QPID_LINEARSTORE_JRNL_UTILS_TXN_HDR_H
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "rec_hdr.h"
+
+#ifdef __cplusplus
+extern "C"{
+#endif
+
+#pragma pack(1)
+
+/**
+ * \brief Struct for transaction commit and abort records.
+ *
+ * Struct for local and DTX commit and abort records. Only the magic distinguishes between them.
+ * Since this record must be used in the context of a valid XID, the xidsize field must not be
+ * zero. Immediately following this record is the XID itself which is xidsize bytes long,
+ * followed by a rec_tail.
+ *
+ * Note that this record had its own rid distinct from the rids of the record(s) making up the
+ * transaction it is committing or aborting.
+ *
+ * Record header info in binary format (24 bytes):
+ * <pre>
+ * 0 7
+ * +---+---+---+---+---+---+---+---+ -+
+ * | magic | v | e | flags | |
+ * +---+---+---+---+---+---+---+---+ | struct rec_hdr_t
+ * | rid | |
+ * +---+---+---+---+---+---+---+---+ -+
+ * | xidsize |
+ * +---+---+---+---+---+---+---+---+
+ * </pre>
+ */
+typedef struct txn_hdr_t {
+ rec_hdr_t _rhdr; /**< Common record header struct */
+ uint64_t _xidsize; /**< XID size */
+} txn_hdr_t;
+
+void txn_hdr_init(txn_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag,
+ const uint64_t rid, const uint64_t xidsize);
+void txn_hdr_copy(txn_hdr_t* dest, const txn_hdr_t* src);
+
+#pragma pack()
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ifndef QPID_LINEARSTORE_JRNL_UTILS_TXN_HDR_H */
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp Tue Oct 22 19:09:56 2013
@@ -0,0 +1,1031 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/linearstore/jrnl/wmgr.h"
+
+#include <cassert>
+#include <cerrno>
+#include <cstdlib>
+#include <cstring>
+#include "qpid/linearstore/jrnl/utils/file_hdr.h"
+#include "qpid/linearstore/jrnl/jcfg.h"
+#include "qpid/linearstore/jrnl/jcntl.h"
+#include "qpid/linearstore/jrnl/jerrno.h"
+#include "qpid/linearstore/jrnl/JournalFile.h"
+#include <sstream>
+#include <stdint.h>
+
+#include <iostream> // DEBUG
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+wmgr::wmgr(jcntl* jc,
+ enq_map& emap,
+ txn_map& tmap,
+ LinearFileController& lfc):
+ pmgr(jc, emap, tmap),
+ _lfc(lfc),
+ _max_dtokpp(0),
+ _max_io_wait_us(0),
+ _cached_offset_dblks(0),
+ _enq_busy(false),
+ _deq_busy(false),
+ _abort_busy(false),
+ _commit_busy(false),
+ _txn_pending_set()
+{}
+
+wmgr::wmgr(jcntl* jc,
+ enq_map& emap,
+ txn_map& tmap,
+ LinearFileController& lfc,
+ const uint32_t max_dtokpp,
+ const uint32_t max_iowait_us):
+ pmgr(jc, emap, tmap),
+ _lfc(lfc),
+ _max_dtokpp(max_dtokpp),
+ _max_io_wait_us(max_iowait_us),
+ _cached_offset_dblks(0),
+ _enq_busy(false),
+ _deq_busy(false),
+ _abort_busy(false),
+ _commit_busy(false),
+ _txn_pending_set()
+{}
+
+wmgr::~wmgr()
+{
+ wmgr::clean();
+}
+
+void
+wmgr::initialize(aio_callback* const cbp,
+ const uint32_t wcache_pgsize_sblks,
+ const uint16_t wcache_num_pages,
+ const uint32_t max_dtokpp,
+ const uint32_t max_iowait_us,
+ std::size_t eo)
+{
+ _enq_busy = false;
+ _deq_busy = false;
+ _abort_busy = false;
+ _commit_busy = false;
+ _max_dtokpp = max_dtokpp;
+ _max_io_wait_us = max_iowait_us;
+
+ initialize(cbp, wcache_pgsize_sblks, wcache_num_pages);
+
+ if (eo)
+ {
+ const uint32_t wr_pg_size_dblks = _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS;
+ uint32_t data_dblks = (eo / QLS_DBLK_SIZE_BYTES) - 4; // 4 dblks for file hdr
+ _pg_cntr = data_dblks / wr_pg_size_dblks;
+ _pg_offset_dblks = data_dblks - (_pg_cntr * wr_pg_size_dblks);
+ }
+}
+
+iores
+wmgr::enqueue(const void* const data_buff,
+ const std::size_t tot_data_len,
+ const std::size_t this_data_len,
+ data_tok* dtokp,
+ const void* const xid_ptr,
+ const std::size_t xid_len,
+ const bool transient,
+ const bool external)
+{
+ if (xid_len)
+ assert(xid_ptr != 0);
+
+ if (_deq_busy || _abort_busy || _commit_busy) {
+ std::ostringstream oss;
+ oss << "RHM_IORES_BUSY: enqueue while part way through another op:";
+ oss << " _deq_busy=" << (_deq_busy?"T":"F");
+ oss << " _abort_busy=" << (_abort_busy?"T":"F");
+ oss << " _commit_busy=" << (_commit_busy?"T":"F");
+ throw jexception(oss.str()); // TODO: complete exception
+ }
+
+ if (this_data_len != tot_data_len && !external) {
+ throw jexception("RHM_IORES_NOTIMPL: partial enqueues not implemented"); // TODO: complete exception;
+ }
+
+ iores res = pre_write_check(WMGR_ENQUEUE, dtokp, xid_len, tot_data_len, external);
+ if (res != RHM_IORES_SUCCESS)
+ return res;
+
+ bool cont = false;
+ if (_enq_busy) // If enqueue() exited last time with RHM_IORES_FULL or RHM_IORES_PAGE_AIOWAIT
+ {
+ if (dtokp->wstate() == data_tok::ENQ_PART)
+ cont = true;
+ else
+ {
+ std::ostringstream oss;
+ oss << "This data_tok: id=" << dtokp->id() << " state=" << dtokp->wstate_str();
+ throw jexception(jerrno::JERR_WMGR_ENQDISCONT, oss.str(), "wmgr", "enqueue");
+ }
+ }
+
+ uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : _lfc.getNextRecordId();
+ _enq_rec.reset(rid, data_buff, tot_data_len, xid_ptr, xid_len, transient, external);
+ if (!cont)
+ {
+ dtokp->set_rid(rid);
+ dtokp->set_dequeue_rid(0);
+ if (xid_len)
+ dtokp->set_xid(xid_ptr, xid_len);
+ else
+ dtokp->clear_xid();
+ _enq_busy = true;
+ }
+//std::cout << "---+++ wmgr::enqueue() ENQ rid=0x" << std::hex << rid << " " << std::dec << std::flush; // DEBUG
+ bool done = false;
+ while (!done)
+ {
+//std::cout << "*" << std::flush; // DEBUG
+ assert(_pg_offset_dblks < _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS);
+ void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * QLS_DBLK_SIZE_BYTES);
+ uint32_t data_offs_dblks = dtokp->dblocks_written();
+ uint32_t ret = _enq_rec.encode(wptr, data_offs_dblks,
+ (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks);
+
+ // Remember fid which contains the record header in case record is split over several files
+ if (data_offs_dblks == 0) {
+ dtokp->set_fid(_lfc.getCurrentFileSeqNum());
+ }
+ _pg_offset_dblks += ret;
+ _cached_offset_dblks += ret;
+ dtokp->incr_dblocks_written(ret);
+ dtokp->incr_pg_cnt();
+ _page_cb_arr[_pg_index]._pdtokl->push_back(dtokp);
+
+ // Is the encoding of this record complete?
+ if (dtokp->dblocks_written() >= _enq_rec.rec_size_dblks())
+ {
+//std::cout << "!" << std::flush; // DEBUG
+ // TODO: Incorrect - must set state to ENQ_CACHED; ENQ_SUBM is set when AIO returns.
+ dtokp->set_wstate(data_tok::ENQ_SUBM);
+ dtokp->set_dsize(tot_data_len);
+ // Only add this data token to page token list when submit is complete, this way
+ // long multi-page messages have their token on the page containing the END of the
+ // message. AIO callbacks will then only process this token when entire message is
+ // enqueued.
+ _lfc.incrEnqueuedRecordCount(dtokp->fid());
+//std::cout << "[0x" << std::hex << _lfc.getEnqueuedRecordCount() << std::dec << std::flush; // DEBUG
+
+ if (xid_len) // If part of transaction, add to transaction map
+ {
+ std::string xid((const char*)xid_ptr, xid_len);
+ _tmap.insert_txn_data(xid, txn_data(rid, 0, dtokp->fid(), true));
+ }
+ else
+ {
+ if (_emap.insert_pfid(rid, dtokp->fid(), 0) < enq_map::EMAP_OK) // fail
+ {
+ // The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID.
+ std::ostringstream oss;
+ oss << std::hex << "rid=0x" << rid << " _pfid=0x" << dtokp->fid();
+ throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "wmgr", "enqueue");
+ }
+ }
+
+ done = true;
+ } else {
+//std::cout << "$" << std::endl << std::flush; // DEBUG
+ dtokp->set_wstate(data_tok::ENQ_PART);
+ }
+
+ file_header_check(rid, cont, _enq_rec.rec_size_dblks() - data_offs_dblks);
+ flush_check(res, cont, done, rid);
+ }
+ if (dtokp->wstate() >= data_tok::ENQ_SUBM)
+ _enq_busy = false;
+//std::cout << " res=" << iores_str(res) << " _enq_busy=" << (_enq_busy?"T":"F") << std::endl << std::flush; // DEBUG
+ return res;
+}
+
+iores
+wmgr::dequeue(data_tok* dtokp,
+ const void* const xid_ptr,
+ const std::size_t xid_len,
+ const bool txn_coml_commit)
+{
+ if (xid_len)
+ assert(xid_ptr != 0);
+
+ if (_enq_busy || _abort_busy || _commit_busy) {
+ std::ostringstream oss;
+ oss << "RHM_IORES_BUSY: dequeue while part way through another op:";
+ oss << " _enq_busy=" << (_enq_busy?"T":"F");
+ oss << " _abort_busy=" << (_abort_busy?"T":"F");
+ oss << " _commit_busy=" << (_commit_busy?"T":"F");
+ throw jexception(oss.str()); // TODO: complete exception
+ }
+
+ iores res = pre_write_check(WMGR_DEQUEUE, dtokp);
+ if (res != RHM_IORES_SUCCESS)
+ return res;
+
+ bool cont = false;
+ if (_deq_busy) // If dequeue() exited last time with RHM_IORES_FULL or RHM_IORES_PAGE_AIOWAIT
+ {
+ if (dtokp->wstate() == data_tok::DEQ_PART)
+ cont = true;
+ else
+ {
+ std::ostringstream oss;
+ oss << "This data_tok: id=" << dtokp->id() << " state=" << dtokp->wstate_str();
+ throw jexception(jerrno::JERR_WMGR_DEQDISCONT, oss.str(), "wmgr", "dequeue");
+ }
+ }
+
+ const bool ext_rid = dtokp->external_rid();
+ uint64_t rid = (ext_rid | cont) ? dtokp->rid() : _lfc.getNextRecordId();
+ uint64_t dequeue_rid = (ext_rid | cont) ? dtokp->dequeue_rid() : dtokp->rid();
+ _deq_rec.reset(rid, dequeue_rid, xid_ptr, xid_len/*, _wrfc.owi()*/, txn_coml_commit);
+ if (!cont)
+ {
+ if (!ext_rid)
+ {
+ dtokp->set_rid(rid);
+ dtokp->set_dequeue_rid(dequeue_rid);
+ }
+ if (xid_len)
+ dtokp->set_xid(xid_ptr, xid_len);
+ else
+ dtokp->clear_xid();
+ dequeue_check(dtokp->xid(), dequeue_rid);
+ dtokp->set_dblocks_written(0); // Reset dblks_written from previous op
+ _deq_busy = true;
+ }
+//std::cout << "---+++ wmgr::dequeue() DEQ rid=0x" << std::hex << rid << " drid=0x" << dequeue_rid << " " << std::dec << std::flush; // DEBUG
+ bool done = false;
+ while (!done)
+ {
+//std::cout << "*" << std::flush; // DEBUG
+ assert(_pg_offset_dblks < _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS);
+ void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * QLS_DBLK_SIZE_BYTES);
+ uint32_t data_offs_dblks = dtokp->dblocks_written();
+ uint32_t ret = _deq_rec.encode(wptr, data_offs_dblks,
+ (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks);
+
+ // Remember fid which contains the record header in case record is split over several files
+ if (data_offs_dblks == 0) {
+ dtokp->set_fid(_lfc.getCurrentFileSeqNum());
+ }
+ _pg_offset_dblks += ret;
+ _cached_offset_dblks += ret;
+ dtokp->incr_dblocks_written(ret);
+ dtokp->incr_pg_cnt();
+ _page_cb_arr[_pg_index]._pdtokl->push_back(dtokp);
+
+ // Is the encoding of this record complete?
+ if (dtokp->dblocks_written() >= _deq_rec.rec_size_dblks())
+ {
+//std::cout << "!" << std::flush; // DEBUG
+ // TODO: Incorrect - must set state to ENQ_CACHED; ENQ_SUBM is set when AIO returns.
+ dtokp->set_wstate(data_tok::DEQ_SUBM);
+
+ if (xid_len) // If part of transaction, add to transaction map
+ {
+ // If the enqueue is part of a pending txn, it will not yet be in emap
+ _emap.lock(dequeue_rid); // ignore rid not found error
+ std::string xid((const char*)xid_ptr, xid_len);
+ _tmap.insert_txn_data(xid, txn_data(rid, dequeue_rid, dtokp->fid(), false));
+ }
+ else
+ {
+ uint64_t fid;
+ short eres = _emap.get_remove_pfid(dtokp->dequeue_rid(), fid);
+ if (eres < enq_map::EMAP_OK) // fail
+ {
+ if (eres == enq_map::EMAP_RID_NOT_FOUND)
+ {
+ std::ostringstream oss;
+ oss << std::hex << "rid=0x" << rid;
+ throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "dequeue");
+ }
+ if (eres == enq_map::EMAP_LOCKED)
+ {
+ std::ostringstream oss;
+ oss << std::hex << "rid=0x" << rid;
+ throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "dequeue");
+ }
+ }
+//std::cout << "[0x" << std::hex << _lfc.getEnqueuedRecordCount(fid) << std::dec << std::flush; // DEBUG
+//try {
+ _lfc.decrEnqueuedRecordCount(fid);
+//} catch (std::exception& e) { std::cout << "***OOPS*** " << e.what() << " cfid=" << _lfc.getCurrentFileSeqNum() << " fid=" << fid << std::flush; throw; }
+ }
+
+ done = true;
+ } else {
+//std::cout << "$" << std::flush; // DEBUG
+ dtokp->set_wstate(data_tok::DEQ_PART);
+ }
+
+ file_header_check(rid, cont, _deq_rec.rec_size_dblks() - data_offs_dblks);
+ flush_check(res, cont, done, rid);
+ }
+ if (dtokp->wstate() >= data_tok::DEQ_SUBM)
+ _deq_busy = false;
+//std::cout << " res=" << iores_str(res) << " _deq_busy=" << (_deq_busy?"T":"F") << std::endl << std::flush; // DEBUG
+ return res;
+}
+
+iores
+wmgr::abort(data_tok* dtokp,
+ const void* const xid_ptr,
+ const std::size_t xid_len)
+{
+ // commit and abort MUST have a valid xid
+ assert(xid_ptr != 0 && xid_len > 0);
+
+ if (_enq_busy || _deq_busy || _commit_busy) {
+ std::ostringstream oss;
+ oss << "RHM_IORES_BUSY: abort while part way through another op:";
+ oss << " _enq_busy=" << (_enq_busy?"T":"F");
+ oss << " _deq_busy=" << (_deq_busy?"T":"F");
+ oss << " _commit_busy=" << (_commit_busy?"T":"F");
+ throw jexception(oss.str()); // TODO: complete exception
+ }
+
+ iores res = pre_write_check(WMGR_ABORT, dtokp);
+ if (res != RHM_IORES_SUCCESS)
+ return res;
+
+ bool cont = false;
+ if (_abort_busy) // If abort() exited last time with RHM_IORES_FULL or RHM_IORES_PAGE_AIOWAIT
+ {
+ if (dtokp->wstate() == data_tok::ABORT_PART)
+ cont = true;
+ else
+ {
+ std::ostringstream oss;
+ oss << "This data_tok: id=" << dtokp->id() << " state=" << dtokp->wstate_str();
+ throw jexception(jerrno::JERR_WMGR_DEQDISCONT, oss.str(), "wmgr", "abort");
+ }
+ }
+
+ uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : _lfc.getNextRecordId();
+ _txn_rec.reset(QLS_TXA_MAGIC, rid, xid_ptr, xid_len/*, _wrfc.owi()*/);
+ if (!cont)
+ {
+ dtokp->set_rid(rid);
+ dtokp->set_dequeue_rid(0);
+ dtokp->set_xid(xid_ptr, xid_len);
+ dtokp->set_dblocks_written(0); // Reset dblks_written from previous op
+ _abort_busy = true;
+ }
+ bool done = false;
+ while (!done)
+ {
+ assert(_pg_offset_dblks < _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS);
+ void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * QLS_DBLK_SIZE_BYTES);
+ uint32_t data_offs_dblks = dtokp->dblocks_written();
+ uint32_t ret = _txn_rec.encode(wptr, data_offs_dblks,
+ (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks);
+
+ // Remember fid which contains the record header in case record is split over several files
+ if (data_offs_dblks == 0)
+ dtokp->set_fid(_lfc.getCurrentFileSeqNum());
+ _pg_offset_dblks += ret;
+ _cached_offset_dblks += ret;
+ dtokp->incr_dblocks_written(ret);
+ dtokp->incr_pg_cnt();
+ _page_cb_arr[_pg_index]._pdtokl->push_back(dtokp);
+
+ // Is the encoding of this record complete?
+ if (dtokp->dblocks_written() >= _txn_rec.rec_size_dblks())
+ {
+ dtokp->set_wstate(data_tok::ABORT_SUBM);
+
+ // Delete this txn from tmap, unlock any locked records in emap
+ std::string xid((const char*)xid_ptr, xid_len);
+ txn_data_list tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
+ for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+ {
+ if (!itr->_enq_flag)
+ _emap.unlock(itr->_drid); // ignore rid not found error
+ if (itr->_enq_flag)
+ _lfc.decrEnqueuedRecordCount(itr->_pfid);
+ }
+ std::pair<std::set<std::string>::iterator, bool> res = _txn_pending_set.insert(xid);
+ if (!res.second)
+ {
+ std::ostringstream oss;
+ oss << std::hex << "_txn_pending_set: xid=\"" << xid << "\"";
+ throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "wmgr", "abort");
+ }
+
+ done = true;
+ }
+ else
+ dtokp->set_wstate(data_tok::ABORT_PART);
+
+ file_header_check(rid, cont, _txn_rec.rec_size_dblks() - data_offs_dblks);
+ flush_check(res, cont, done, rid);
+ }
+ if (dtokp->wstate() >= data_tok::ABORT_SUBM)
+ _abort_busy = false;
+ return res;
+}
+
+iores
+wmgr::commit(data_tok* dtokp,
+ const void* const xid_ptr,
+ const std::size_t xid_len)
+{
+ // commit and abort MUST have a valid xid
+ assert(xid_ptr != 0 && xid_len > 0);
+
+ if (_enq_busy || _deq_busy || _abort_busy) {
+ std::ostringstream oss;
+ oss << "RHM_IORES_BUSY: commit while part way through another op:";
+ oss << " _enq_busy=" << (_enq_busy?"T":"F");
+ oss << " _deq_busy=" << (_deq_busy?"T":"F");
+ oss << " _abort_busy=" << (_abort_busy?"T":"F");
+ throw jexception(oss.str()); // TODO: complete exception
+ }
+
+ iores res = pre_write_check(WMGR_COMMIT, dtokp);
+ if (res != RHM_IORES_SUCCESS)
+ return res;
+
+ bool cont = false;
+ if (_commit_busy) // If commit() exited last time with RHM_IORES_FULL or RHM_IORES_PAGE_AIOWAIT
+ {
+ if (dtokp->wstate() == data_tok::COMMIT_PART)
+ cont = true;
+ else
+ {
+ std::ostringstream oss;
+ oss << "This data_tok: id=" << dtokp->id() << " state=" << dtokp->wstate_str();
+ throw jexception(jerrno::JERR_WMGR_DEQDISCONT, oss.str(), "wmgr", "commit");
+ }
+ }
+
+ uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : _lfc.getNextRecordId();
+ _txn_rec.reset(QLS_TXC_MAGIC, rid, xid_ptr, xid_len/*, _wrfc.owi()*/);
+ if (!cont)
+ {
+ dtokp->set_rid(rid);
+ dtokp->set_dequeue_rid(0);
+ dtokp->set_xid(xid_ptr, xid_len);
+ dtokp->set_dblocks_written(0); // Reset dblks_written from previous op
+ _commit_busy = true;
+ }
+ bool done = false;
+ while (!done)
+ {
+ assert(_pg_offset_dblks < _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS);
+ void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * QLS_DBLK_SIZE_BYTES);
+ uint32_t data_offs_dblks = dtokp->dblocks_written();
+ uint32_t ret = _txn_rec.encode(wptr, data_offs_dblks,
+ (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks);
+
+ // Remember fid which contains the record header in case record is split over several files
+ if (data_offs_dblks == 0)
+ dtokp->set_fid(_lfc.getCurrentFileSeqNum());
+ _pg_offset_dblks += ret;
+ _cached_offset_dblks += ret;
+ dtokp->incr_dblocks_written(ret);
+ dtokp->incr_pg_cnt();
+ _page_cb_arr[_pg_index]._pdtokl->push_back(dtokp);
+
+ // Is the encoding of this record complete?
+ if (dtokp->dblocks_written() >= _txn_rec.rec_size_dblks())
+ {
+ dtokp->set_wstate(data_tok::COMMIT_SUBM);
+
+ // Delete this txn from tmap, process records into emap
+ std::string xid((const char*)xid_ptr, xid_len);
+ txn_data_list tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
+ for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+ {
+ if (itr->_enq_flag) // txn enqueue
+ {
+ if (_emap.insert_pfid(itr->_rid, itr->_pfid, 0) < enq_map::EMAP_OK) // fail
+ {
+ // The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID.
+ std::ostringstream oss;
+ oss << std::hex << "rid=0x" << itr->_rid << " _pfid=0x" << itr->_pfid;
+ throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "wmgr", "commit");
+ }
+ }
+ else // txn dequeue
+ {
+ uint64_t fid;
+ short eres = _emap.get_remove_pfid(itr->_drid, fid, true);
+ if (eres < enq_map::EMAP_OK) // fail
+ {
+ if (eres == enq_map::EMAP_RID_NOT_FOUND)
+ {
+ std::ostringstream oss;
+ oss << std::hex << "rid=0x" << rid;
+ throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "dequeue");
+ }
+ if (eres == enq_map::EMAP_LOCKED)
+ {
+ std::ostringstream oss;
+ oss << std::hex << "rid=0x" << rid;
+ throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "dequeue");
+ }
+ }
+ _lfc.decrEnqueuedRecordCount(fid);
+ }
+ }
+ std::pair<std::set<std::string>::iterator, bool> res = _txn_pending_set.insert(xid);
+ if (!res.second)
+ {
+ std::ostringstream oss;
+ oss << std::hex << "_txn_pending_set: xid=\"" << xid << "\"";
+ throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "wmgr", "commit");
+ }
+
+ done = true;
+ }
+ else
+ dtokp->set_wstate(data_tok::COMMIT_PART);
+
+ file_header_check(rid, cont, _txn_rec.rec_size_dblks() - data_offs_dblks);
+ flush_check(res, cont, done, rid);
+ }
+ if (dtokp->wstate() >= data_tok::COMMIT_SUBM)
+ _commit_busy = false;
+ return res;
+}
+
+void
+wmgr::file_header_check(const uint64_t rid,
+ const bool cont,
+ const uint32_t rec_dblks_rem)
+{
+ if (_lfc.isEmpty()) // File never written (i.e. no header or data)
+ {
+ std::size_t fro = 0;
+ if (cont) {
+ bool file_fit = rec_dblks_rem <= _lfc.dataSize_sblks() * QLS_SBLK_SIZE_DBLKS; // Will fit within this journal file
+ bool file_full = rec_dblks_rem == _lfc.dataSize_sblks() * QLS_SBLK_SIZE_DBLKS; // Will exactly fill this journal file
+ if (file_fit && !file_full) {
+ fro = (rec_dblks_rem + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_DBLKS)) * QLS_DBLK_SIZE_BYTES;
+ }
+ } else {
+ fro = QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES;
+ }
+ _lfc.asyncFileHeaderWrite(_ioctx, 0, rid, fro);
+ _aio_evt_rem++;
+ }
+}
+
+void
+wmgr::flush_check(iores& res,
+ bool& cont,
+ bool& done, const uint64_t /*rid*/) // DEBUG
+{
+ // Is page is full, flush
+ if (_pg_offset_dblks >= _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS)
+ {
+ res = write_flush();
+ assert(res == RHM_IORES_SUCCESS);
+
+ if (_page_cb_arr[_pg_index]._state == AIO_PENDING && !done)
+ {
+ res = RHM_IORES_PAGE_AIOWAIT;
+ done = true;
+ }
+
+ // If file is full, rotate to next file
+ uint32_t fileSize_pgs = _lfc.fileSize_sblks() / _cache_pgsize_sblks;
+ if (_pg_cntr >= fileSize_pgs)
+ {
+ get_next_file();
+ if (!done) {
+ cont = true;
+ }
+//std::cout << "***** wmgr::flush_check(): GET NEXT FILE: rid=0x" << std::hex << rid << std::dec << " res=" << iores_str(res) << " cont=" << (cont?"T":"F") << " done=" << (done?"T":"F") << std::endl; // DEBUG
+ }
+ }
+}
+
+iores
+wmgr::flush()
+{
+ iores res = write_flush();
+ uint32_t fileSize_pgs = _lfc.fileSize_sblks() / _cache_pgsize_sblks;
+ if (res == RHM_IORES_SUCCESS && _pg_cntr >= fileSize_pgs) {
+ get_next_file();
+ }
+ return res;
+}
+
+iores
+wmgr::write_flush()
+{
+ iores res = RHM_IORES_SUCCESS;
+ // Don't bother flushing an empty page or one that is still in state AIO_PENDING
+ if (_cached_offset_dblks)
+ {
+ if (_page_cb_arr[_pg_index]._state == AIO_PENDING) {
+//std::cout << "#"; // DEBUG
+ res = RHM_IORES_PAGE_AIOWAIT;
+ } else {
+ if (_page_cb_arr[_pg_index]._state != IN_USE)
+ {
+ std::ostringstream oss;
+ oss << "pg_index=" << _pg_index << " state=" << _page_cb_arr[_pg_index].state_str();
+ throw jexception(jerrno::JERR_WMGR_BADPGSTATE, oss.str(), "wmgr", "write_flush");
+ }
+
+ // Send current page using AIO
+
+ // In manual flushes, dblks may not coincide with sblks, add filler records ("RHMx") if necessary.
+ dblk_roundup();
+
+ std::size_t pg_offs = (_pg_offset_dblks - _cached_offset_dblks) * QLS_DBLK_SIZE_BYTES;
+ aio_cb* aiocbp = &_aio_cb_arr[_pg_index];
+ _lfc.asyncPageWrite(_ioctx, aiocbp, (char*)_page_ptr_arr[_pg_index] + pg_offs, _cached_offset_dblks);
+ _page_cb_arr[_pg_index]._state = AIO_PENDING;
+ _aio_evt_rem++;
+//std::cout << "." << _aio_evt_rem << std::flush; // DEBUG
+ _cached_offset_dblks = 0;
+ _jc->instr_incr_outstanding_aio_cnt();
+
+ rotate_page(); // increments _pg_index, resets _pg_offset_dblks if req'd
+ if (_page_cb_arr[_pg_index]._state == UNUSED)
+ _page_cb_arr[_pg_index]._state = IN_USE;
+ }
+ }
+ get_events(0, false);
+ if (_page_cb_arr[_pg_index]._state == UNUSED)
+ _page_cb_arr[_pg_index]._state = IN_USE;
+ return res;
+}
+
+void
+wmgr::get_next_file()
+{
+ _pg_cntr = 0;
+//std::cout << "&&&&& wmgr::get_next_file(): " << status_str() << std::endl; // DEBUG
+ _lfc.pullEmptyFileFromEfp();
+}
+
+int32_t
+wmgr::get_events(timespec* const timeout,
+ bool flush)
+{
+ if (_aio_evt_rem == 0) // no events to get
+ return 0;
+
+ int ret = 0;
+ if ((ret = aio::getevents(_ioctx, flush ? _aio_evt_rem : 1, _aio_evt_rem, _aio_event_arr, timeout)) < 0)
+ {
+ if (ret == -EINTR) // Interrupted by signal
+ return 0;
+ std::ostringstream oss;
+ oss << "io_getevents() failed: " << std::strerror(-ret) << " (" << ret << ")";
+ throw jexception(jerrno::JERR__AIO, oss.str(), "wmgr", "get_events");
+ }
+
+ if (ret == 0 && timeout)
+ return jerrno::AIO_TIMEOUT;
+
+ int32_t tot_data_toks = 0;
+ for (int i=0; i<ret; i++) // Index of returned AIOs
+ {
+ if (_aio_evt_rem == 0)
+ {
+ std::ostringstream oss;
+ oss << "_aio_evt_rem; evt " << (i + 1) << " of " << ret;
+ throw jexception(jerrno::JERR__UNDERFLOW, oss.str(), "wmgr", "get_events");
+ }
+ _aio_evt_rem--;
+//std::cout << "'" << _aio_evt_rem; // DEBUG
+ aio_cb* aiocbp = _aio_event_arr[i].obj; // This I/O control block (iocb)
+ page_cb* pcbp = (page_cb*)(aiocbp->data); // This page control block (pcb)
+ long aioret = (long)_aio_event_arr[i].res;
+ if (aioret < 0) {
+ std::ostringstream oss;
+ oss << "AIO write operation failed: " << std::strerror(-aioret) << " (" << aioret << ") [";
+ if (pcbp) {
+ oss << "pg=" << pcbp->_index;
+ } else {
+ file_hdr_t* fhp = (file_hdr_t*)aiocbp->u.c.buf;
+ oss << "fnum=" << fhp->_file_number;
+ oss << " qname=" << std::string((char*)fhp + sizeof(file_hdr_t), fhp->_queue_name_len);
+ }
+ oss << " size=" << aiocbp->u.c.nbytes;
+ oss << " offset=" << aiocbp->u.c.offset << " fh=" << aiocbp->aio_fildes << "]";
+ throw jexception(jerrno::JERR__AIO, oss.str(), "wmgr", "get_events");
+ }
+ if (pcbp) // Page writes have pcb
+ {
+//std::cout << "p"; // DEBUG
+ uint32_t s = pcbp->_pdtokl->size();
+ std::vector<data_tok*> dtokl;
+ dtokl.reserve(s);
+ for (uint32_t k=0; k<s; k++)
+ {
+ data_tok* dtokp = pcbp->_pdtokl->at(k);
+ if (dtokp->decr_pg_cnt() == 0)
+ {
+ std::set<std::string>::iterator it;
+ switch (dtokp->wstate())
+ {
+ case data_tok::ENQ_SUBM:
+ dtokl.push_back(dtokp);
+ tot_data_toks++;
+ dtokp->set_wstate(data_tok::ENQ);
+ if (dtokp->has_xid())
+ // Ignoring return value here. A non-zero return can signify that the transaction
+ // has committed or aborted, and which was completed prior to the aio returning.
+ _tmap.set_aio_compl(dtokp->xid(), dtokp->rid());
+ break;
+ case data_tok::DEQ_SUBM:
+ dtokl.push_back(dtokp);
+ tot_data_toks++;
+ dtokp->set_wstate(data_tok::DEQ);
+ if (dtokp->has_xid())
+ // Ignoring return value - see note above.
+ _tmap.set_aio_compl(dtokp->xid(), dtokp->rid());
+ break;
+ case data_tok::ABORT_SUBM:
+ dtokl.push_back(dtokp);
+ tot_data_toks++;
+ dtokp->set_wstate(data_tok::ABORTED);
+ it = _txn_pending_set.find(dtokp->xid());
+ if (it == _txn_pending_set.end())
+ {
+ std::ostringstream oss;
+ oss << std::hex << "_txn_pending_set: abort xid=\"";
+ oss << dtokp->xid() << "\"";
+ throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr",
+ "get_events");
+ }
+ _txn_pending_set.erase(it);
+ break;
+ case data_tok::COMMIT_SUBM:
+ dtokl.push_back(dtokp);
+ tot_data_toks++;
+ dtokp->set_wstate(data_tok::COMMITTED);
+ it = _txn_pending_set.find(dtokp->xid());
+ if (it == _txn_pending_set.end())
+ {
+ std::ostringstream oss;
+ oss << std::hex << "_txn_pending_set: commit xid=\"";
+ oss << dtokp->xid() << "\"";
+ throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr",
+ "get_events");
+ }
+ _txn_pending_set.erase(it);
+ break;
+ case data_tok::ENQ_PART:
+ case data_tok::DEQ_PART:
+ case data_tok::ABORT_PART:
+ case data_tok::COMMIT_PART:
+ // ignore these
+ break;
+ default:
+ // throw for anything else
+ std::ostringstream oss;
+ oss << "dtok_id=" << dtokp->id() << " dtok_state=" << dtokp->wstate_str();
+ throw jexception(jerrno::JERR_WMGR_BADDTOKSTATE, oss.str(), "wmgr",
+ "get_events");
+ }
+ }
+ }
+
+ // Increment the completed write offset
+ // NOTE: We cannot use _wrfc here, as it may have rotated since submitting count.
+ // Use stored pointer to fcntl in the pcb instead.
+ pcbp->_jfp->addCompletedDblkCount(pcbp->_wdblks);
+ pcbp->_jfp->decrOutstandingAioOperationCount();
+ _jc->instr_decr_outstanding_aio_cnt();
+
+ // Clean up this pcb's data_tok list
+ pcbp->_pdtokl->clear();
+ pcbp->_state = UNUSED;
+//std::cout << "c" << pcbp->_index << pcbp->state_str(); // DEBUG
+
+ // Perform AIO return callback
+ if (_cbp && tot_data_toks)
+ _cbp->wr_aio_cb(dtokl);
+ }
+ else // File header writes have no pcb
+ {
+//std::cout << "f"; // DEBUG
+ file_hdr_t* fhp = (file_hdr_t*)aiocbp->u.c.buf;
+ _lfc.addWriteCompletedDblkCount(fhp->_file_number, QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_DBLKS);
+ _lfc.decrOutstandingAioOperationCount(fhp->_file_number);
+ }
+ }
+
+ return tot_data_toks;
+}
+
+bool
+wmgr::is_txn_synced(const std::string& xid)
+{
+ // Ignore xid not found error here
+ if (_tmap.is_txn_synced(xid) == txn_map::TMAP_NOT_SYNCED)
+ return false;
+ // Check for outstanding commit/aborts
+ std::set<std::string>::iterator it = _txn_pending_set.find(xid);
+ return it == _txn_pending_set.end();
+}
+
+void
+wmgr::initialize(aio_callback* const cbp,
+ const uint32_t wcache_pgsize_sblks,
+ const uint16_t wcache_num_pages)
+{
+
+ pmgr::initialize(cbp, wcache_pgsize_sblks, wcache_num_pages);
+ wmgr::clean();
+ _page_cb_arr[0]._state = IN_USE;
+ _ddtokl.clear();
+ _cached_offset_dblks = 0;
+ _enq_busy = false;
+}
+
+iores
+wmgr::pre_write_check(const _op_type op,
+ const data_tok* const dtokp,
+ const std::size_t /*xidsize*/,
+ const std::size_t /*dsize*/,
+ const bool /*external*/) const
+{
+ // Check status of current file
+ // TODO: Replace for LFC
+/*
+ if (!_wrfc.is_wr_reset())
+ {
+ if (!_wrfc.wr_reset())
+ return RHM_IORES_FULL;
+ }
+*/
+
+ // Check status of current page is ok for writing
+ if (_page_cb_arr[_pg_index]._state != IN_USE)
+ {
+ if (_page_cb_arr[_pg_index]._state == UNUSED)
+ _page_cb_arr[_pg_index]._state = IN_USE;
+ else if (_page_cb_arr[_pg_index]._state == AIO_PENDING)
+ return RHM_IORES_PAGE_AIOWAIT;
+ else
+ {
+ std::ostringstream oss;
+ oss << "jrnl=" << _jc->id() << " op=" << _op_str[op];
+ oss << " index=" << _pg_index << " pg_state=" << _page_cb_arr[_pg_index].state_str();
+ throw jexception(jerrno::JERR_WMGR_BADPGSTATE, oss.str(), "wmgr", "pre_write_check");
+ }
+ }
+
+ // operation-specific checks
+ switch (op)
+ {
+ case WMGR_ENQUEUE:
+ {
+ if (!dtokp->is_writable())
+ {
+ std::ostringstream oss;
+ oss << "jrnl=" << _jc->id() << " op=" << _op_str[op];
+ oss << " dtok_id=" << dtokp->id() << " dtok_state=" << dtokp->wstate_str();
+ throw jexception(jerrno::JERR_WMGR_BADDTOKSTATE, oss.str(), "wmgr",
+ "pre_write_check");
+ }
+ }
+ break;
+ case WMGR_DEQUEUE:
+ if (!dtokp->is_dequeueable())
+ {
+ std::ostringstream oss;
+ oss << "jrnl=" << _jc->id() << " op=" << _op_str[op];
+ oss << " dtok_id=" << dtokp->id() << " dtok_state=" << dtokp->wstate_str();
+ throw jexception(jerrno::JERR_WMGR_BADDTOKSTATE, oss.str(), "wmgr",
+ "pre_write_check");
+ }
+ break;
+ case WMGR_ABORT:
+ break;
+ case WMGR_COMMIT:
+ break;
+ }
+
+ return RHM_IORES_SUCCESS;
+}
+
+void
+wmgr::dequeue_check(const std::string& xid,
+ const uint64_t drid)
+{
+ // First check emap
+ bool found = false;
+ uint64_t fid;
+ short eres = _emap.get_pfid(drid, fid);
+ if (eres < enq_map::EMAP_OK) { // fail
+ if (eres == enq_map::EMAP_RID_NOT_FOUND) {
+ if (xid.size()) {
+ found = _tmap.data_exists(xid, drid);
+ }
+ } else if (eres == enq_map::EMAP_LOCKED) {
+ std::ostringstream oss;
+ oss << std::hex << "drid=0x" << drid;
+ throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "dequeue_check");
+ }
+ } else {
+ found = true;
+ }
+ if (!found) {
+ std::ostringstream oss;
+ oss << "jrnl=" << _jc->id() << " drid=0x" << std::hex << drid;
+ throw jexception(jerrno::JERR_WMGR_DEQRIDNOTENQ, oss.str(), "wmgr", "dequeue_check");
+ }
+}
+
+void
+wmgr::dblk_roundup()
+{
+ const uint32_t xmagic = QLS_EMPTY_MAGIC;
+ uint32_t wdblks = jrec::size_blks(_cached_offset_dblks, QLS_SBLK_SIZE_DBLKS) * QLS_SBLK_SIZE_DBLKS;
+ while (_cached_offset_dblks < wdblks)
+ {
+ void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * QLS_DBLK_SIZE_BYTES);
+ std::memcpy(wptr, (const void*)&xmagic, sizeof(xmagic));
+#ifdef QLS_CLEAN
+ std::memset((char*)wptr + sizeof(xmagic), QLS_CLEAN_CHAR, QLS_DBLK_SIZE_BYTES - sizeof(xmagic));
+#endif
+ _pg_offset_dblks++;
+ _cached_offset_dblks++;
+ }
+}
+
+void
+wmgr::rotate_page()
+{
+//std::cout << "^^^^^ wmgr::rotate_page() " << status_str() << " pi=" << _pg_index; // DEBUG
+ if (_pg_offset_dblks >= _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS)
+ {
+ _pg_offset_dblks = 0;
+ _pg_cntr++;
+ }
+ if (++_pg_index >= _cache_num_pages)
+ _pg_index = 0;
+//std::cout << "->" << _pg_index << std::endl; // DEBUG
+}
+
+void
+wmgr::clean() {
+ // Clean up allocated memory here
+}
+
+const std::string
+wmgr::status_str() const
+{
+ std::ostringstream oss;
+ oss << "wmgr: pi=" << _pg_index << " pc=" << _pg_cntr;
+ oss << " po=" << _pg_offset_dblks << " aer=" << _aio_evt_rem;
+ oss << " edac=" << (_enq_busy?"T":"F") << (_deq_busy?"T":"F");
+ oss << (_abort_busy?"T":"F") << (_commit_busy?"T":"F");
+ oss << " ps=[";
+ for (int i=0; i<_cache_num_pages; i++)
+ {
+ switch (_page_cb_arr[i]._state)
+ {
+ case UNUSED: oss << "-"; break;
+ case IN_USE: oss << "U"; break;
+ case AIO_PENDING: oss << "A"; break;
+ default: oss << _page_cb_arr[i]._state;
+ }
+ }
+ oss << "] ";
+ return oss.str();
+}
+
+// static
+
+const char* wmgr::_op_str[] = {"enqueue", "dequeue", "abort", "commit"};
+
+}}
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,159 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_WMGR_H
+#define QPID_LEGACYSTORE_JRNL_WMGR_H
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+class wmgr;
+}}
+
+#include <cstring>
+#include "qpid/linearstore/jrnl/EmptyFilePoolTypes.h"
+#include "qpid/linearstore/jrnl/enums.h"
+#include "qpid/linearstore/jrnl/pmgr.h"
+#include <set>
+
+class file_hdr_t;
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+ class LinearFileController;
+
+ /**
+ * \brief Class for managing a write page cache of arbitrary size and number of pages.
+ *
+ * The write page cache works on the principle of caching the write data within a page until
+ * that page is either full or flushed; this initiates a single AIO write operation to store
+ * the data on disk.
+ *
+ * The maximum disk throughput is achieved by keeping the write operations of uniform size.
+ * Waiting for a page cache to fill achieves this; and in high data volume/throughput situations
+ * achieves the optimal disk throughput. Calling flush() forces a write of the current page cache
+ * no matter how full it is, and disrupts the uniformity of the write operations. This should
+ * normally only be done if throughput drops and there is a danger of a page of unwritten data
+ * waiting around for excessive time.
+ *
+ * The usual tradeoff between data storage latency and throughput performance applies.
+ */
+ class wmgr : public pmgr
+ {
+ private:
+ LinearFileController& _lfc; ///< Linear File Controller ref
+ uint32_t _max_dtokpp; ///< Max data writes per page
+ uint32_t _max_io_wait_us; ///< Max wait in microseconds till submit
+ uint32_t _cached_offset_dblks; ///< Amount of unwritten data in page (dblocks)
+ std::deque<data_tok*> _ddtokl; ///< Deferred dequeue data_tok list
+
+ // TODO: Convert _enq_busy etc into a proper threadsafe lock
+ // TODO: Convert to enum? Are these encodes mutually exclusive?
+ bool _enq_busy; ///< Flag true if enqueue is in progress
+ bool _deq_busy; ///< Flag true if dequeue is in progress
+ bool _abort_busy; ///< Flag true if abort is in progress
+ bool _commit_busy; ///< Flag true if commit is in progress
+
+ enum _op_type { WMGR_ENQUEUE = 0, WMGR_DEQUEUE, WMGR_ABORT, WMGR_COMMIT };
+ static const char* _op_str[];
+
+ enq_rec _enq_rec; ///< Enqueue record used for encoding/decoding
+ deq_rec _deq_rec; ///< Dequeue record used for encoding/decoding
+ txn_rec _txn_rec; ///< Transaction record used for encoding/decoding
+ std::set<std::string> _txn_pending_set; ///< Set containing xids of pending commits/aborts
+
+ public:
+ wmgr(jcntl* jc,
+ enq_map& emap,
+ txn_map& tmap,
+ LinearFileController& lfc);
+ wmgr(jcntl* jc,
+ enq_map& emap,
+ txn_map& tmap,
+ LinearFileController& lfc,
+ const uint32_t max_dtokpp,
+ const uint32_t max_iowait_us);
+ virtual ~wmgr();
+
+ void initialize(aio_callback* const cbp,
+ const uint32_t wcache_pgsize_sblks,
+ const uint16_t wcache_num_pages,
+ const uint32_t max_dtokpp,
+ const uint32_t max_iowait_us,
+ std::size_t eo = 0);
+ iores enqueue(const void* const data_buff,
+ const std::size_t tot_data_len,
+ const std::size_t this_data_len,
+ data_tok* dtokp,
+ const void* const xid_ptr,
+ const std::size_t xid_len,
+ const bool transient,
+ const bool external);
+ iores dequeue(data_tok* dtokp,
+ const void* const xid_ptr,
+ const std::size_t xid_len,
+ const bool txn_coml_commit);
+ iores abort(data_tok* dtokp,
+ const void* const xid_ptr,
+ const std::size_t xid_len);
+ iores commit(data_tok* dtokp,
+ const void* const xid_ptr,
+ const std::size_t xid_len);
+ iores flush();
+ int32_t get_events(timespec* const timeout,
+ bool flush);
+ bool is_txn_synced(const std::string& xid);
+ inline bool curr_pg_blocked() const { return _page_cb_arr[_pg_index]._state != UNUSED; }
+ inline uint32_t unflushed_dblks() { return _cached_offset_dblks; }
+
+ // Debug aid
+ const std::string status_str() const;
+
+ private:
+ void initialize(aio_callback* const cbp,
+ const uint32_t wcache_pgsize_sblks,
+ const uint16_t wcache_num_pages);
+ iores pre_write_check(const _op_type op,
+ const data_tok* const dtokp,
+ const std::size_t xidsize = 0,
+ const std::size_t dsize = 0,
+ const bool external = false) const;
+ void dequeue_check(const std::string& xid,
+ const uint64_t drid);
+ void file_header_check(const uint64_t rid,
+ const bool cont,
+ const uint32_t rec_dblks_rem);
+ void flush_check(iores& res,
+ bool& cont,
+ bool& done, const uint64_t rid);
+ iores write_flush();
+ void get_next_file();
+ void dblk_roundup();
+ void rotate_page();
+ void clean();
+ };
+
+}}
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_WMGR_H
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/management-schema.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/management-schema.xml?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/management-schema.xml (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/management-schema.xml Tue Oct 22 19:09:56 2013
@@ -0,0 +1,99 @@
+<schema package="org.apache.qpid.linearstore">
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+ <class name="Store">
+ <property name="brokerRef" type="objId" access="RO" references="qpid.Broker" index="y" parentRef="y"/>
+ <property name="location" type="sstr" access="RO" desc="Logical directory on disk"/>
+ <!--property name="defaultInitialFileCount" type="uint16" access="RO" unit="file" desc="Default number of files initially allocated to each journal"/-->
+ <!--property name="defaultDataFileSize" type="uint32" access="RO" unit="RdPg" desc="Default size of each journal data file"/-->
+ <property name="tplIsInitialized" type="bool" access="RO" desc="Transaction prepared list has been initialized by a transactional prepare"/>
+ <property name="tplDirectory" type="sstr" access="RO" desc="Transaction prepared list directory"/>
+ <property name="tplWritePageSize" type="uint32" access="RO" unit="byte" desc="Page size in transaction prepared list write-page-cache"/>
+ <property name="tplWritePages" type="uint32" access="RO" unit="wpage" desc="Number of pages in transaction prepared list write-page-cache"/>
+ <!--property name="tplInitialFileCount" type="uint16" access="RO" unit="file" desc="Number of files initially allocated to transaction prepared list journal"/-->
+ <!--property name="tplDataFileSize" type="uint32" access="RO" unit="byte" desc="Size of each journal data file in transaction prepared list journal"/-->
+ <!--property name="tplCurrentFileCount" type="uint32" access="RO" unit="file" desc="Number of files currently allocated to transaction prepared list journal"/-->
+
+ <statistic name="tplTransactionDepth" type="hilo32" unit="txn" desc="Number of currently enqueued prepared transactions"/>
+ <statistic name="tplTxnPrepares" type="count64" unit="record" desc="Total transaction prepares on transaction prepared list"/>
+ <statistic name="tplTxnCommits" type="count64" unit="record" desc="Total transaction commits on transaction prepared list"/>
+ <statistic name="tplTxnAborts" type="count64" unit="record" desc="Total transaction aborts on transaction prepared list"/>
+ <statistic name="tplOutstandingAIOs" type="hilo32" unit="aio_op" desc="Number of currently outstanding AIO requests in Async IO system"/>
+ </class>
+
+ <class name="Journal">
+ <property name="queueRef" type="objId" access="RO" references="qpid.Queue" isGeneralReference="y"/>
+ <property name="name" type="sstr" access="RO" index="y"/>
+ <property name="directory" type="sstr" access="RO" desc="Directory containing journal files"/>
+ <property name="baseFileName" type="sstr" access="RO" desc="Base filename prefix for journal"/>
+ <property name="writePageSize" type="uint32" access="RO" unit="byte" desc="Page size in write-page-cache"/>
+ <property name="writePages" type="uint32" access="RO" unit="wpage" desc="Number of pages in write-page-cache"/>
+ <property name="readPageSize" type="uint32" access="RO" unit="byte" desc="Page size in read-page-cache"/>
+ <property name="readPages" type="uint32" access="RO" unit="rpage" desc="Number of pages in read-page-cache"/>
+ <!--property name="initialFileCount" type="uint16" access="RO" unit="file" desc="Number of files initially allocated to this journal"/-->
+ <!--property name="autoExpand" type="bool" access="RO" desc="Auto-expand enabled"/-->
+ <!--property name="currentFileCount" type="uint16" access="RO" unit="file" desc="Number of files currently allocated to this journal"/-->
+ <!--property name="maxFileCount" type="uint16" access="RO" unit="file" desc="Max number of files allowed for this journal"/-->
+ <!--property name="dataFileSize" type="uint32" access="RO" unit="byte" desc="Size of each journal data file"/-->
+
+ <statistic name="recordDepth" type="hilo32" unit="record" desc="Number of currently enqueued records (durable messages)"/>
+ <statistic name="enqueues" type="count64" unit="record" desc="Total enqueued records on journal"/>
+ <statistic name="dequeues" type="count64" unit="record" desc="Total dequeued records on journal"/>
+ <statistic name="txn" type="count32" unit="record" desc="Total open transactions (xids) on journal"/>
+ <statistic name="txnEnqueues" type="count64" unit="record" desc="Total transactional enqueued records on journal"/>
+ <statistic name="txnDequeues" type="count64" unit="record" desc="Total transactional dequeued records on journal"/>
+ <statistic name="txnCommits" type="count64" unit="record" desc="Total transactional commit records on journal"/>
+ <statistic name="txnAborts" type="count64" unit="record" desc="Total transactional abort records on journal"/>
+ <statistic name="outstandingAIOs" type="hilo32" unit="aio_op" desc="Number of currently outstanding AIO requests in Async IO system"/>
+
+<!--
+ The following are not yet "wired up" in JournalImpl.cpp
+-->
+ <statistic name="freeFileCount" type="hilo32" unit="file" desc="Number of files free on this journal. Includes free files trapped in holes."/>
+ <statistic name="availableFileCount" type="hilo32" unit="file" desc="Number of files available to be written. Excluding holes"/>
+ <statistic name="writeWaitFailures" type="count64" unit="record" desc="AIO Wait failures on write"/>
+ <statistic name="writeBusyFailures" type="count64" unit="record" desc="AIO Busy failures on write"/>
+ <statistic name="readRecordCount" type="count64" unit="record" desc="Records read from the journal"/>
+ <statistic name="readBusyFailures" type="count64" unit="record" desc="AIO Busy failures on read"/>
+ <statistic name="writePageCacheDepth" type="hilo32" unit="wpage" desc="Current depth of write-page-cache"/>
+ <statistic name="readPageCacheDepth" type="hilo32" unit="rpage" desc="Current depth of read-page-cache"/>
+
+ <!--method name="expand" desc="Increase number of files allocated for this journal">
+ <arg name="by" type="uint32" dir="I" desc="Number of files to increase journal size by"/>
+ </method-->
+ </class>
+
+ <eventArguments>
+ <!--arg name="autoExpand" type="bool" desc="Journal auto-expand enabled"/-->
+ <arg name="fileSize" type="uint32" desc="Journal file size in bytes"/>
+ <arg name="jrnlId" type="sstr" desc="Journal Id"/>
+ <arg name="numEnq" type="uint32" desc="Number of recovered enqueues"/>
+ <arg name="numFiles" type="uint16" desc="Number of journal files"/>
+ <arg name="numTxn" type="uint32" desc="Number of recovered transactions"/>
+ <arg name="numTxnDeq" type="uint32" desc="Number of recovered transactional dequeues"/>
+ <arg name="numTxnEnq" type="uint32" desc="Number of recovered transactional enqueues"/>
+ <arg name="what" type="sstr" desc="Description of event"/>
+ </eventArguments>
+ <event name="enqThresholdExceeded" sev="warn" args="jrnlId, what"/>
+ <event name="created" sev="notice" args="jrnlId, fileSize, numFiles"/>
+ <event name="full" sev="error" args="jrnlId, what"/>
+ <event name="recovered" sev="notice" args="jrnlId, fileSize, numFiles, numEnq, numTxn, numTxnEnq, numTxnDeq"/>
+</schema>
Added: qpid/trunk/qpid/cpp/src/tests/linearstore/linearstoredirsetup.sh
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/linearstore/linearstoredirsetup.sh?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/linearstore/linearstoredirsetup.sh (added)
+++ qpid/trunk/qpid/cpp/src/tests/linearstore/linearstoredirsetup.sh Tue Oct 22 19:09:56 2013
@@ -0,0 +1,24 @@
+#!/bin/bash
+
+STORE_DIR=/tmp
+LINEARSTOREDIR=~/RedHat/linearstore
+
+rm -rf $STORE_DIR/qls
+rm -rf $STORE_DIR/p002
+rm $STORE_DIR/p004
+
+mkdir $STORE_DIR/qls
+mkdir $STORE_DIR/p002
+touch $STORE_DIR/p004
+mkdir $STORE_DIR/qls/p001
+touch $STORE_DIR/qls/p003
+ln -s $STORE_DIR/p002 $STORE_DIR/qls/p002
+ln -s $STORE_DIR/p004 $STORE_DIR/qls/p004
+
+${LINEARSTOREDIR}/tools/src/py/linearstore/efptool.py $STORE_DIR/qls/ -a -p 1 -s 2048 -n 25
+${LINEARSTOREDIR}/tools/src/py/linearstore/efptool.py $STORE_DIR/qls/ -a -p 1 -s 512 -n 25
+${LINEARSTOREDIR}/tools/src/py/linearstore/efptool.py $STORE_DIR/qls/ -a -p 2 -s 2048 -n 25
+
+${LINEARSTOREDIR}/tools/src/py/linearstore/efptool.py $STORE_DIR/qls/ -l
+tree -la $STORE_DIR/qls
+
Propchange: qpid/trunk/qpid/cpp/src/tests/linearstore/linearstoredirsetup.sh
------------------------------------------------------------------------------
svn:executable = *
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org