You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/10/22 21:09:58 UTC
svn commit: r1534736 [4/8] - in /qpid/trunk/qpid/cpp/src: ./
qpid/linearstore/ qpid/linearstore/jrnl/ qpid/linearstore/jrnl/utils/
tests/linearstore/
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp Tue Oct 22 19:09:56 2013
@@ -0,0 +1,304 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/linearstore/jrnl/LinearFileController.h"
+
+#include <fstream>
+#include "qpid/linearstore/jrnl/EmptyFilePool.h"
+#include "qpid/linearstore/jrnl/jcfg.h"
+#include "qpid/linearstore/jrnl/jcntl.h"
+#include "qpid/linearstore/jrnl/JournalFile.h"
+#include "qpid/linearstore/jrnl/slock.h"
+#include "qpid/linearstore/jrnl/utils/file_hdr.h"
+
+namespace qpid {
+namespace qls_jrnl {
+
+// Constructs a controller bound to the given journal controller. No empty file pool and no
+// current journal file are set, and both counters are constructed with 0; initialize()
+// supplies the journal directory, the pool and the starting file sequence counter value.
+LinearFileController::LinearFileController(jcntl& jcntlRef) :
+ jcntlRef_(jcntlRef),
+ emptyFilePoolPtr_(0),
+ currentJournalFilePtr_(0),
+ fileSeqCounter_(0),
+ recordIdCounter_(0)
+{}
+
+// Intentionally empty: the JournalFile objects held in journalFileList_ are released by
+// finalize(), not by the destructor.
+LinearFileController::~LinearFileController() {}
+
+// Stores the journal directory, the empty file pool to draw files from, and the value from
+// which the file sequence counter starts. The pool pointer is kept, not copied or owned.
+void LinearFileController::initialize(const std::string& journalDirectory,
+                                      EmptyFilePool* emptyFilePoolPtr,
+                                      uint64_t initialFileNumberVal) {
+    emptyFilePoolPtr_ = emptyFilePoolPtr;
+    fileSeqCounter_ = initialFileNumberVal;
+    journalDirectory_ = journalDirectory;
+}
+
+// Destroys every JournalFile held in journalFileList_ and leaves the list empty.
+// Safe to call when the list is already empty.
+void LinearFileController::finalize() {
+    // addJournalFile() puts the current file into journalFileList_, so the loop below
+    // destroys it too. Clear the pointer first so that later pass-through calls fail the
+    // assertCurrentJournalFileValid() check instead of touching freed memory.
+    currentJournalFilePtr_ = 0;
+    while (!journalFileList_.empty()) {
+        delete journalFileList_.front();
+        journalFileList_.pop_front();
+    }
+}
+
+// Creates a JournalFile for fileName, makes it the current journal file, appends it to
+// journalFileList_ (under journalFileListMutex_) and opens it. Any previously current file
+// is closed first; it stays in the list.
+//   fileName            - name handed to the JournalFile constructor
+//   fileNumber          - file sequence number handed to the JournalFile constructor
+//   fileSize_kib        - size value handed to the JournalFile constructor
+//   completedDblkCount  - value passed to JournalFile::initialize()
+void LinearFileController::addJournalFile(const std::string& fileName,
+                                          const uint64_t fileNumber,
+                                          const uint32_t fileSize_kib,
+                                          const uint32_t completedDblkCount) {
+    if (currentJournalFilePtr_ != 0) {
+        currentJournalFilePtr_->close();
+    }
+
+    JournalFile* const newFilePtr = new JournalFile(fileName, fileNumber, fileSize_kib);
+    currentJournalFilePtr_ = newFilePtr;
+    newFilePtr->initialize(completedDblkCount);
+
+    { // Only the list update needs the mutex
+        slock l(journalFileListMutex_);
+        journalFileList_.push_back(newFilePtr);
+    }
+
+    newFilePtr->open();
+}
+
+// Returns the empty file pool's dataSize_kib(). The pool pointer is dereferenced unchecked,
+// so initialize() must have supplied a non-null pool beforehand.
+efpDataSize_kib_t LinearFileController::dataSize_kib() const {
+ return emptyFilePoolPtr_->dataSize_kib();
+}
+
+// Returns the empty file pool's dataSize_sblks(). The pool pointer is dereferenced
+// unchecked, so initialize() must have supplied a non-null pool beforehand.
+efpDataSize_sblks_t LinearFileController::dataSize_sblks() const {
+ return emptyFilePoolPtr_->dataSize_sblks();
+}
+
+// Returns the empty file pool's fileSize_kib(). The pool pointer is dereferenced unchecked,
+// so initialize() must have supplied a non-null pool beforehand.
+efpFileSize_kib_t LinearFileController::fileSize_kib() const {
+ return emptyFilePoolPtr_->fileSize_kib();
+}
+
+// Returns the empty file pool's fileSize_sblks(). The pool pointer is dereferenced
+// unchecked, so initialize() must have supplied a non-null pool beforehand.
+efpFileSize_sblks_t LinearFileController::fileSize_sblks() const {
+ return emptyFilePoolPtr_->fileSize_sblks();
+}
+
+// Advances recordIdCounter_ and returns whatever AtomicCounter::increment() yields.
+uint64_t LinearFileController::getNextRecordId() {
+ return recordIdCounter_.increment();
+}
+
+// Closes the current journal file (if any), takes a fresh file from the empty file pool into
+// the journal directory and registers it through addJournalFile() with the next file
+// sequence number and a completed-dblk count of 0.
+// NOTE(review): the pool's dataSize_kib() is passed as addJournalFile()'s fileSize_kib
+// argument - confirm that JournalFile expects the data size rather than the file size.
+void LinearFileController::pullEmptyFileFromEfp() {
+    if (currentJournalFilePtr_ != 0) {
+        currentJournalFilePtr_->close();
+    }
+    // Moves file from EFP only, returns new file name
+    const std::string newFileName = emptyFilePoolPtr_->takeEmptyFile(journalDirectory_);
+//std::cout << "*** LinearFileController::pullEmptyFileFromEfp() qn=" << jcntlRef.id() << " ef=" << ef << std::endl; // DEBUG
+    addJournalFile(newFileName, getNextFileSeqNum(), emptyFilePoolPtr_->dataSize_kib(), 0);
+}
+
+// Returns leading journal files that no longer hold enqueued records to the empty file pool.
+// Works from the oldest file (front of journalFileList_) forwards and stops at the first
+// file that still has enqueued records. Runs under journalFileListMutex_.
+void LinearFileController::purgeFilesToEfp() {
+    slock l(journalFileListMutex_);
+    // Calling front() on an empty deque is undefined, so test for emptiness on every pass.
+    while (!journalFileList_.empty()) {
+        JournalFile* const oldestFilePtr = journalFileList_.front();
+        // Never purge the file currently being written, even if it has no enqueued records:
+        // deleting it would leave currentJournalFilePtr_ dangling.
+        if (oldestFilePtr == currentJournalFilePtr_ || !oldestFilePtr->isNoEnqueuedRecordsRemaining()) {
+            break;
+        }
+        emptyFilePoolPtr_->returnEmptyFile(oldestFilePtr->getFqFileName());
+        delete oldestFilePtr;
+        journalFileList_.pop_front();
+    }
+}
+
+// Returns getEnqueuedRecordCount() of the journal file with the given sequence number.
+// The lookup runs under journalFileListMutex_; find() throws a jexception
+// (JERR_LFCR_SEQNUMNOTFOUND) when no such file is known.
+uint32_t LinearFileController::getEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) {
+ slock l(journalFileListMutex_);
+ return find(fileSeqNumber)->getEnqueuedRecordCount();
+}
+
+// Calls incrEnqueuedRecordCount() on the journal file with the given sequence number and
+// returns its result. Throws a jexception (JERR__NULL) when there is no current journal
+// file, or (JERR_LFCR_SEQNUMNOTFOUND, from find()) when no file has that sequence number.
+uint32_t LinearFileController::incrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) {
+    // find() walks journalFileList_ and must run under the list mutex, exactly as the
+    // sibling get/decr/add functions taking a file sequence number do.
+    slock l(journalFileListMutex_);
+    assertCurrentJournalFileValid("incrEnqueuedRecordCount");
+    return find(fileSeqNumber)->incrEnqueuedRecordCount();
+}
+
+// Calls decrEnqueuedRecordCount() on the journal file with the given sequence number and
+// returns its result. The lookup runs under journalFileListMutex_; find() throws a
+// jexception (JERR_LFCR_SEQNUMNOTFOUND) when no such file is known.
+uint32_t LinearFileController::decrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) {
+ slock l(journalFileListMutex_);
+ return find(fileSeqNumber)->decrEnqueuedRecordCount();
+}
+
+// Calls addCompletedDblkCount(a) on the journal file with the given sequence number and
+// returns its result. The lookup runs under journalFileListMutex_; find() throws a
+// jexception (JERR_LFCR_SEQNUMNOTFOUND) when no such file is known.
+uint32_t LinearFileController::addWriteCompletedDblkCount(const efpFileCount_t fileSeqNumber, const uint32_t a) {
+ slock l(journalFileListMutex_);
+ return find(fileSeqNumber)->addCompletedDblkCount(a);
+}
+
+// Calls decrOutstandingAioOperationCount() on the journal file with the given sequence
+// number and returns its result. The lookup runs under journalFileListMutex_; find() throws
+// a jexception (JERR_LFCR_SEQNUMNOTFOUND) when no such file is known.
+uint16_t LinearFileController::decrOutstandingAioOperationCount(const efpFileCount_t fileSeqNumber) {
+ slock l(journalFileListMutex_);
+ return find(fileSeqNumber)->decrOutstandingAioOperationCount();
+}
+
+// Forwards a file header write to the current journal file, adding the empty file pool's
+// partition number and data size and the owning journal's id to the caller's arguments.
+// Throws a jexception (JERR__NULL) when there is no current journal file.
+void LinearFileController::asyncFileHeaderWrite(io_context_t ioContextPtr,
+                                                const uint16_t userFlags,
+                                                const uint64_t recordId,
+                                                const uint64_t firstRecordOffset) {
+    // Same guard as every other current-file pass-through; without it a call made before
+    // the first file is added dereferences a null pointer.
+    assertCurrentJournalFileValid("asyncFileHeaderWrite");
+    currentJournalFilePtr_->asyncFileHeaderWrite(ioContextPtr,
+                                                 emptyFilePoolPtr_->getPartitionNumber(),
+                                                 emptyFilePoolPtr_->dataSize_kib(),
+                                                 userFlags,
+                                                 recordId,
+                                                 firstRecordOffset,
+                                                 jcntlRef_.id());
+}
+
+// Forwards a page write to the current journal file with the arguments unchanged.
+// Throws a jexception (JERR__NULL) when there is no current journal file.
+void LinearFileController::asyncPageWrite(io_context_t ioContextPtr,
+ aio_cb* aioControlBlockPtr,
+ void* data,
+ uint32_t dataSize_dblks) {
+ assertCurrentJournalFileValid("asyncPageWrite");
+ currentJournalFilePtr_->asyncPageWrite(ioContextPtr, aioControlBlockPtr, data, dataSize_dblks);
+}
+
+// Returns the current journal file's getFileSeqNum().
+// Throws a jexception (JERR__NULL) when there is no current journal file.
+uint64_t LinearFileController::getCurrentFileSeqNum() const {
+ assertCurrentJournalFileValid("getCurrentFileSeqNum");
+ return currentJournalFilePtr_->getFileSeqNum();
+}
+
+// Returns the current journal file's getEnqueuedRecordCount().
+// Throws a jexception (JERR__NULL) when there is no current journal file.
+uint32_t LinearFileController::getEnqueuedRecordCount() const {
+ assertCurrentJournalFileValid("getEnqueuedRecordCount");
+ return currentJournalFilePtr_->getEnqueuedRecordCount();
+}
+
+// Calls incrEnqueuedRecordCount() on the current journal file and returns its result.
+// Throws a jexception (JERR__NULL) when there is no current journal file.
+uint32_t LinearFileController::incrEnqueuedRecordCount() {
+ assertCurrentJournalFileValid("incrEnqueuedRecordCount");
+ return currentJournalFilePtr_->incrEnqueuedRecordCount();
+}
+
+// Calls addEnqueuedRecordCount(a) on the current journal file and returns its result.
+// Throws a jexception (JERR__NULL) when there is no current journal file.
+uint32_t LinearFileController::addEnqueuedRecordCount(const uint32_t a) {
+ assertCurrentJournalFileValid("addEnqueuedRecordCount");
+ return currentJournalFilePtr_->addEnqueuedRecordCount(a);
+}
+
+// Calls decrEnqueuedRecordCount() on the current journal file and returns its result.
+// Throws a jexception (JERR__NULL) when there is no current journal file.
+uint32_t LinearFileController::decrEnqueuedRecordCount() {
+ assertCurrentJournalFileValid("decrEnqueuedRecordCount");
+ return currentJournalFilePtr_->decrEnqueuedRecordCount();
+}
+
+// Calls subtrEnqueuedRecordCount(s) on the current journal file and returns its result.
+// Throws a jexception (JERR__NULL) when there is no current journal file.
+uint32_t LinearFileController::subtrEnqueuedRecordCount(const uint32_t s) {
+ assertCurrentJournalFileValid("subtrEnqueuedRecordCount");
+ return currentJournalFilePtr_->subtrEnqueuedRecordCount(s);
+}
+
+// Returns the current journal file's getSubmittedDblkCount().
+// Throws a jexception (JERR__NULL) when there is no current journal file.
+uint32_t LinearFileController::getWriteSubmittedDblkCount() const {
+ assertCurrentJournalFileValid("getWriteSubmittedDblkCount");
+ return currentJournalFilePtr_->getSubmittedDblkCount();
+}
+
+// Calls addSubmittedDblkCount(a) on the current journal file and returns its result.
+// Throws a jexception (JERR__NULL) when there is no current journal file.
+uint32_t LinearFileController::addWriteSubmittedDblkCount(const uint32_t a) {
+ assertCurrentJournalFileValid("addWriteSubmittedDblkCount");
+ return currentJournalFilePtr_->addSubmittedDblkCount(a);
+}
+
+// Returns the current journal file's getCompletedDblkCount().
+// Throws a jexception (JERR__NULL) when there is no current journal file.
+uint32_t LinearFileController::getWriteCompletedDblkCount() const {
+ assertCurrentJournalFileValid("getWriteCompletedDblkCount");
+ return currentJournalFilePtr_->getCompletedDblkCount();
+}
+
+// Calls addCompletedDblkCount(a) on the current journal file and returns its result.
+// Throws a jexception (JERR__NULL) when there is no current journal file.
+uint32_t LinearFileController::addWriteCompletedDblkCount(const uint32_t a) {
+ assertCurrentJournalFileValid("addWriteCompletedDblkCount");
+ return currentJournalFilePtr_->addCompletedDblkCount(a);
+}
+
+// Returns the current journal file's getOutstandingAioOperationCount().
+// Throws a jexception (JERR__NULL) when there is no current journal file.
+uint16_t LinearFileController::getOutstandingAioOperationCount() const {
+ assertCurrentJournalFileValid("getOutstandingAioOperationCount");
+ return currentJournalFilePtr_->getOutstandingAioOperationCount();
+}
+
+// Calls incrOutstandingAioOperationCount() on the current journal file and returns its
+// result. Throws a jexception (JERR__NULL) when there is no current journal file.
+uint16_t LinearFileController::incrOutstandingAioOperationCount() {
+ assertCurrentJournalFileValid("incrOutstandingAioOperationCount");
+ return currentJournalFilePtr_->incrOutstandingAioOperationCount();
+}
+
+// Calls decrOutstandingAioOperationCount() on the current journal file and returns its
+// result. Throws a jexception (JERR__NULL) when there is no current journal file.
+uint16_t LinearFileController::decrOutstandingAioOperationCount() {
+ assertCurrentJournalFileValid("decrOutstandingAioOperationCount");
+ return currentJournalFilePtr_->decrOutstandingAioOperationCount();
+}
+
+// Returns the current journal file's isEmpty().
+// Throws a jexception (JERR__NULL) when there is no current journal file.
+bool LinearFileController::isEmpty() const {
+ assertCurrentJournalFileValid("isEmpty");
+ return currentJournalFilePtr_->isEmpty();
+}
+
+// Returns the current journal file's isDataEmpty().
+// Throws a jexception (JERR__NULL) when there is no current journal file.
+bool LinearFileController::isDataEmpty() const {
+ assertCurrentJournalFileValid("isDataEmpty");
+ return currentJournalFilePtr_->isDataEmpty();
+}
+
+// Returns the current journal file's dblksRemaining().
+// Throws a jexception (JERR__NULL) when there is no current journal file.
+u_int32_t LinearFileController::dblksRemaining() const {
+ assertCurrentJournalFileValid("dblksRemaining");
+ return currentJournalFilePtr_->dblksRemaining();
+}
+
+// Returns the current journal file's isFull().
+// Throws a jexception (JERR__NULL) when there is no current journal file.
+bool LinearFileController::isFull() const {
+ assertCurrentJournalFileValid("isFull");
+ return currentJournalFilePtr_->isFull();
+}
+
+// Returns the current journal file's isFullAndComplete().
+// Throws a jexception (JERR__NULL) when there is no current journal file.
+bool LinearFileController::isFullAndComplete() const {
+ assertCurrentJournalFileValid("isFullAndComplete");
+ return currentJournalFilePtr_->isFullAndComplete();
+}
+
+// Returns the current journal file's getOutstandingAioDblks().
+// Throws a jexception (JERR__NULL) when there is no current journal file.
+u_int32_t LinearFileController::getOutstandingAioDblks() const {
+ assertCurrentJournalFileValid("getOutstandingAioDblks");
+ return currentJournalFilePtr_->getOutstandingAioDblks();
+}
+
+// Returns the current journal file's getNextFile() result, i.e. whether the next file is
+// needed. Throws a jexception (JERR__NULL) when there is no current journal file.
+bool LinearFileController::needNextFile() const {
+    // Report this function's own name so the exception points at the real call site.
+    assertCurrentJournalFileValid("needNextFile");
+    return currentJournalFilePtr_->getNextFile();
+}
+
+// Debug aid: builds a multi-line description of this controller (queue id, journal
+// directory, both counters, file list size) followed by the current journal file's own
+// status, or a placeholder line when there is no current file. Every line produced here is
+// prefixed with indentDepth '.' characters.
+const std::string LinearFileController::status(const uint8_t indentDepth) const {
+    const std::string prefix((size_t)indentDepth, '.');
+    std::ostringstream out;
+    out << prefix << "LinearFileController: queue=" << jcntlRef_.id() << std::endl
+        << prefix << " journalDirectory=" << journalDirectory_ << std::endl
+        << prefix << " fileSeqCounter=" << fileSeqCounter_.get() << std::endl
+        << prefix << " recordIdCounter=" << recordIdCounter_.get() << std::endl
+        << prefix << " journalFileList.size=" << journalFileList_.size() << std::endl;
+    if (!checkCurrentJournalFileValid()) {
+        out << prefix << " <No current journal file>" << std::endl;
+    } else {
+        out << currentJournalFilePtr_->status_str(indentDepth+2);
+    }
+    return out.str();
+}
+
+// --- protected functions ---
+
+// True when a current journal file is set, i.e. currentJournalFilePtr_ is non-null.
+bool LinearFileController::checkCurrentJournalFileValid() const {
+ return currentJournalFilePtr_ != 0;
+}
+
+// Throws a jexception (JERR__NULL) naming this class and the supplied calling function when
+// no current journal file is set; otherwise does nothing.
+void LinearFileController::assertCurrentJournalFileValid(const char* const functionName) const {
+    if (checkCurrentJournalFileValid()) {
+        return;
+    }
+    throw jexception(jerrno::JERR__NULL, "LinearFileController", functionName);
+}
+
+// Returns the JournalFile whose file sequence number equals fileSeqNumber. The current file
+// is checked first, then journalFileList_ from front to back. Throws a jexception
+// (JERR_LFCR_SEQNUMNOTFOUND) when no file matches.
+// NOTE: NOT THREAD SAFE - journalFileList is accessed by multiple threads - use under external lock
+JournalFile* LinearFileController::find(const efpFileCount_t fileSeqNumber) {
+    if (currentJournalFilePtr_ != 0 && currentJournalFilePtr_->getFileSeqNum() == fileSeqNumber) {
+        return currentJournalFilePtr_;
+    }
+
+    JournalFileListItr_t itr = journalFileList_.begin();
+    while (itr != journalFileList_.end()) {
+        JournalFile* const candidatePtr = *itr;
+        if (candidatePtr->getFileSeqNum() == fileSeqNumber) {
+            return candidatePtr;
+        }
+        ++itr;
+    }
+
+    std::ostringstream msg;
+    msg << "fileSeqNumber=" << fileSeqNumber;
+    throw jexception(jerrno::JERR_LFCR_SEQNUMNOTFOUND, msg.str(), "LinearFileController", "find");
+}
+
+// Advances fileSeqCounter_ and returns whatever AtomicCounter::increment() yields.
+uint64_t LinearFileController::getNextFileSeqNum() {
+ return fileSeqCounter_.increment();
+}
+
+}} // namespace qpid::qls_jrnl
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,136 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LINEARSTORE_LINEARFILECONTROLLER_H_
+#define QPID_LINEARSTORE_LINEARFILECONTROLLER_H_
+
+#include <deque>
+#include "qpid/linearstore/jrnl/AtomicCounter.h"
+#include "qpid/linearstore/jrnl/EmptyFilePoolTypes.h"
+
+// libaio forward declares
+typedef struct io_context* io_context_t;
+typedef struct iocb aio_cb;
+
+namespace qpid {
+namespace qls_jrnl {
+
+class EmptyFilePool;
+class jcntl;
+class JournalFile;
+
+// Keeps the list of JournalFile objects belonging to one journal, tracks which of them is
+// the current file, draws new files from / returns purged files to an EmptyFilePool, and
+// hands out file sequence numbers and record ids.
+// NOTE(review): std::string and smutex are used below but this header includes neither
+// <string> nor an smutex header itself - presumably they arrive transitively; confirm.
+class LinearFileController
+{
+protected:
+ typedef std::deque<JournalFile*> JournalFileList_t;
+ typedef JournalFileList_t::iterator JournalFileListItr_t;
+
+ jcntl& jcntlRef_;
+ std::string journalDirectory_;
+ EmptyFilePool* emptyFilePoolPtr_; // Set by initialize(); null until then
+ JournalFile* currentJournalFilePtr_; // Also an element of journalFileList_; null until a file is added
+ AtomicCounter<uint64_t> fileSeqCounter_;
+ AtomicCounter<uint64_t> recordIdCounter_;
+
+ JournalFileList_t journalFileList_; // Elements are deleted by finalize() and purgeFilesToEfp()
+ smutex journalFileListMutex_;
+
+public:
+ LinearFileController(jcntl& jcntlRef);
+ virtual ~LinearFileController();
+
+ void initialize(const std::string& journalDirectory,
+ EmptyFilePool* emptyFilePoolPtr,
+ uint64_t initialFileNumberVal);
+ void finalize();
+
+ void addJournalFile(const std::string& fileName,
+ const uint64_t fileNumber,
+ const uint32_t fileSize_kib,
+ const uint32_t completedDblkCount);
+
+ efpDataSize_kib_t dataSize_kib() const;
+ efpDataSize_sblks_t dataSize_sblks() const;
+ efpFileSize_kib_t fileSize_kib() const;
+ efpFileSize_sblks_t fileSize_sblks() const;
+ uint64_t getNextRecordId();
+ void pullEmptyFileFromEfp();
+ void purgeFilesToEfp();
+
+ // Functions for manipulating counts of non-current JournalFile instances in journalFileList_
+ uint32_t getEnqueuedRecordCount(const efpFileCount_t fileSeqNumber);
+ uint32_t incrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber);
+ uint32_t decrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber);
+ uint32_t addWriteCompletedDblkCount(const efpFileCount_t fileSeqNumber,
+ const uint32_t a);
+ uint16_t decrOutstandingAioOperationCount(const efpFileCount_t fileSeqNumber);
+
+ // Pass-through functions for JournalFile class
+ void asyncFileHeaderWrite(io_context_t ioContextPtr,
+ const uint16_t userFlags,
+ const uint64_t recordId,
+ const uint64_t firstRecordOffset);
+ void asyncPageWrite(io_context_t ioContextPtr,
+ aio_cb* aioControlBlockPtr,
+ void* data,
+ uint32_t dataSize_dblks);
+
+ uint64_t getCurrentFileSeqNum() const;
+
+ uint32_t getEnqueuedRecordCount() const;
+ uint32_t incrEnqueuedRecordCount();
+ uint32_t addEnqueuedRecordCount(const uint32_t a);
+ uint32_t decrEnqueuedRecordCount();
+ uint32_t subtrEnqueuedRecordCount(const uint32_t s);
+
+ uint32_t getWriteSubmittedDblkCount() const;
+ uint32_t addWriteSubmittedDblkCount(const uint32_t a);
+
+ uint32_t getWriteCompletedDblkCount() const;
+ uint32_t addWriteCompletedDblkCount(const uint32_t a);
+
+ uint16_t getOutstandingAioOperationCount() const;
+ uint16_t incrOutstandingAioOperationCount();
+ uint16_t decrOutstandingAioOperationCount();
+
+ bool isEmpty() const; // True if no writes of any kind have occurred
+ bool isDataEmpty() const; // True if only file header written, data is still empty
+ u_int32_t dblksRemaining() const; // Dblks remaining until full
+ bool isFull() const; // True if all possible dblks have been submitted (but may not yet have returned from AIO)
+ bool isFullAndComplete() const; // True if all submitted dblks have returned from AIO
+ u_int32_t getOutstandingAioDblks() const; // Dblks still to be written
+ bool needNextFile() const; // True when next file is needed
+
+ // Debug aid
+ const std::string status(const uint8_t indentDepth) const;
+
+protected:
+ void assertCurrentJournalFileValid(const char* const functionName) const; // Throws jexception if no current file
+ bool checkCurrentJournalFileValid() const; // True if a current file is set
+ JournalFile* find(const efpFileCount_t fileSeqNumber); // Not thread safe: call with journalFileListMutex_ held
+ uint64_t getNextFileSeqNum();
+};
+
+// Pointer-to-member type whose signature matches LinearFileController::addJournalFile().
+typedef void (LinearFileController::*lfcAddJournalFileFn)(const std::string&, const uint64_t, const uint32_t, const uint32_t);
+
+}} // namespace qpid::qls_jrnl
+
+#endif // QPID_LINEARSTORE_LINEARFILECONTROLLER_H_
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp Tue Oct 22 19:09:56 2013
@@ -0,0 +1,645 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/linearstore/jrnl/RecoveryManager.h"
+
+#include <algorithm>
+#include <cstdlib>
+#include <iomanip>
+#include "qpid/linearstore/jrnl/data_tok.h"
+#include "qpid/linearstore/jrnl/deq_rec.h"
+#include "qpid/linearstore/jrnl/EmptyFilePoolManager.h"
+#include "qpid/linearstore/jrnl/enq_map.h"
+#include "qpid/linearstore/jrnl/enq_rec.h"
+#include "qpid/linearstore/jrnl/jcfg.h"
+#include "qpid/linearstore/jrnl/jdir.h"
+#include "qpid/linearstore/jrnl/JournalLog.h"
+#include "qpid/linearstore/jrnl/jrec.h"
+#include "qpid/linearstore/jrnl/LinearFileController.h"
+#include "qpid/linearstore/jrnl/txn_map.h"
+#include "qpid/linearstore/jrnl/txn_rec.h"
+#include "qpid/linearstore/jrnl/utils/enq_hdr.h"
+#include "qpid/linearstore/jrnl/utils/file_hdr.h"
+#include <sstream>
+#include <string>
+#include <vector>
+
+namespace qpid {
+namespace qls_jrnl
+{
+
+// Stores the journal directory, queue name and references to the enqueue map, transaction
+// map and journal log. All offsets, the highest record id / file number and the file size
+// start at 0 and both flags start false; analyzeJournals() fills them in.
+RecoveryManager::RecoveryManager(const std::string& journalDirectory,
+ const std::string& queuename,
+ enq_map& enqueueMapRef,
+ txn_map& transactionMapRef,
+ JournalLog& journalLogRef) :
+ journalDirectory_(journalDirectory),
+ queueName_(queuename),
+ enqueueMapRef_(enqueueMapRef),
+ transactionMapRef_(transactionMapRef),
+ journalLogRef_(journalLogRef),
+ journalEmptyFlag_(false),
+ firstRecordOffset_(0),
+ endOffset_(0),
+ highestRecordId_(0ULL),
+ highestFileNumber_(0ULL),
+ lastFileFullFlag_(false),
+ fileSize_kib_(0)
+{}
+
+// Nothing to release explicitly here.
+RecoveryManager::~RecoveryManager() {}
+
+// Analyzes the journal directory for recovery.
+//   preparedTransactionListPtr - xids that are prepared; if non-null, every transaction in
+//                                the transaction map that is NOT in this list is removed
+//   emptyFilePoolManager       - used to look up the empty file pool matching the journal
+//   emptyFilePoolPtrPtr        - out: receives the selected empty file pool
+// Reads the journal file headers, selects the empty file pool, scans all records (unless
+// the journal is empty), removes leading files without enqueued records, rolls back
+// non-prepared transactions and finally captures the list of remaining record ids.
+// Throws a jexception (JERR_MAP_NOTFOUND) if unlocking a dequeued record fails.
+// NOTE(review): the pool returned by getEmptyFilePool() is dereferenced unchecked - confirm
+// it can never be null for an identity read from disk.
+void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTransactionListPtr,
+                                      EmptyFilePoolManager* emptyFilePoolManager,
+                                      EmptyFilePool** emptyFilePoolPtrPtr) {
+    // Analyze file headers of existing journal files
+    efpIdentity_t efpIdentity;
+    analyzeJournalFileHeaders(efpIdentity);
+    *emptyFilePoolPtrPtr = emptyFilePoolManager->getEmptyFilePool(efpIdentity);
+    fileSize_kib_ = (*emptyFilePoolPtrPtr)->fileSize_kib();
+
+    // Read all records; this is what establishes endOffset_
+    if (!journalEmptyFlag_) {
+        while (getNextRecordHeader()) {
+
+        }
+        if (inFileStream_.is_open()) {
+            inFileStream_.close();
+        }
+    }
+
+    // Check for file full condition. This must follow the record scan above: before the
+    // scan endOffset_ still holds its constructor value of 0, so the comparison could
+    // never detect a full last file.
+    lastFileFullFlag_ = endOffset_ == (std::streamoff)(*emptyFilePoolPtrPtr)->fileSize_kib() * 1024;
+
+    // Restore all read and write pointers and transactions
+    if (!journalEmptyFlag_) {
+        // Remove leading files which have no enqueued records
+        removeEmptyFiles(*emptyFilePoolPtrPtr);
+
+        // Remove all txns from tmap that are not in the prepared list
+        if (preparedTransactionListPtr) {
+            std::vector<std::string> xidList;
+            transactionMapRef_.xid_list(xidList);
+            for (std::vector<std::string>::iterator itr = xidList.begin(); itr != xidList.end(); itr++) {
+                std::vector<std::string>::const_iterator pitr =
+                        std::find(preparedTransactionListPtr->begin(), preparedTransactionListPtr->end(), *itr);
+                if (pitr == preparedTransactionListPtr->end()) { // not found in prepared list
+                    txn_data_list tdl = transactionMapRef_.get_remove_tdata_list(*itr); // tdl will be empty if xid not found
+                    // Unlock any affected enqueues in emap
+                    for (tdl_itr i=tdl.begin(); i<tdl.end(); i++) {
+                        if (i->_enq_flag) { // enq op - decrement enqueue count
+                            // NOTE(review): indexes by file id into a list sized by file count - confirm ids are always in range
+                            enqueueCountList_[i->_pfid]--;
+                        } else if (enqueueMapRef_.is_enqueued(i->_drid, true)) { // deq op - unlock enq record
+                            int16_t ret = enqueueMapRef_.unlock(i->_drid);
+                            if (ret < enq_map::EMAP_OK) { // fail
+                                // enq_map::unlock()'s only error is enq_map::EMAP_RID_NOT_FOUND
+                                std::ostringstream oss;
+                                oss << std::hex << "_emap.unlock(): drid=0x\"" << i->_drid;
+                                throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "RecoveryManager", "analyzeJournals");
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        enqueueMapRef_.rid_list(recordIdList_);
+        recordIdListConstItr_ = recordIdList_.begin();
+    }
+}
+
+// Returns the stored end offset (endOffset_).
+std::streamoff RecoveryManager::getEndOffset() const {
+ return endOffset_;
+}
+
+// Returns the highest file number seen among this queue's journal file headers.
+uint64_t RecoveryManager::getHighestFileNumber() const {
+ return highestFileNumber_;
+}
+
+// Returns the highest record id recorded so far by decodeRecord().
+uint64_t RecoveryManager::getHighestRecordId() const {
+ return highestRecordId_;
+}
+
+// Returns the last-file-full flag computed by analyzeJournals().
+bool RecoveryManager::isLastFileFull() const {
+ return lastFileFullFlag_;
+}
+
+// Reads the enqueue record referenced by the current position in the remaining-record-id
+// list from its journal file.
+//   dataPtrPtr / dataSize - out: malloc'd buffer holding the record data, and its size
+//   xidPtrPtr / xidSize   - out: malloc'd buffer holding the record xid, and its size
+//   transient / external  - out: flags decoded from the enqueue header
+//   dtokp                 - data token; must report is_readable()
+// Returns false when the record id list is exhausted, true when a record was read. On a
+// true return both buffers were allocated with malloc().
+// Throws jexception: JERR_JCNTL_ENQSTATE if the token is not readable, JERR__FILEIO if the
+// seek or header read fails, JERR__MALLOC if a buffer cannot be allocated.
+// NOTE(review): nothing visible here advances recordIdListConstItr_, and the result of
+// fileNumberNameMap_.find() is not checked against end() - confirm both are handled
+// elsewhere.
+bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr,
+                                              std::size_t& dataSize,
+                                              void** const xidPtrPtr,
+                                              std::size_t& xidSize,
+                                              bool& transient,
+                                              bool& external,
+                                              data_tok* const dtokp,
+                                              bool /*ignore_pending_txns*/) {
+    if (!dtokp->is_readable()) {
+        std::ostringstream oss;
+        oss << std::hex << std::setfill('0') << "dtok_id=0x" << std::setw(8) << dtokp->id();
+        oss << "; dtok_rid=0x" << std::setw(16) << dtokp->rid() << "; dtok_wstate=" << dtokp->wstate_str();
+        throw jexception(jerrno::JERR_JCNTL_ENQSTATE, oss.str(), "RecoveryManager", "readNextRemainingRecord");
+    }
+    if (recordIdListConstItr_ == recordIdList_.end()) {
+        return false;
+    }
+    enq_map::emap_data_struct_t eds;
+    enqueueMapRef_.get_data(*recordIdListConstItr_, eds);
+    uint64_t fileNumber = eds._pfid;
+    currentJournalFileConstItr_ = fileNumberNameMap_.find(fileNumber);
+    getNextFile(false);
+
+    inFileStream_.seekg(eds._file_posn, std::ifstream::beg);
+    if (!inFileStream_.good()) {
+        std::ostringstream oss;
+        oss << "Could not find offset 0x" << std::hex << eds._file_posn << " in file " << getCurrentFileName();
+        throw jexception(jerrno::JERR__FILEIO, oss.str(), "RecoveryManager", "readNextRemainingRecord");
+    }
+    ::enq_hdr_t enqueueHeader;
+    inFileStream_.read((char*)&enqueueHeader, sizeof(::enq_hdr_t));
+    if (inFileStream_.gcount() != sizeof(::enq_hdr_t)) {
+        std::ostringstream oss;
+        oss << "Could not read enqueue header from file " << getCurrentFileName() << " at offset 0x" << std::hex << eds._file_posn;
+        throw jexception(jerrno::JERR__FILEIO, oss.str(), "RecoveryManager", "readNextRemainingRecord");
+    }
+    // check flags
+    transient = ::is_enq_transient(&enqueueHeader);
+    external = ::is_enq_external(&enqueueHeader);
+
+    // read xid
+    xidSize = enqueueHeader._xidsize;
+    *xidPtrPtr = ::malloc(xidSize);
+    if (*xidPtrPtr == 0) {
+        std::ostringstream oss;
+        oss << "xidPtr, size=0x" << std::hex << xidSize;
+        throw jexception(jerrno::JERR__MALLOC, oss.str(), "RecoveryManager", "readNextRemainingRecord");
+    }
+    readJournalData((char*)*xidPtrPtr, xidSize);
+
+    // read data
+    dataSize = enqueueHeader._dsize;
+    *dataPtrPtr = ::malloc(dataSize);
+    if (*dataPtrPtr == 0) { // check the data buffer just allocated, not the xid buffer
+        // Release the xid buffer so that a failed call does not leak it
+        ::free(*xidPtrPtr);
+        *xidPtrPtr = 0;
+        std::ostringstream oss;
+        oss << "dataPtr, size=0x" << std::hex << dataSize;
+        throw jexception(jerrno::JERR__MALLOC, oss.str(), "RecoveryManager", "readNextRemainingRecord");
+    }
+    readJournalData((char*)*dataPtrPtr, dataSize);
+    return true;
+}
+
+// Registers every recovered journal file with the LinearFileController by invoking fnPtr on
+// lfcPtr once per entry of fileNumberNameMap_, in map order. The file with the highest file
+// number is given a dblk count derived from endOffset_; all other files are given the dblk
+// count of a full file.
+void RecoveryManager::setLinearFileControllerJournals(lfcAddJournalFileFn fnPtr,
+                                                      LinearFileController* lfcPtr) {
+//std::cout << "****** RecoveryManager::setLinearFileControllerJournals():" << std::endl; // DEBUG
+    fileNumberNameMapConstItr_t fileItr = fileNumberNameMap_.begin();
+    while (fileItr != fileNumberNameMap_.end()) {
+        const bool isLastFile = fileItr->first == highestFileNumber_; // Is this the last file?
+        uint32_t fileDblkCount = isLastFile ?
+                                 endOffset_ / QLS_DBLK_SIZE_BYTES :          // Last file uses _endOffset
+                                 fileSize_kib_ * 1024 / QLS_DBLK_SIZE_BYTES; // All others use file size to make them full
+        (lfcPtr->*fnPtr)(fileItr->second, fileItr->first, fileSize_kib_, fileDblkCount);
+//std::cout << " ** f=" << i->second.substr(i->second.rfind('/')+1) << ",fn=" << i->first << ",s=" << _fileSize_kib << ",eo=" << fileDblkCount << "(" << (fileDblkCount * QLS_DBLK_SIZE_BYTES / 1024) << "kiB)" << std::endl; // DEBUG
+        ++fileItr;
+    }
+}
+
+// Builds a human-readable summary of the recovery analysis for journal id jid.
+//   compact == true  : one line, with abbreviated field names (jfl, ecl, fro, eo, hrid, ...)
+//   compact == false : one field per line
+// Both forms list the journal files (number and file name without directory), the per-file
+// enqueue counts, the empty flag, the first-record and end offsets (hex bytes and dblks),
+// the highest record id and file number and the last-file-full flag.
+std::string RecoveryManager::toString(const std::string& jid,
+ bool compact) {
+ std::ostringstream oss;
+ if (compact) {
+ oss << "Recovery journal analysis (jid=\"" << jid << "\"):";
+ oss << " jfl=[";
+ for (fileNumberNameMapConstItr_t i=fileNumberNameMap_.begin(); i!=fileNumberNameMap_.end(); ++i) {
+ if (i!=fileNumberNameMap_.begin()) oss << " ";
+ // rfind('/')+1 strips the directory; with no '/' present npos+1 wraps to 0 and the whole name is kept
+ oss << i->first << ":" << i->second.substr(i->second.rfind('/')+1);
+ }
+ oss << "] ecl=[ ";
+ for (enqueueCountListConstItr_t j = enqueueCountList_.begin(); j!=enqueueCountList_.end(); ++j) {
+ if (j != enqueueCountList_.begin()) oss << " ";
+ oss << *j;
+ }
+ oss << " ] empty=" << (journalEmptyFlag_ ? "T" : "F");
+ oss << " fro=0x" << std::hex << firstRecordOffset_ << std::dec << " (" << (firstRecordOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)";
+ oss << " eo=0x" << std::hex << endOffset_ << std::dec << " (" << (endOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)";
+ oss << " hrid=0x" << std::hex << highestRecordId_ << std::dec;
+ oss << " hfnum=0x" << std::hex << highestFileNumber_ << std::dec;
+ oss << " lffull=" << (lastFileFullFlag_ ? "T" : "F");
+ } else {
+ oss << "Recovery journal analysis (jid=\"" << jid << "\"):" << std::endl;
+ oss << " Number of journal files = " << fileNumberNameMap_.size() << std::endl;
+ oss << " Journal File List:" << std::endl;
+ for (fileNumberNameMapConstItr_t i=fileNumberNameMap_.begin(); i!=fileNumberNameMap_.end(); ++i) {
+ oss << " " << i->first << ": " << i->second.substr(i->second.rfind('/')+1) << std::endl;
+ }
+ // Note: a line break is emitted right after the opening bracket
+ oss << " Enqueue Counts: [ " << std::endl;
+ for (enqueueCountListConstItr_t j = enqueueCountList_.begin(); j!=enqueueCountList_.end(); ++j) {
+ if (j != enqueueCountList_.begin()) oss << ", ";
+ oss << *j;
+ }
+ oss << " ]" << std::endl;
+ oss << " Journal empty = " << (journalEmptyFlag_ ? "TRUE" : "FALSE") << std::endl;
+ oss << " First record offset in first file = 0x" << std::hex << firstRecordOffset_ <<
+ std::dec << " (" << (firstRecordOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
+ oss << " End offset = 0x" << std::hex << endOffset_ << std::dec << " (" <<
+ (endOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
+ oss << " Highest rid = 0x" << std::hex << highestRecordId_ << std::dec << std::endl;
+ oss << " Highest file number = 0x" << std::hex << highestFileNumber_ << std::dec << std::endl;
+ oss << " Last file full = " << (lastFileFullFlag_ ? "TRUE" : "FALSE") << std::endl;
+ // Heading only: no per-record lines are appended after it here
+ oss << " Enqueued records (txn & non-txn):" << std::endl;
+ }
+ return oss.str();
+}
+
+// --- protected functions ---
+
+// Reads the header of every file found in the journal directory and, for each file whose
+// header names this queue, records file number -> file name in fileNumberNameMap_ and
+// tracks the highest file number. Files belonging to another queue are logged and ignored.
+//   efpIdentity - out: EFP partition and file size taken from the last header that belongs
+//                 to this queue; (0, 0) when no such file exists.
+// Also sizes enqueueCountList_ to one zeroed entry per recorded file and positions
+// currentJournalFileConstItr_ at the first recorded file.
+void RecoveryManager::analyzeJournalFileHeaders(efpIdentity_t& efpIdentity) {
+    std::string headerQueueName;
+    ::file_hdr_t fileHeader;
+    directoryList_t directoryList;
+    jdir::read_dir(journalDirectory_, directoryList, false, true, false, true);
+
+    // Give the identity a defined value up front: with no matching file, reading it from
+    // fileHeader after the loop would use uninitialised data (empty directory) or values
+    // from a file belonging to another queue.
+    efpIdentity.first = 0;
+    efpIdentity.second = 0;
+    for (directoryListConstItr_t i = directoryList.begin(); i != directoryList.end(); ++i) {
+        readJournalFileHeader(*i, fileHeader, headerQueueName);
+        if (headerQueueName.compare(queueName_) != 0) {
+            std::ostringstream oss;
+            oss << "Journal file " << (*i) << " belongs to queue \"" << headerQueueName << "\": ignoring";
+            journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss.str());
+        } else {
+            fileNumberNameMap_[fileHeader._file_number] = *i;
+            if (fileHeader._file_number > highestFileNumber_) {
+                highestFileNumber_ = fileHeader._file_number;
+            }
+            // Only headers of this queue's files may define the EFP identity
+            efpIdentity.first = fileHeader._efp_partition;
+            efpIdentity.second = fileHeader._file_size_kib;
+        }
+    }
+    enqueueCountList_.resize(fileNumberNameMap_.size(), 0);
+    currentJournalFileConstItr_ = fileNumberNameMap_.begin();
+}
+
+// Throws a jexception("read failure") when the input stream is in a failed or bad state,
+// or - only when checkEof is true - when it has reached end of file.
+void RecoveryManager::checkFileStreamOk(bool checkEof) {
+    // Parenthesised on purpose: ?: binds more loosely than ||, so the unparenthesised form
+    // evaluated as (fail || bad || checkEof) ? eof : false and ignored fail/bad unless eof
+    // was also set.
+    if (inFileStream_.fail() || inFileStream_.bad() || (checkEof && inFileStream_.eof())) {
+        throw jexception("read failure"); // TODO complete exception
+    }
+}
+
+// Checks that recordPosition lies on an sblk boundary of the current journal file. If not,
+// logs a warning and writes filler records (one dblk each: QLS_EMPTY_MAGIC followed by
+// QLS_CLEAN_CHAR bytes) into the file from that position until the boundary is reached.
+// In both cases endOffset_ is set to the resulting (aligned) position.
+// Throws jexception: JERR__FILEIO if the file cannot be opened for writing, JERR__MALLOC if
+// the filler buffer cannot be allocated, JERR_RCVM_WRITE if a filler write fails.
+void RecoveryManager::checkJournalAlignment(const std::streampos recordPosition) {
+    std::streampos currentPosn = recordPosition;
+    unsigned sblkOffset = currentPosn % QLS_SBLK_SIZE_BYTES;
+    if (sblkOffset)
+    {
+        std::ostringstream oss1;
+        oss1 << std::hex << "Bad record alignment found at fid=0x" << getCurrentFileNumber();
+        oss1 << " offs=0x" << currentPosn << " (likely journal overwrite boundary); " << std::dec;
+        oss1 << (QLS_SBLK_SIZE_DBLKS - (sblkOffset/QLS_DBLK_SIZE_BYTES)) << " filler record(s) required.";
+        journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss1.str());
+
+        // in | out keeps the existing file contents (out alone would truncate)
+        std::ofstream outFileStream(getCurrentFileName().c_str(), std::ios_base::in | std::ios_base::out | std::ios_base::binary);
+        if (!outFileStream.good()) {
+            throw jexception(jerrno::JERR__FILEIO, getCurrentFileName(), "RecoveryManager", "checkJournalAlignment");
+        }
+        outFileStream.seekp(currentPosn);
+
+        // Prepare write buffer containing a single empty record (1 dblk)
+        void* writeBuffer = std::malloc(QLS_DBLK_SIZE_BYTES);
+        if (writeBuffer == 0) {
+            throw jexception(jerrno::JERR__MALLOC, "RecoveryManager", "checkJournalAlignment");
+        }
+        const uint32_t xmagic = QLS_EMPTY_MAGIC;
+        ::memcpy(writeBuffer, (const void*)&xmagic, sizeof(xmagic));
+        ::memset((char*)writeBuffer + sizeof(xmagic), QLS_CLEAN_CHAR, QLS_DBLK_SIZE_BYTES - sizeof(xmagic));
+
+        // Write as many empty records as are needed to get to sblk boundary
+        while (currentPosn % QLS_SBLK_SIZE_BYTES) {
+            outFileStream.write((const char*)writeBuffer, QLS_DBLK_SIZE_BYTES);
+            if (outFileStream.fail()) {
+                std::free(writeBuffer); // release the filler buffer on the error path too
+                throw jexception(jerrno::JERR_RCVM_WRITE, "RecoveryManager", "checkJournalAlignment");
+            }
+            std::ostringstream oss2;
+            oss2 << std::hex << "Recover phase write: Wrote filler record: fid=0x" << getCurrentFileNumber();
+            oss2 << " offs=0x" << currentPosn;
+            journalLogRef_.log(JournalLog::LOG_NOTICE, queueName_, oss2.str());
+            currentPosn = outFileStream.tellp();
+        }
+        outFileStream.close();
+        std::free(writeBuffer);
+        journalLogRef_.log(JournalLog::LOG_INFO, queueName_, "Bad record alignment fixed.");
+    }
+    endOffset_ = currentPosn;
+}
+
+// Decodes one record from the input stream, following it into subsequent journal files
+// when rcv_decode() reports that it is not yet complete.
+//   record             - record object whose rcv_decode() does the decoding
+//   cumulativeSizeRead - passed through to rcv_decode() on every attempt
+//   headerRecord       - already-read record header; its _rid updates highestRecordId_
+//   fileOffset         - offset at which the record starts in the current file
+// Returns true when the record was decoded completely. Returns false - after running
+// checkJournalAlignment() on the record's start offset - when rcv_decode() throws a
+// jexception or when there is no further file to continue in.
+bool RecoveryManager::decodeRecord(jrec& record,
+ std::size_t& cumulativeSizeRead,
+ ::rec_hdr_t& headerRecord,
+ std::streampos& fileOffset)
+{
+// uint16_t start_fid = getCurrentFileNumber();
+ std::streampos start_file_offs = fileOffset;
+
+ // 0 is treated as "no record id seen yet"
+ if (highestRecordId_ == 0) {
+ highestRecordId_ = headerRecord._rid;
+ } else if (headerRecord._rid - highestRecordId_ < 0x8000000000000000ULL) { // RFC 1982 comparison for unsigned 64-bit
+ highestRecordId_ = headerRecord._rid;
+ }
+
+ bool done = false;
+ while (!done) {
+ try {
+ done = record.rcv_decode(headerRecord, &inFileStream_, cumulativeSizeRead);
+ }
+ // Every jexception is handled the same way here; the exception object itself is not inspected
+ catch (const jexception& e) {
+// TODO - review this logic and tidy up how rd._lfid is assigned. See new jinf.get_end_file() fn.
+// Original
+// if (e.err_code() != jerrno::JERR_JREC_BADRECTAIL ||
+// fid != (rd._ffid ? rd._ffid - 1 : _num_jfiles - 1)) throw;
+// Tried this, but did not work
+// if (e.err_code() != jerrno::JERR_JREC_BADRECTAIL || h._magic != 0) throw;
+ checkJournalAlignment(start_file_offs);
+// rd._lfid = start_fid;
+ return false;
+ }
+ // Record incomplete: continue in the next file, or give up if there is none
+ if (!done && !getNextFile(false)) {
+ checkJournalAlignment(start_file_offs);
+ return false;
+ }
+ }
+ return true;
+}
+
+ /** Return the name stored for the journal file the current-file iterator points at. */
+ std::string RecoveryManager::getCurrentFileName() const {
+     const std::string& currentName = (*currentJournalFileConstItr_).second;
+     return currentName;
+ }
+
+ /** Return the file number (map key) of the journal file the current-file iterator points at. */
+ uint64_t RecoveryManager::getCurrentFileNumber() const {
+     const uint64_t currentNumber = (*currentJournalFileConstItr_).first;
+     return currentNumber;
+ }
+
+ /**
+  * Make sure the input stream is positioned on a readable journal file.
+  *
+  * If the open stream has reached EOF or is in an error state, its offset is saved in
+  * endOffset_, the file is closed and the current-file iterator advances. If no file is open
+  * afterwards, the current file is opened and its header is read and checked.
+  *
+  * \param jumpToFirstRecordOffsetFlag If true, seek to the first-record offset from the file
+  *        header; otherwise seek to QLS_SBLK_SIZE_BYTES.
+  * \return true if a file is open and ready for reading; false if there are no more files or
+  *         the file header does not carry QLS_FILE_MAGIC.
+  * \throws jexception if tellg() fails or the file cannot be opened.
+  */
+ bool RecoveryManager::getNextFile(bool jumpToFirstRecordOffsetFlag) {
+     if (inFileStream_.is_open() && (inFileStream_.eof() || !inFileStream_.good())) {
+         inFileStream_.clear();
+         endOffset_ = inFileStream_.tellg(); // remember file offset before closing
+         if (endOffset_ == -1) { throw jexception("tellg() failure"); } // Check for error code -1 TODO: complete exception
+         inFileStream_.close();
+         ++currentJournalFileConstItr_;
+         if (currentJournalFileConstItr_ == fileNumberNameMap_.end()) {
+             return false;
+         }
+     }
+
+     if (inFileStream_.is_open()) {
+         return true; // current file is still usable
+     }
+
+     inFileStream_.clear(); // clear eof flag, req'd for older versions of c++
+     inFileStream_.open(getCurrentFileName().c_str(), std::ios_base::in | std::ios_base::binary);
+     if (!inFileStream_.good()) {
+         throw jexception(jerrno::JERR__FILEIO, getCurrentFileName(), "RecoveryManager", "getNextFile");
+     }
+
+     // Read file header
+     file_hdr_t fhdr;
+     inFileStream_.read(reinterpret_cast<char*>(&fhdr), sizeof(fhdr));
+     checkFileStreamOk(true);
+
+     if (fhdr._rhdr._magic != QLS_FILE_MAGIC) {
+         // No valid file header: this file has not been written to.
+         inFileStream_.close();
+         if (currentJournalFileConstItr_ == fileNumberNameMap_.begin()) {
+             journalEmptyFlag_ = true;
+         }
+         return false;
+     }
+
+     firstRecordOffset_ = fhdr._fro;
+     std::streamoff foffs = jumpToFirstRecordOffsetFlag ? firstRecordOffset_ : QLS_SBLK_SIZE_BYTES;
+     inFileStream_.seekg(foffs);
+     return true;
+ }
+
+ /**
+  * Read the next record header from the journal and process the record it introduces.
+  *
+  * Journal files are advanced via getNextFile() until a complete rec_hdr_t can be read. The
+  * record is then handled according to the header magic:
+  *  - enqueue: non-transient records are counted against their file and added to the enqueue
+  *    map, or to the transaction map when they carry an xid;
+  *  - dequeue: the matching enqueue is removed from the enqueue map, or, when the record carries
+  *    an xid, the enqueue is locked and the dequeue is added to the transaction map;
+  *  - txn abort: the transaction is removed; its enqueues are un-counted, its dequeues unlocked;
+  *  - txn commit: the transaction is removed; its enqueues and dequeues are applied to the
+  *    enqueue map;
+  *  - empty (filler) record: skipped;
+  *  - zero or unknown magic: treated as the end of valid data after checkJournalAlignment().
+  *
+  * \return true if a record was processed and reading may continue; false if no further record
+  *         can be read (no more files, a record could not be decoded, or end of valid data).
+  * \throws jexception if a transactional record yields a null xid, or on enqueue map /
+  *         transaction map errors.
+  */
+ bool RecoveryManager::getNextRecordHeader()
+ {
+     std::size_t cum_size_read = 0;
+     void* xidp = 0;
+     rec_hdr_t h;
+
+     bool hdr_ok = false;
+     std::streampos file_pos;
+     while (!hdr_ok) {
+         if (!inFileStream_.is_open()) {
+             if (!getNextFile(true)) {
+                 return false;
+             }
+         }
+         file_pos = inFileStream_.tellg();
+         inFileStream_.read((char*)&h, sizeof(rec_hdr_t));
+         if (inFileStream_.gcount() == sizeof(rec_hdr_t)) {
+             hdr_ok = true;
+         } else {
+             // Short read: move on to the next file (if any) and try again
+             if (!getNextFile(true)) {
+                 return false;
+             }
+         }
+     }
+
+     switch(h._magic) {
+         case QLS_ENQ_MAGIC:
+             {
+                 enq_rec er;
+                 uint64_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary
+                 if (!decodeRecord(er, cum_size_read, h, file_pos)) {
+                     return false;
+                 }
+                 if (!er.is_transient()) { // Ignore transient msgs
+                     enqueueCountList_[start_fid]++;
+                     if (er.xid_size()) {
+                         er.get_xid(&xidp);
+                         // A record reporting a non-zero xid size must supply an xid
+                         if (xidp == 0) { throw jexception("Null xid with non-null xid_size"); } // TODO complete exception
+                         std::string xid((char*)xidp, er.xid_size());
+                         transactionMapRef_.insert_txn_data(xid, txn_data(h._rid, 0, start_fid, true));
+                         if (transactionMapRef_.set_aio_compl(xid, h._rid) < txn_map::TMAP_OK) { // fail - xid or rid not found
+                             std::ostringstream oss;
+                             oss << std::hex << "_tmap.set_aio_compl: txn_enq xid=\"" << xid << "\" rid=0x" << h._rid;
+                             throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "RecoveryManager", "getNextRecordHeader");
+                         }
+                         // NOTE(review): assumes get_xid() hands over ownership of a malloc'd buffer - confirm
+                         std::free(xidp);
+                     } else {
+                         if (enqueueMapRef_.insert_pfid(h._rid, start_fid, file_pos) < enq_map::EMAP_OK) { // fail
+                             // The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID.
+                             std::ostringstream oss;
+                             oss << std::hex << "rid=0x" << h._rid << " _pfid=0x" << start_fid;
+                             throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "RecoveryManager", "getNextRecordHeader");
+                         }
+                     }
+                 }
+             }
+             break;
+         case QLS_DEQ_MAGIC:
+             {
+                 deq_rec dr;
+                 // File numbers are 64-bit (see getCurrentFileNumber()); do not truncate
+                 uint64_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary
+                 if (!decodeRecord(dr, cum_size_read, h, file_pos)) {
+                     return false;
+                 }
+                 if (dr.xid_size()) {
+                     // If the enqueue is part of a pending txn, it will not yet be in emap
+                     enqueueMapRef_.lock(dr.deq_rid()); // ignore not found error
+                     dr.get_xid(&xidp);
+                     if (xidp == 0) { throw jexception("Null xid with non-null xid_size"); } // TODO complete exception
+                     std::string xid((char*)xidp, dr.xid_size());
+                     transactionMapRef_.insert_txn_data(xid, txn_data(dr.rid(), dr.deq_rid(), start_fid, false,
+                                                        dr.is_txn_coml_commit()));
+                     if (transactionMapRef_.set_aio_compl(xid, dr.rid()) < txn_map::TMAP_OK) { // fail - xid or rid not found
+                         std::ostringstream oss;
+                         oss << std::hex << "_tmap.set_aio_compl: txn_deq xid=\"" << xid << "\" rid=0x" << dr.rid();
+                         throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "RecoveryManager", "getNextRecordHeader");
+                     }
+                     std::free(xidp);
+                 } else {
+                     uint64_t enq_fid;
+                     if (enqueueMapRef_.get_remove_pfid(dr.deq_rid(), enq_fid, true) == enq_map::EMAP_OK) { // ignore not found error
+                         enqueueCountList_[enq_fid]--;
+                     }
+                 }
+             }
+             break;
+         case QLS_TXA_MAGIC:
+             {
+                 txn_rec ar;
+                 if (!decodeRecord(ar, cum_size_read, h, file_pos)) {
+                     return false;
+                 }
+                 // Delete this txn from tmap, unlock any locked records in emap
+                 ar.get_xid(&xidp);
+                 if (xidp == 0) {
+                     throw jexception("Null xid with non-null xid_size"); // TODO complete exception
+                 }
+                 std::string xid((char*)xidp, ar.xid_size());
+                 txn_data_list tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found
+                 for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++) {
+                     if (itr->_enq_flag) {
+                         enqueueCountList_[itr->_pfid]--;
+                     } else {
+                         enqueueMapRef_.unlock(itr->_drid); // ignore not found error
+                     }
+                 }
+                 std::free(xidp);
+             }
+             break;
+         case QLS_TXC_MAGIC:
+             {
+                 txn_rec cr;
+                 if (!decodeRecord(cr, cum_size_read, h, file_pos)) {
+                     return false;
+                 }
+                 // Delete this txn from tmap, process records into emap
+                 cr.get_xid(&xidp);
+                 if (xidp == 0) {
+                     throw jexception("Null xid with non-null xid_size"); // TODO complete exception
+                 }
+                 std::string xid((char*)xidp, cr.xid_size());
+                 txn_data_list tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found
+                 for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++) {
+                     if (itr->_enq_flag) { // txn enqueue
+                         if (enqueueMapRef_.insert_pfid(itr->_rid, itr->_pfid, file_pos) < enq_map::EMAP_OK) { // fail
+                             // The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID.
+                             std::ostringstream oss;
+                             oss << std::hex << "rid=0x" << itr->_rid << " _pfid=0x" << itr->_pfid;
+                             throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "RecoveryManager", "getNextRecordHeader");
+                         }
+                     } else { // txn dequeue
+                         uint64_t enq_fid;
+                         if (enqueueMapRef_.get_remove_pfid(itr->_drid, enq_fid, true) == enq_map::EMAP_OK) // ignore not found error
+                             enqueueCountList_[enq_fid]--;
+                     }
+                 }
+                 std::free(xidp);
+             }
+             break;
+         case QLS_EMPTY_MAGIC:
+             {
+                 // Filler record: skip the rest of its data blocks
+                 uint32_t rec_dblks = jrec::size_dblks(sizeof(::rec_hdr_t));
+                 inFileStream_.ignore(rec_dblks * QLS_DBLK_SIZE_BYTES - sizeof(::rec_hdr_t));
+                 checkFileStreamOk(false);
+                 if (!getNextFile(false)) {
+                     return false;
+                 }
+             }
+             break;
+         case 0:
+             // No record written here: end of journal data
+             checkJournalAlignment(file_pos);
+             return false;
+         default:
+             // Stop as this is the overwrite boundary.
+             checkJournalAlignment(file_pos);
+             return false;
+     }
+     return true;
+ }
+
+ /**
+  * Read readSize bytes of journal data into target, continuing into following journal files
+  * when the current file runs out.
+  *
+  * The usable size of a journal file is taken to be fileSize_kib_ KiB. Each pass reads as much
+  * as is still needed or as much as is left in the current file, whichever is smaller.
+  *
+  * \param target Destination buffer; must have room for readSize bytes.
+  * \param readSize Total number of bytes to read.
+  * \throws jexception if no further journal file is available or a read returns fewer bytes
+  *         than requested.
+  */
+ void RecoveryManager::readJournalData(char* target,
+                                       const std::streamsize readSize) {
+     // Widen before multiplying so that large file sizes do not overflow 32 bits
+     const std::streamoff fileSizeBytes = static_cast<std::streamoff>(fileSize_kib_) * 1024;
+     std::streamoff bytesRead = 0;
+     while (bytesRead < readSize) {
+         if (inFileStream_.eof() && !getNextFile(false)) {
+             throw jexception(); // TODO - proper exception
+         }
+         const std::streamoff bytesRemaining = readSize - bytesRead;
+         const std::streamoff filePosition = inFileStream_.tellg();
+         const std::streamoff bytesLeftInFile = fileSizeBytes - filePosition;
+         if (bytesLeftInFile <= 0) {
+             // Positioned at the end of this file's data: flag EOF so that the next pass
+             // switches to the following journal file instead of looping on zero-length reads.
+             inFileStream_.setstate(std::ios_base::eofbit);
+             continue;
+         }
+         const std::streamoff chunkSize = bytesRemaining <= bytesLeftInFile ? bytesRemaining : bytesLeftInFile;
+         inFileStream_.read(target + bytesRead, chunkSize);
+         if (inFileStream_.gcount() != chunkSize) {
+             throw jexception(); // TODO - proper exception
+         }
+         bytesRead += chunkSize;
+     }
+ }
+
+// static private
+ /**
+  * Read the header block of a journal file and extract the file header and queue name.
+  *
+  * The whole reserved header block is read into a local buffer. The file_hdr_t at its start is
+  * copied into fileHeaderRef, and the _queue_name_len bytes that follow become queueName.
+  *
+  * \param journalFileName Path of the journal file to read.
+  * \param fileHeaderRef Receives a copy of the file header.
+  * \param queueName Receives the queue name stored after the file header.
+  * \throws jexception JERR_RCVM_OPENRD if the file cannot be opened; JERR_RCVM_READ if the
+  *         header block cannot be read in full or the stored queue name length does not fit
+  *         inside the header block.
+  */
+ void RecoveryManager::readJournalFileHeader(const std::string& journalFileName,
+                                             ::file_hdr_t& fileHeaderRef,
+                                             std::string& queueName) {
+     const std::size_t headerBlockSize = QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024;
+     char buffer[headerBlockSize];
+     std::ifstream ifs(journalFileName.c_str(), std::ifstream::in | std::ifstream::binary);
+     if (!ifs.good()) {
+         std::ostringstream oss;
+         oss << "File=" << journalFileName;
+         throw jexception(jerrno::JERR_RCVM_OPENRD, oss.str(), "RecoveryManager", "readJournalFileHeader");
+     }
+     ifs.read(buffer, headerBlockSize);
+     if (!ifs) {
+         std::streamsize s = ifs.gcount();
+         ifs.close();
+         std::ostringstream oss;
+         oss << "File=" << journalFileName << "; attempted_read_size=" << headerBlockSize << "; actual_read_size=" << s;
+         throw jexception(jerrno::JERR_RCVM_READ, oss.str(), "RecoveryManager", "readJournalFileHeader");
+     }
+     ifs.close();
+     ::memcpy(&fileHeaderRef, buffer, sizeof(::file_hdr_t));
+
+     // The name length comes from the file; reject values that would read past the buffer
+     const std::size_t maxQueueNameLen = headerBlockSize - sizeof(::file_hdr_t);
+     if (fileHeaderRef._queue_name_len > maxQueueNameLen) {
+         std::ostringstream oss;
+         oss << "File=" << journalFileName << "; queue_name_len=" << fileHeaderRef._queue_name_len
+             << "; max_queue_name_len=" << maxQueueNameLen;
+         throw jexception(jerrno::JERR_RCVM_READ, oss.str(), "RecoveryManager", "readJournalFileHeader");
+     }
+     queueName.assign(buffer + sizeof(::file_hdr_t), fileHeaderRef._queue_name_len);
+ }
+
+ /**
+  * Return leading journal files that hold no enqueued records to the empty file pool.
+  *
+  * While the first entry of enqueueCountList_ is zero and more than one entry remains, the first
+  * file in fileNumberNameMap_ is handed back to the pool and dropped from both containers. The
+  * last remaining file is always kept.
+  *
+  * \param emptyFilePoolPtr Pool that receives the emptied files.
+  */
+ void RecoveryManager::removeEmptyFiles(EmptyFilePool* emptyFilePoolPtr) {
+     // Test the size first: front() must not be called on an empty deque
+     while (enqueueCountList_.size() > 1 && enqueueCountList_.front() == 0) {
+         fileNumberNameMapItr_t i = fileNumberNameMap_.begin();
+         emptyFilePoolPtr->returnEmptyFile(i->second);
+         fileNumberNameMap_.erase(i);
+         enqueueCountList_.pop_front();
+     }
+ }
+
+}} // namespace qpid::qls_jrnl
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,134 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LINEARSTORE_RECOVERYSTATE_H_
+#define QPID_LINEARSTORE_RECOVERYSTATE_H_
+
+#include <deque>
+#include <fstream>
+#include <map>
+#include "qpid/linearstore/jrnl/LinearFileController.h"
+#include <stdint.h>
+#include <vector>
+
+struct file_hdr_t;
+struct rec_hdr_t;
+
+namespace qpid {
+namespace qls_jrnl {
+
+class data_tok;
+class enq_map;
+class EmptyFilePool;
+class EmptyFilePoolManager;
+class JournalLog;
+class jrec;
+class txn_map;
+
+ /**
+  * \brief Reads the journal files of a queue during recovery.
+  *
+  * Journal files are read record by record (see getNextRecordHeader()). Enqueue, dequeue and
+  * transaction records update the enqueue map, the transaction map and the per-file enqueue
+  * counts held by this class.
+  */
+ class RecoveryManager
+ {
+ protected:
+ // Types
+ typedef std::vector<std::string> directoryList_t;
+ typedef directoryList_t::const_iterator directoryListConstItr_t;
+ typedef std::map<uint64_t, std::string> fileNumberNameMap_t;
+ typedef fileNumberNameMap_t::iterator fileNumberNameMapItr_t;
+ typedef fileNumberNameMap_t::const_iterator fileNumberNameMapConstItr_t;
+ typedef std::deque<uint32_t> enqueueCountList_t;
+ typedef enqueueCountList_t::const_iterator enqueueCountListConstItr_t;
+ typedef std::vector<uint64_t> recordIdList_t;
+ typedef recordIdList_t::const_iterator recordIdListConstItr_t;
+
+ // Location and identity
+ const std::string journalDirectory_;
+ const std::string queueName_;
+ // References to maps and log owned elsewhere; they are updated / written to during analysis
+ enq_map& enqueueMapRef_;
+ txn_map& transactionMapRef_;
+ JournalLog& journalLogRef_;
+
+ // Initial journal analysis data
+ fileNumberNameMap_t fileNumberNameMap_; ///< File number - name map
+ enqueueCountList_t enqueueCountList_; ///< Number enqueued records found for each file
+ bool journalEmptyFlag_; ///< Journal data files empty
+ std::streamoff firstRecordOffset_; ///< First record offset in ffid
+ std::streamoff endOffset_; ///< End offset (first byte past last record)
+ uint64_t highestRecordId_; ///< Highest rid found
+ uint64_t highestFileNumber_; ///< Highest file number found
+ bool lastFileFullFlag_; ///< Last file is full
+
+ // State for recovery of individual enqueued records
+ uint32_t fileSize_kib_; // journal file size in KiB; multiplied by 1024 in readJournalData()
+ fileNumberNameMapConstItr_t currentJournalFileConstItr_; // file currently being read; advanced by getNextFile()
+ std::string currentFileName_;
+ std::ifstream inFileStream_; // stream over the current journal file
+ recordIdList_t recordIdList_;
+ recordIdListConstItr_t recordIdListConstItr_;
+
+ public:
+ RecoveryManager(const std::string& journalDirectory,
+ const std::string& queuename,
+ enq_map& enqueueMapRef,
+ txn_map& transactionMapRef,
+ JournalLog& journalLogRef);
+ virtual ~RecoveryManager();
+
+ void analyzeJournals(const std::vector<std::string>* preparedTransactionListPtr,
+ EmptyFilePoolManager* emptyFilePoolManager,
+ EmptyFilePool** emptyFilePoolPtrPtr);
+ std::streamoff getEndOffset() const;
+ uint64_t getHighestFileNumber() const;
+ uint64_t getHighestRecordId() const;
+ bool isLastFileFull() const;
+ bool readNextRemainingRecord(void** const dataPtrPtr,
+ std::size_t& dataSize,
+ void** const xidPtrPtr,
+ std::size_t& xidSize,
+ bool& transient,
+ bool& external,
+ data_tok* const dtokp,
+ bool ignore_pending_txns);
+ void setLinearFileControllerJournals(lfcAddJournalFileFn fnPtr,
+ LinearFileController* lfcPtr);
+ std::string toString(const std::string& jid,
+ bool compact = true);
+ protected:
+ void analyzeJournalFileHeaders(efpIdentity_t& efpIdentity);
+ void checkFileStreamOk(bool checkEof);
+ // Called when reading stops at recordPosition; sets endOffset_.
+ // NOTE(review): appears to pad the file with filler records up to an sblk boundary - confirm
+ void checkJournalAlignment(const std::streampos recordPosition);
+ // Decodes the rest of a record after its header has been read, following it into the next
+ // file if necessary; returns false if the record could not be completely decoded
+ bool decodeRecord(jrec& record,
+ std::size_t& cumulativeSizeRead,
+ ::rec_hdr_t& recordHeader,
+ std::streampos& fileOffset);
+ // Name / number of the file currentJournalFileConstItr_ points at
+ std::string getCurrentFileName() const;
+ uint64_t getCurrentFileNumber() const;
+ // Closes an exhausted file and opens the next one; returns false when no valid file remains
+ bool getNextFile(bool jumpToFirstRecordOffsetFlag);
+ // Reads and processes the next record; returns false when no further record can be read
+ bool getNextRecordHeader();
+ // Reads size bytes into target, continuing into following files as needed
+ void readJournalData(char* target, const std::streamsize size);
+ // Returns leading files with a zero enqueue count to the pool (the last file is kept)
+ void removeEmptyFiles(EmptyFilePool* emptyFilePoolPtr);
+
+ // Reads the header block of journalFileName, filling in the file header and queue name
+ static void readJournalFileHeader(const std::string& journalFileName,
+ ::file_hdr_t& fileHeaderRef,
+ std::string& queueName);
+ };
+
+}} // namespace qpid::qls_jrnl
+
+#endif // QPID_LINEARSTORE_RECOVERYSTATE_H_
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/aio.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/aio.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/aio.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/aio.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,140 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_AIO_H
+#define QPID_LEGACYSTORE_JRNL_AIO_H
+
+#include <libaio.h>
+#include <cstring>
+#include <string.h>
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+typedef iocb aio_cb;
+typedef io_event aio_event;
+
+/**
+ * \brief This class is a C++ wrapper class for the libaio functions used by the journal. Note that only those
+ * functions used by the journal are included here. This is not a complete implementation of all libaio functions.
+ */
+ class aio
+ {
+ public:
+     /** \brief Wrapper for libaio's ::io_queue_init(); returns its result unchanged. */
+     static inline int queue_init(int maxevents, io_context_t* ctxp)
+     {
+         const int result = ::io_queue_init(maxevents, ctxp);
+         return result;
+     }
+
+     /** \brief Wrapper for libaio's ::io_queue_release(); returns its result unchanged. */
+     static inline int queue_release(io_context_t ctx)
+     {
+         const int result = ::io_queue_release(ctx);
+         return result;
+     }
+
+     /** \brief Wrapper for libaio's ::io_submit(); returns its result unchanged. */
+     static inline int submit(io_context_t ctx, long nr, aio_cb* aios[])
+     {
+         const int result = ::io_submit(ctx, nr, aios);
+         return result;
+     }
+
+     /** \brief Wrapper for libaio's ::io_getevents(); returns its result unchanged. */
+     static inline int getevents(io_context_t ctx, long min_nr, long nr, aio_event* events, timespec* const timeout)
+     {
+         const int result = ::io_getevents(ctx, min_nr, nr, events, timeout);
+         return result;
+     }
+
+     /**
+      * \brief Prepare an aio_cb struct for a read. This is a plain wrapper for libaio's
+      * ::io_prep_pread() function.
+      *
+      * \param aiocbp Pointer to the aio_cb struct to be prepared.
+      * \param fd File descriptor to be used for read.
+      * \param buf Pointer to buffer in which read data is to be placed.
+      * \param count Number of bytes to read - buffer must be large enough.
+      * \param offset Offset within file from which data will be read.
+      */
+     static inline void prep_pread(aio_cb* aiocbp, int fd, void* buf, std::size_t count, int64_t offset)
+     {
+         ::io_prep_pread(aiocbp, fd, buf, count, offset);
+     }
+
+     /**
+      * \brief Prepare an aio_cb struct for a read while leaving its first pointer-sized field
+      * (the data pointer) untouched, so that a pointer stored there can be re-used.
+      *
+      * Everything after the first sizeof(void*) bytes is zeroed before the read fields are set.
+      *
+      * \param aiocbp Pointer to the aio_cb struct to be prepared.
+      * \param fd File descriptor to be used for read.
+      * \param buf Pointer to buffer in which read data is to be placed.
+      * \param count Number of bytes to read - buffer must be large enough.
+      * \param offset Offset within file from which data will be read.
+      */
+     static inline void prep_pread_2(aio_cb* aiocbp, int fd, void* buf, std::size_t count, int64_t offset)
+     {
+         char* const afterDataPtr = reinterpret_cast<char*>(aiocbp) + sizeof(void*);
+         std::memset(static_cast<void*>(afterDataPtr), 0, sizeof(aio_cb) - sizeof(void*));
+         aiocbp->aio_fildes = fd;
+         aiocbp->aio_lio_opcode = IO_CMD_PREAD;
+         aiocbp->aio_reqprio = 0;
+         aiocbp->u.c.buf = buf;
+         aiocbp->u.c.nbytes = count;
+         aiocbp->u.c.offset = offset;
+     }
+
+     /**
+      * \brief Prepare an aio_cb struct for a write. This is a plain wrapper for libaio's
+      * ::io_prep_pwrite() function.
+      *
+      * \param aiocbp Pointer to the aio_cb struct to be prepared.
+      * \param fd File descriptor to be used for write.
+      * \param buf Pointer to buffer in which data to be written is located.
+      * \param count Number of bytes to write.
+      * \param offset Offset within file to which data will be written.
+      */
+     static inline void prep_pwrite(aio_cb* aiocbp, int fd, void* buf, std::size_t count, int64_t offset)
+     {
+         ::io_prep_pwrite(aiocbp, fd, buf, count, offset);
+     }
+
+     /**
+      * \brief Prepare an aio_cb struct for a write while leaving its first pointer-sized field
+      * (the data pointer) untouched, so that a pointer stored there can be re-used.
+      *
+      * Everything after the first sizeof(void*) bytes is zeroed before the write fields are set.
+      *
+      * \param aiocbp Pointer to the aio_cb struct to be prepared.
+      * \param fd File descriptor to be used for write.
+      * \param buf Pointer to buffer in which data to be written is located.
+      * \param count Number of bytes to write.
+      * \param offset Offset within file to which data will be written.
+      */
+     static inline void prep_pwrite_2(aio_cb* aiocbp, int fd, void* buf, std::size_t count, int64_t offset)
+     {
+         char* const afterDataPtr = reinterpret_cast<char*>(aiocbp) + sizeof(void*);
+         std::memset(static_cast<void*>(afterDataPtr), 0, sizeof(aio_cb) - sizeof(void*));
+         aiocbp->aio_fildes = fd;
+         aiocbp->aio_lio_opcode = IO_CMD_PWRITE;
+         aiocbp->aio_reqprio = 0;
+         aiocbp->u.c.buf = buf;
+         aiocbp->u.c.nbytes = count;
+         aiocbp->u.c.offset = offset;
+     }
+ };
+
+}}
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_AIO_H
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/aio_callback.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/aio_callback.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/aio_callback.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/aio_callback.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_AIO_CALLBACK_H
+#define QPID_LEGACYSTORE_JRNL_AIO_CALLBACK_H
+
+#include <stdint.h>
+#include <vector>
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+ class data_tok;
+
+ // Abstract callback interface: implementers provide both callbacks below.
+ // NOTE(review): judging by the names these are invoked on completion of AIO writes and
+ // reads respectively - confirm against the callers.
+ class aio_callback
+ {
+ public:
+ virtual ~aio_callback() {}
+ // Write callback; receives a list of data token pointers
+ virtual void wr_aio_cb(std::vector<data_tok*>& dtokl) = 0;
+ // Read callback; receives a list of 16-bit values (presumably page indexes - TODO confirm)
+ virtual void rd_aio_cb(std::vector<uint16_t>& pil) = 0;
+ };
+
+}}
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_AIO_CALLBACK_H
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/cvar.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/cvar.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/cvar.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/cvar.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_CVAR_H
+#define QPID_LEGACYSTORE_JRNL_CVAR_H
+
+#include <cstring>
+#include "qpid/linearstore/jrnl/jerrno.h"
+#include "qpid/linearstore/jrnl/jexception.h"
+#include "qpid/linearstore/jrnl/smutex.h"
+#include "qpid/linearstore/jrnl/time_ns.h"
+#include <pthread.h>
+#include <sstream>
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+ /**
+  * \brief Ultra-simple thread condition variable class wrapping a pthread_cond_t.
+  *
+  * The condition variable is bound at construction to the smutex whose underlying mutex is
+  * passed to the pthread wait calls. Except where noted, pthread return codes are checked with
+  * PTHREAD_CHK, which is given the call, class and function names for error reporting.
+  */
+ class cvar
+ {
+ private:
+     const smutex& _sm;
+     pthread_cond_t _c;
+ public:
+     /** Initialise the condition variable with default attributes and bind it to sm. */
+     inline cvar(const smutex& sm) : _sm(sm) { ::pthread_cond_init(&_c, 0); }
+     inline ~cvar() { ::pthread_cond_destroy(&_c); }
+     /** Wait on the condition variable using the bound mutex. */
+     inline void wait()
+     {
+         PTHREAD_CHK(::pthread_cond_wait(&_c, _sm.get()), "::pthread_cond_wait", "cvar", "wait");
+     }
+     /** Wait until signalled or until the absolute time ts; any non-zero result (including a timeout) goes to PTHREAD_CHK. */
+     inline void timedwait(timespec& ts)
+     {
+         PTHREAD_CHK(::pthread_cond_timedwait(&_c, _sm.get(), &ts), "::pthread_cond_timedwait", "cvar", "timedwait");
+     }
+     /**
+      * Wait until signalled or until intvl_ns nanoseconds from now have passed.
+      * \return true if the wait timed out, false otherwise.
+      */
+     inline bool waitintvl(const long intvl_ns)
+     {
+         time_ns t; t.now(); t+=intvl_ns;
+         int ret = ::pthread_cond_timedwait(&_c, _sm.get(), &t);
+         if (ret == ETIMEDOUT)
+             return true;
+         PTHREAD_CHK(ret, "::pthread_cond_timedwait", "cvar", "waitintvl");
+         return false;
+     }
+     /** Signal the condition variable (::pthread_cond_signal). */
+     inline void signal()
+     {
+         // Report this function's real name in the error context (was "notify")
+         PTHREAD_CHK(::pthread_cond_signal(&_c), "::pthread_cond_signal", "cvar", "signal");
+     }
+     /** Broadcast on the condition variable (::pthread_cond_broadcast). */
+     inline void broadcast()
+     {
+         PTHREAD_CHK(::pthread_cond_broadcast(&_c), "::pthread_cond_broadcast", "cvar", "broadcast");
+     }
+ };
+
+}}
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_CVAR_H
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.cpp?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.cpp Tue Oct 22 19:09:56 2013
@@ -0,0 +1,188 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/linearstore/jrnl/data_tok.h"
+
+#include <iomanip>
+#include "qpid/linearstore/jrnl/jerrno.h"
+#include "qpid/linearstore/jrnl/jexception.h"
+#include "qpid/linearstore/jrnl/slock.h"
+#include <sstream>
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+// Static members
+
+ // Running count of data_tok objects constructed; each constructor takes the current value
+ // as its own id (_icnt) and then increments it.
+ uint64_t data_tok::_cnt = 0;
+ // Mutex locked by the constructor while it reads and increments _cnt.
+ smutex data_tok::_mutex;
+
+ /**
+  * Construct a token in write state NONE with all counters and ids zeroed and an empty xid.
+  * The token takes the next value of the static counter as its instance id.
+  */
+ data_tok::data_tok():
+         _wstate(NONE),
+ //        _rstate(UNREAD),
+         _dsize(0),
+         _dblks_written(0),
+ //        _dblks_read(0),
+         _pg_cnt(0),
+         _fid(0),
+         _rid(0),
+         _xid(),
+         _dequeue_rid(0),
+         _external_rid(false)
+ {
+     // The id counter is shared between all tokens; update it under the class mutex
+     slock counterGuard(_mutex);
+     _icnt = _cnt;
+     ++_cnt;
+ }
+
+ // Destructor: empty body, no explicit cleanup is performed here.
+ data_tok::~data_tok() {}
+
+ /** Return the name of this token's current write state. */
+ const char*
+ data_tok::wstate_str() const
+ {
+     const write_state currentState = _wstate;
+     return wstate_str(currentState);
+ }
+
+ /**
+  * Return the name of the given write state as a string literal.
+  *
+  * \param wstate Write state to be named.
+  * \return The enumerator's name, or "<wstate unknown>" if the value matches no case.
+  */
+ const char*
+ data_tok::wstate_str(write_state wstate)
+ {
+     // Fallback for values that match none of the cases below
+     const char* label = "<wstate unknown>";
+     // Not using default: forces compiler to ensure all cases are covered.
+     switch (wstate)
+     {
+         case NONE:          label = "NONE";          break;
+         case ENQ_CACHED:    label = "ENQ_CACHED";    break;
+         case ENQ_PART:      label = "ENQ_PART";      break;
+         case ENQ_SUBM:      label = "ENQ_SUBM";      break;
+         case ENQ:           label = "ENQ";           break;
+         case DEQ_CACHED:    label = "DEQ_CACHED";    break;
+         case DEQ_PART:      label = "DEQ_PART";      break;
+         case DEQ_SUBM:      label = "DEQ_SUBM";      break;
+         case DEQ:           label = "DEQ";           break;
+         case ABORT_CACHED:  label = "ABORT_CACHED";  break;
+         case ABORT_PART:    label = "ABORT_PART";    break;
+         case ABORT_SUBM:    label = "ABORT_SUBM";    break;
+         case ABORTED:       label = "ABORTED";       break;
+         case COMMIT_CACHED: label = "COMMIT_CACHED"; break;
+         case COMMIT_PART:   label = "COMMIT_PART";   break;
+         case COMMIT_SUBM:   label = "COMMIT_SUBM";   break;
+         case COMMITTED:     label = "COMMITTED";     break;
+     }
+     return label;
+ }
+
+/*
+const char*
+data_tok::rstate_str() const
+{
+ return rstate_str(_rstate);
+}
+*/
+
+/*
+const char*
+data_tok::rstate_str(read_state rstate)
+{
+ switch (rstate)
+ {
+ case NONE:
+ return "NONE";
+ case READ_PART:
+ return "READ_PART";
+ case SKIP_PART:
+ return "SKIP_PART";
+ case READ:
+ return "READ";
+ // Not using default: forces compiler to ensure all cases are covered.
+ }
+ return "<rstate unknown>";
+}
+*/
+
+/*
+void
+data_tok::set_rstate(const read_state rstate)
+{
+ if (_wstate != ENQ && rstate != UNREAD)
+ {
+ std::ostringstream oss;
+ oss << "Attempted to change read state to " << rstate_str(rstate);
+ oss << " while write state is not enqueued (wstate ENQ); wstate=" << wstate_str() << ".";
+ throw jexception(jerrno::JERR_DTOK_ILLEGALSTATE, oss.str(), "data_tok",
+ "set_rstate");
+ }
+ _rstate = rstate;
+}
+*/
+
+ /**
+  * Return the token to its initial write state: state NONE, sizes, counts and ids zeroed,
+  * xid emptied. The instance id, _dequeue_rid and _external_rid are left as they are.
+  */
+ void
+ data_tok::reset()
+ {
+     // Identity of the record
+     _xid.clear();
+     _rid = 0;
+     _fid = 0;
+
+     // Progress of the write
+     _pg_cnt = 0;
+ //    _dblks_read = 0;
+     _dblks_written = 0;
+     _dsize = 0;
+
+     // State
+ //    _rstate = UNREAD;
+     _wstate = NONE;
+ }
+
+// debug aid
+ /**
+  * Debug aid: format the token's state as a single line of text.
+  *
+  * Numeric fields are written in hex. Printable xid characters are written as they are; every
+  * other xid byte is written as "/" followed by its two-digit hex value.
+  *
+  * \return The formatted status string.
+  */
+ std::string
+ data_tok::status_str() const
+ {
+     std::ostringstream oss;
+     oss << std::hex << std::setfill('0');
+     oss << "dtok id=0x" << _icnt << "; ws=" << wstate_str()/* << "; rs=" << rstate_str()*/;
+     oss << "; fid=0x" << _fid << "; rid=0x" << _rid << "; xid=";
+     for (unsigned i=0; i<_xid.size(); i++)
+     {
+         // Go through unsigned char: passing a negative char to isprint() is undefined, and a
+         // negative value would be sign-extended in the hex output.
+         const unsigned char xidByte = static_cast<unsigned char>(_xid[i]);
+         if (isprint(xidByte))
+             oss << _xid[i];
+         else
+             oss << "/" << std::setw(2) << static_cast<unsigned int>(xidByte);
+     }
+     oss << "; drid=0x" << _dequeue_rid << " extrid=" << (_external_rid?"T":"F");
+     oss << "; ds=0x" << _dsize << "; dw=0x" << _dblks_written/* << "; dr=0x" << _dblks_read*/;
+     oss << "; pc=0x" << _pg_cnt;
+     return oss.str();
+ }
+
+}}
Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,160 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LINEARSTORE_JRNL_DATA_TOK_H
+#define QPID_LINEARSTORE_JRNL_DATA_TOK_H
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+class data_tok;
+}}
+
+#include <cassert>
+#include <cstddef>
+#include "qpid/linearstore/jrnl/smutex.h"
+#include <pthread.h>
+#include <stdint.h> // uint32_t / uint64_t used below; do not rely on transitive includes
+#include <string>
+
+namespace qpid
+{
+namespace qls_jrnl
+{
+
+    /**
+    * \class data_tok
+    * \brief Data block token (data_tok) used to track wstate of a data block through asynchronous
+    *     I/O process
+    */
+    class data_tok
+    {
+    public:
+        // TODO: Fix this, separate write state from operation
+        // ie: wstate = NONE, CACHED, PART, SUBM, COMPL
+        //     op = ENQUEUE, DEQUEUE, ABORT, COMMIT
+        enum write_state
+        {
+            NONE,       ///< Data block not sent to journal
+            ENQ_CACHED, ///< Data block enqueue written to page cache
+            ENQ_PART,   ///< Data block part-submitted to AIO, waiting for page buffer to free up
+            ENQ_SUBM,   ///< Data block enqueue submitted to AIO
+            ENQ,        ///< Data block enqueue AIO write complete (enqueue complete)
+            DEQ_CACHED, ///< Data block dequeue written to page cache
+            DEQ_PART,   ///< Data block part-submitted to AIO, waiting for page buffer to free up
+            DEQ_SUBM,   ///< Data block dequeue submitted to AIO
+            DEQ,        ///< Data block dequeue AIO write complete (dequeue complete)
+            // NOTE(review): the states below are undocumented in the original; presumably the
+            // abort/commit counterparts of the enqueue/dequeue states above - confirm.
+            ABORT_CACHED,
+            ABORT_PART,
+            ABORT_SUBM,
+            ABORTED,
+            COMMIT_CACHED,
+            COMMIT_PART,
+            COMMIT_SUBM,
+            COMMITTED
+        };
+
+/*
+        enum read_state
+        {
+            UNREAD,     ///< Data block not read
+            READ_PART,  ///< Data block is part-read; waiting for page buffer to fill
+            SKIP_PART,  ///< Prev. dequeued dblock is part-skipped; waiting for page buffer to fill
+            READ        ///< Data block is fully read
+        };
+*/
+
+    protected:
+        static smutex _mutex;       ///< NOTE(review): presumably guards _cnt - confirm in data_tok.cpp
+        static uint64_t _cnt;       ///< NOTE(review): presumably the source of per-token ids - confirm
+        uint64_t    _icnt;          ///< Token id, as returned by id()
+        write_state _wstate;        ///< Enqueued / dequeued state of data
+//        read_state  _rstate;        ///< Read state of data
+        std::size_t _dsize;         ///< Data size in bytes
+        uint32_t    _dblks_written; ///< Data blocks written
+//        uint32_t    _dblks_read;    ///< Data blocks read/written
+        uint32_t    _pg_cnt;        ///< Page counter - incr for each page containing part of data
+        uint64_t    _fid;           ///< FID containing header of enqueue record
+        uint64_t    _rid;           ///< RID of data set by enqueue operation
+        std::string _xid;           ///< XID set by enqueue operation
+        uint64_t    _dequeue_rid;   ///< RID of data set by dequeue operation
+        bool        _external_rid;  ///< Flag to indicate external setting of rid
+
+    public:
+        data_tok();
+        virtual ~data_tok();
+
+        /// Token id.
+        inline uint64_t id() const { return _icnt; }
+        /// Current write state.
+        inline write_state wstate() const { return _wstate; }
+        /// Name of the current write state.
+        const char* wstate_str() const;
+        /// Name of the given write state.
+        static const char* wstate_str(write_state wstate);
+//        inline read_state rstate() const { return _rstate; }
+//        const char* rstate_str() const;
+//        static const char* rstate_str(read_state rstate);
+        /// True while the write state is NONE or ENQ_PART.
+        inline bool is_writable() const { return _wstate == NONE || _wstate == ENQ_PART; }
+        /// True when the write state is ENQ.
+        inline bool is_enqueued() const { return _wstate == ENQ; }
+        /// True when the write state is ENQ (same condition as is_enqueued()).
+        inline bool is_readable() const { return _wstate == ENQ; }
+//        inline bool is_read() const { return _rstate == READ; }
+        /// True while the write state is ENQ or DEQ_PART.
+        inline bool is_dequeueable() const { return _wstate == ENQ || _wstate == DEQ_PART; }
+        /// Set the write state; no transition checking is performed here.
+        inline void set_wstate(const write_state wstate) { _wstate = wstate; }
+//        void set_rstate(const read_state rstate);
+        /// Data size in bytes.
+        inline std::size_t dsize() const { return _dsize; }
+        inline void set_dsize(std::size_t dsize) { _dsize = dsize; }
+
+        /// Number of data blocks written so far.
+        inline uint32_t dblocks_written() const { return _dblks_written; }
+        /// Add dblks_written to the running count of written data blocks.
+        inline void incr_dblocks_written(uint32_t dblks_written)
+                { _dblks_written += dblks_written; }
+        inline void set_dblocks_written(uint32_t dblks_written) { _dblks_written = dblks_written; }
+
+//        inline uint32_t dblocks_read() const { return _dblks_read; }
+//        inline void incr_dblocks_read(uint32_t dblks_read) { _dblks_read += dblks_read; }
+//        inline void set_dblocks_read(uint32_t dblks_read) { _dblks_read = dblks_read; }
+
+        /// Current page count.
+        inline uint32_t pg_cnt() const { return _pg_cnt; }
+        /// Increment the page count and return the new value.
+        inline uint32_t incr_pg_cnt() { return ++_pg_cnt; }
+        /// Decrement the page count and return the new value. The count must be non-zero;
+        /// this is only checked by assert(), so builds with NDEBUG will wrap around.
+        inline uint32_t decr_pg_cnt() { assert(_pg_cnt != 0); return --_pg_cnt; }
+
+        inline uint64_t fid() const { return _fid; }
+        inline void set_fid(const uint64_t fid) { _fid = fid; }
+        inline uint64_t rid() const { return _rid; }
+        inline void set_rid(const uint64_t rid) { _rid = rid; }
+        inline uint64_t dequeue_rid() const {return _dequeue_rid; }
+        inline void set_dequeue_rid(const uint64_t rid) { _dequeue_rid = rid; }
+        inline bool external_rid() const { return _external_rid; }
+        inline void set_external_rid(const bool external_rid) { _external_rid = external_rid; }
+
+        /// True when a non-empty XID is set.
+        inline bool has_xid() const { return !_xid.empty(); }
+        inline const std::string& xid() const { return _xid; }
+        inline void clear_xid() { _xid.clear(); }
+        inline void set_xid(const std::string& xid) { _xid.assign(xid); }
+        /// Set the XID from a raw buffer of xid_len bytes (copied; may contain any byte values).
+        inline void set_xid(const void* xidp, const std::size_t xid_len)
+                { _xid.assign(static_cast<const char*>(xidp), xid_len); }
+
+        /// Reset the write-tracking fields; see the definition for exactly which members are cleared.
+        void reset();
+
+        // debug aid
+        std::string status_str() const;
+    };
+
+} // namespace qls_jrnl
+} // namespace qpid
+
+#endif // ifndef QPID_LINEARSTORE_JRNL_DATA_TOK_H
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org