You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/10/07 20:39:25 UTC
svn commit: r1530024 [1/4] - in /qpid/branches/linearstore/qpid/cpp/src: ./
qpid/linearstore/ qpid/linearstore/jrnl/ qpid/linearstore/jrnl/utils/
tests/linearstore/
Author: kpvdr
Date: Mon Oct 7 18:39:24 2013
New Revision: 1530024
URL: http://svn.apache.org/r1530024
Log:
QPID-4984: WIP - Compiles, but functionally incomplete
Added:
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp
- copied, changed from r1525056, qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFileController.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h
- copied, changed from r1525056, qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFileController.h
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h
qpid/branches/linearstore/qpid/cpp/src/tests/linearstore/
qpid/branches/linearstore/qpid/cpp/src/tests/linearstore/linearstoredirsetup.sh (with props)
Removed:
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFileController.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFileController.h
Modified:
qpid/branches/linearstore/qpid/cpp/src/linearstore.cmake
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/EmptyFilePoolManagerImpl.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.h
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.h
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enums.h
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jdir.h
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rcvdat.h
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.c
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.h
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h
Modified: qpid/branches/linearstore/qpid/cpp/src/linearstore.cmake
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/linearstore.cmake?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/linearstore.cmake (original)
+++ qpid/branches/linearstore/qpid/cpp/src/linearstore.cmake Mon Oct 7 18:39:24 2013
@@ -83,27 +83,20 @@ if (BUILD_LINEARSTORE)
qpid/linearstore/jrnl/EmptyFilePool.cpp
qpid/linearstore/jrnl/EmptyFilePoolManager.cpp
qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp
- #qpid/linearstore/jrnl/fcntl.cpp
qpid/linearstore/jrnl/jcntl.cpp
qpid/linearstore/jrnl/jdir.cpp
qpid/linearstore/jrnl/jerrno.cpp
qpid/linearstore/jrnl/jexception.cpp
- #qpid/linearstore/jrnl/jinf.cpp
qpid/linearstore/jrnl/JournalFile.cpp
- qpid/linearstore/jrnl/JournalFileController.cpp
qpid/linearstore/jrnl/JournalLog.cpp
qpid/linearstore/jrnl/jrec.cpp
- #qpid/linearstore/jrnl/lp_map.cpp
- #qpid/linearstore/jrnl/lpmgr.cpp
+ qpid/linearstore/jrnl/LinearFileController.cpp
qpid/linearstore/jrnl/pmgr.cpp
- qpid/linearstore/jrnl/rmgr.cpp
- #qpid/linearstore/jrnl/rfc.cpp
- #qpid/linearstore/jrnl/rrfc.cpp
+ qpid/linearstore/jrnl/RecoveryManager.cpp
qpid/linearstore/jrnl/time_ns.cpp
qpid/linearstore/jrnl/txn_map.cpp
qpid/linearstore/jrnl/txn_rec.cpp
qpid/linearstore/jrnl/wmgr.cpp
- #qpid/linearstore/jrnl/wrfc.cpp
)
# linearstore source files
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/EmptyFilePoolManagerImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/EmptyFilePoolManagerImpl.cpp?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/EmptyFilePoolManagerImpl.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/EmptyFilePoolManagerImpl.cpp Mon Oct 7 18:39:24 2013
@@ -49,8 +49,8 @@ void EmptyFilePoolManagerImpl::findEfpPa
QLS_LOG(info, " * Partition " << (*i)->partitionNumber() << " containing " << filePoolList.size() << " pool" <<
(filePoolList.size()>1 ? "s" : "") << " at \'" << (*i)->partitionDirectory() << "\'");
for (std::vector<qpid::qls_jrnl::EmptyFilePool*>::const_iterator j=filePoolList.begin(); j!=filePoolList.end(); ++j) {
- QLS_LOG(info, " - EFP \'" << (*j)->fileSizeKib() << "k\' containing " << (*j)->numEmptyFiles() <<
- " files of size " << (*j)->fileSizeKib() << " KiB totaling " << (*j)->cumFileSizeKib() << " KiB");
+ QLS_LOG(info, " - EFP \'" << (*j)->dataSize_kib() << "k\' containing " << (*j)->numEmptyFiles() <<
+ " files of size " << (*j)->dataSize_kib() << " KiB totaling " << (*j)->cumFileSize_kib() << " KiB");
}
}
}
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp Mon Oct 7 18:39:24 2013
@@ -62,7 +62,6 @@ JournalImpl::JournalImpl(qpid::sys::Time
jcntl(journalId, journalDirectory/*, journalBaseFilename*/),
timer(timer_),
getEventsTimerSetFlag(false),
- efpp(0),
// lastReadRid(0),
writeActivityFlag(false),
flushTriggeredFlag(true),
@@ -119,8 +118,8 @@ JournalImpl::initManagement(qpid::manage
_mgmtObject->set_name(_jid);
_mgmtObject->set_directory(_jdir.dirname());
// _mgmtObject->set_baseFileName(_base_filename);
- _mgmtObject->set_readPageSize(JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE);
- _mgmtObject->set_readPages(JRNL_RMGR_PAGES);
+// _mgmtObject->set_readPageSize(JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE);
+// _mgmtObject->set_readPages(JRNL_RMGR_PAGES);
// The following will be set on initialize(), but being properties, these must be set to 0 in the meantime
//_mgmtObject->set_initialFileCount(0);
@@ -140,7 +139,6 @@ JournalImpl::initialize(qpid::qls_jrnl::
const uint32_t wcache_pgsize_sblks,
qpid::qls_jrnl::aio_callback* const cbp)
{
- efpp = efpp_;
// efpp->createJournal(_jdir);
// QLS_LOG2(notice, _jid, "Initialized");
// std::ostringstream oss;
@@ -150,7 +148,7 @@ JournalImpl::initialize(qpid::qls_jrnl::
// oss << " wcache_pgsize_sblks=" << wcache_pgsize_sblks;
// oss << " wcache_num_pages=" << wcache_num_pages;
// QLS_LOG2(debug, _jid, oss.str());
- jcntl::initialize(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ efpp, wcache_num_pages, wcache_pgsize_sblks, cbp);
+ jcntl::initialize(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ efpp_, wcache_num_pages, wcache_pgsize_sblks, cbp);
// QLS_LOG2(debug, _jid, "Initialization complete");
// TODO: replace for linearstore: _lpmgr
/*
@@ -175,6 +173,7 @@ JournalImpl::recover(/*const uint16_t nu
const bool auto_expand,
const uint16_t ae_max_jfiles,
const uint32_t jfsize_sblks,*/
+ boost::shared_ptr<qpid::qls_jrnl::EmptyFilePoolManager> efpm,
const uint16_t wcache_num_pages,
const uint32_t wcache_pgsize_sblks,
qpid::qls_jrnl::aio_callback* const cbp,
@@ -210,10 +209,10 @@ JournalImpl::recover(/*const uint16_t nu
prep_xid_list.push_back(i->xid);
}
- jcntl::recover(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ wcache_num_pages, wcache_pgsize_sblks,
+ jcntl::recover(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/efpm.get(), wcache_num_pages, wcache_pgsize_sblks,
cbp, &prep_xid_list, highest_rid);
} else {
- jcntl::recover(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ wcache_num_pages, wcache_pgsize_sblks,
+ jcntl::recover(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/efpm.get(), wcache_num_pages, wcache_pgsize_sblks,
cbp, 0, highest_rid);
}
@@ -559,9 +558,11 @@ JournalImpl::wr_aio_cb(std::vector<data_
switch (dtokp->wstate())
{
case data_tok::ENQ:
+ std::cout << "<<<>>> JournalImpl::wr_aio_cb() ENQ dtokp rid=" << dtokp->rid() << std::endl << std::flush; // DEBUG
dtokp->getSourceMessage()->enqueueComplete();
break;
case data_tok::DEQ:
+ std::cout << "<<<>>> JournalImpl::wr_aio_cb() DEQ dtokp rid=" << dtokp->rid() << std::endl << std::flush; // DEBUG
/* Don't need to signal until we have a way to ack completion of dequeue in AMQP
dtokp->getSourceMessage()->dequeueComplete();
if ( dtokp->getSourceMessage()->isDequeueComplete() ) // clear id after last dequeue
@@ -607,16 +608,7 @@ JournalImpl::handleIoResult(const iores
{
case qpid::qls_jrnl::RHM_IORES_SUCCESS:
return;
- case qpid::qls_jrnl::RHM_IORES_ENQCAPTHRESH:
- {
- std::ostringstream oss;
- oss << "Enqueue capacity threshold exceeded.";
- QLS_LOG2(warning, _jid, oss.str());
- if (_agent != 0)
- _agent->raiseEvent(qmf::org::apache::qpid::linearstore::EventEnqThresholdExceeded(_jid, "Journal enqueue capacity threshold exceeded"),
- qpid::management::ManagementAgent::SEV_WARN);
- THROW_STORE_FULL_EXCEPTION(oss.str());
- }
+/*
case qpid::qls_jrnl::RHM_IORES_FULL:
{
std::ostringstream oss;
@@ -626,6 +618,7 @@ JournalImpl::handleIoResult(const iores
_agent->raiseEvent(qmf::org::apache::qpid::linearstore::EventFull(_jid, "Journal full"), qpid::management::ManagementAgent::SEV_ERROR);
THROW_STORE_FULL_EXCEPTION(oss.str());
}
+*/
default:
{
std::ostringstream oss;
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.h?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.h Mon Oct 7 18:39:24 2013
@@ -79,30 +79,16 @@ class JournalImpl : public qpid::broker:
typedef boost::function<void (JournalImpl&)> DeleteCallback;
private:
-// static qpid::sys::Mutex _static_lock;
-// static uint32_t cnt;
-
qpid::sys::Timer& timer;
bool getEventsTimerSetFlag;
boost::intrusive_ptr<qpid::sys::TimerTask> getEventsFireEventsPtr;
qpid::sys::Mutex _getf_lock;
qpid::sys::Mutex _read_lock;
- qpid::qls_jrnl::EmptyFilePool* efpp;
-
-// uint64_t lastReadRid; // rid of last read msg for loadMsgContent() - detects out-of-order read requests
-// std::vector<uint64_t> oooRidList; // list of out-of-order rids (greater than current rid) encountered during read sequence
bool writeActivityFlag;
bool flushTriggeredFlag;
boost::intrusive_ptr<qpid::sys::TimerTask> inactivityFireEventPtr;
- // temp local vars for loadMsgContent below
-// void* _xidp;
-// void* _datap;
-// size_t _dlen;
-// qpid::qls_jrnl::data_tok _dtok;
-// bool _external;
-
qpid::management::ManagementAgent* _agent;
qmf::org::apache::qpid::linearstore::Journal::shared_ptr _mgmtObject;
DeleteCallback deleteCallback;
@@ -112,7 +98,6 @@ class JournalImpl : public qpid::broker:
JournalImpl(qpid::sys::Timer& timer,
const std::string& journalId,
const std::string& journalDirectory,
-// const std::string& journalBaseFilename,
const qpid::sys::Duration getEventsTimeout,
const qpid::sys::Duration flushTimeout,
qpid::management::ManagementAgent* agent,
@@ -122,29 +107,18 @@ class JournalImpl : public qpid::broker:
void initManagement(qpid::management::ManagementAgent* agent);
- void initialize(/*const uint16_t num_jfiles,
- const bool auto_expand,
- const uint16_t ae_max_jfiles,
- const uint32_t jfsize_sblks,*/
- qpid::qls_jrnl::EmptyFilePool* efp,
+ void initialize(qpid::qls_jrnl::EmptyFilePool* efp,
const uint16_t wcache_num_pages,
const uint32_t wcache_pgsize_sblks,
qpid::qls_jrnl::aio_callback* const cbp);
- inline void initialize(/*const uint16_t num_jfiles,
- const bool auto_expand,
- const uint16_t ae_max_jfiles,
- const uint32_t jfsize_sblks,*/
- qpid::qls_jrnl::EmptyFilePool* efp,
+ inline void initialize(qpid::qls_jrnl::EmptyFilePool* efpp,
const uint16_t wcache_num_pages,
const uint32_t wcache_pgsize_sblks) {
- initialize(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ efp, wcache_num_pages, wcache_pgsize_sblks, this);
+ initialize(efpp, wcache_num_pages, wcache_pgsize_sblks, this);
}
- void recover(/*const uint16_t num_jfiles,
- const bool auto_expand,
- const uint16_t ae_max_jfiles,
- const uint32_t jfsize_sblks,*/
+ void recover(boost::shared_ptr<qpid::qls_jrnl::EmptyFilePoolManager> efpm,
const uint16_t wcache_num_pages,
const uint32_t wcache_pgsize_sblks,
qpid::qls_jrnl::aio_callback* const cbp,
@@ -152,17 +126,13 @@ class JournalImpl : public qpid::broker:
uint64_t& highest_rid,
uint64_t queue_id);
- inline void recover(/*const uint16_t num_jfiles,
- const bool auto_expand,
- const uint16_t ae_max_jfiles,
- const uint32_t jfsize_sblks,*/
+ inline void recover(boost::shared_ptr<qpid::qls_jrnl::EmptyFilePoolManager> efpm,
const uint16_t wcache_num_pages,
const uint32_t wcache_pgsize_sblks,
boost::ptr_list<PreparedTransaction>* prep_tx_list_ptr,
uint64_t& highest_rid,
uint64_t queue_id) {
- recover(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ wcache_num_pages, wcache_pgsize_sblks,
- this, prep_tx_list_ptr, highest_rid, queue_id);
+ recover(efpm, wcache_num_pages, wcache_pgsize_sblks, this, prep_tx_list_ptr, highest_rid, queue_id);
}
void recover_complete();
@@ -197,10 +167,6 @@ class JournalImpl : public qpid::broker:
void stop(bool block_till_aio_cmpl = false);
- // Logging
-// void log(qpid::qls_jrnl::log_level level, const std::string& log_stmt) const;
-// void log(qpid::qls_jrnl::log_level level, const char* const log_stmt) const;
-
// Overrides for get_events timer
qpid::qls_jrnl::iores flush(const bool block_till_aio_cmpl = false);
@@ -249,15 +215,15 @@ class TplJournalImpl : public JournalImp
TplJournalImpl(qpid::sys::Timer& timer,
const std::string& journalId,
const std::string& journalDirectory,
-// const std::string& journalBaseFilename,
const qpid::sys::Duration getEventsTimeout,
const qpid::sys::Duration flushTimeout,
qpid::management::ManagementAgent* agent) :
- JournalImpl(timer, journalId, journalDirectory/*, journalBaseFilename*/, getEventsTimeout, flushTimeout, agent)
+ JournalImpl(timer, journalId, journalDirectory, getEventsTimeout, flushTimeout, agent)
{}
virtual ~TplJournalImpl() {}
+/*
// Special version of read_data_record that ignores transactions - needed when reading the TPL
inline qpid::qls_jrnl::iores read_data_record(void** const datapp, std::size_t& dsize,
void** const xidpp, std::size_t& xidsize, bool& transient, bool& external,
@@ -265,6 +231,7 @@ class TplJournalImpl : public JournalImp
return JournalImpl::read_data_record(datapp, dsize, xidpp, xidsize, transient, external, dtokp, true);
}
inline void read_reset() { _rmgr.invalidate(); }
+*/
}; // class TplJournalImpl
} // namespace msgstore
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp Mon Oct 7 18:39:24 2013
@@ -63,7 +63,7 @@ MessageStoreImpl::TplRecoverStruct::TplR
MessageStoreImpl::MessageStoreImpl(qpid::broker::Broker* broker_, const char* envpath_) :
defaultEfpPartitionNumber(0),
- defaultEfpFileSizeKib(0),
+ defaultEfpFileSize_kib(0),
truncateFlag(false),
wCachePgSizeSblks(0),
wCacheNumPages(0),
@@ -83,7 +83,7 @@ uint32_t MessageStoreImpl::chkJrnlWrPage
if (p == 0) {
// For zero value, use default
- p = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_SBLK_SIZE / 1024;
+ p = JRNL_WMGR_DEF_PAGE_SIZE_KIB;
QLS_LOG(warning, "parameter " << paramName_ << " (" << param_ << ") must be a power of 2 between 1 and 128; changing this parameter to default value (" << p << ")");
} else if ( p > 128 || (p & (p-1)) ) {
// For any positive value that is not a power of 2, use closest value
@@ -100,22 +100,22 @@ uint32_t MessageStoreImpl::chkJrnlWrPage
uint16_t MessageStoreImpl::getJrnlWrNumPages(const uint32_t wrPageSizeKib_)
{
- uint32_t wrPageSizeSblks = wrPageSizeKib_ * 1024 / JRNL_SBLK_SIZE; // convert from KiB to number sblks
- uint32_t defTotWCacheSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES; // in sblks. Currently 2014 sblks (1 MiB).
+ uint32_t wrPageSizeSblks = wrPageSizeKib_ / JRNL_SBLK_SIZE_KIB; // convert from KiB to number sblks
+ uint32_t defTotWCacheSizeSblks = JRNL_WMGR_DEF_PAGE_SIZE_SBLKS * JRNL_WMGR_DEF_PAGES;
switch (wrPageSizeKib_)
{
case 1:
case 2:
case 4:
// 256 KiB total cache
- return defTotWCacheSize / wrPageSizeSblks / 4;
+ return defTotWCacheSizeSblks / wrPageSizeSblks / 4;
case 8:
case 16:
// 512 KiB total cache
- return defTotWCacheSize / wrPageSizeSblks / 2;
+ return defTotWCacheSizeSblks / wrPageSizeSblks / 2;
default: // 32, 64, 128
// 1 MiB total cache
- return defTotWCacheSize / wrPageSizeSblks;
+ return defTotWCacheSizeSblks / wrPageSizeSblks;
}
}
@@ -125,7 +125,7 @@ qpid::qls_jrnl::efpPartitionNumber_t Mes
return partition_;
}
-qpid::qls_jrnl::efpFileSizeKib_t MessageStoreImpl::chkEfpFileSizeKiB(const qpid::qls_jrnl::efpFileSizeKib_t efpFileSizeKib_,
+qpid::qls_jrnl::efpDataSize_kib_t MessageStoreImpl::chkEfpFileSizeKiB(const qpid::qls_jrnl::efpDataSize_kib_t efpFileSizeKib_,
const std::string& paramName_) {
uint8_t rem = efpFileSizeKib_ % uint64_t(JRNL_SBLK_SIZE_KIB);
if (rem != 0) {
@@ -154,7 +154,7 @@ void MessageStoreImpl::initManagement ()
mgmtObject->set_location(storeDir);
mgmtObject->set_tplIsInitialized(false);
mgmtObject->set_tplDirectory(getTplBaseDir());
- mgmtObject->set_tplWritePageSize(tplWCachePgSizeSblks * JRNL_SBLK_SIZE);
+ mgmtObject->set_tplWritePageSize(tplWCachePgSizeSblks * JRNL_SBLK_SIZE_BYTES);
mgmtObject->set_tplWritePages(tplWCacheNumPages);
agent->addObject(mgmtObject, 0, true);
@@ -172,36 +172,30 @@ bool MessageStoreImpl::init(const qpid::
// Extract and check options
const StoreOptions* opts = static_cast<const StoreOptions*>(options_);
qpid::qls_jrnl::efpPartitionNumber_t efpPartition = chkEfpPartition(opts->efpPartition, "efp-partition");
- qpid::qls_jrnl::efpFileSizeKib_t efpFilePoolSize = chkEfpFileSizeKiB(opts->efpFileSizeKib, "efp-file-size");
+ qpid::qls_jrnl::efpDataSize_kib_t efpFilePoolSize_kib = chkEfpFileSizeKiB(opts->efpFileSizeKib, "efp-file-size");
uint32_t jrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->wCachePageSizeKib, "wcache-page-size");
uint32_t tplJrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->tplWCachePageSizeKib, "tpl-wcache-page-size");
// Pass option values to init()
- return init(opts->storeDir, efpPartition, efpFilePoolSize, opts->truncateFlag, jrnlWrCachePageSizeKib, tplJrnlWrCachePageSizeKib);
+ return init(opts->storeDir, efpPartition, efpFilePoolSize_kib, opts->truncateFlag, jrnlWrCachePageSizeKib, tplJrnlWrCachePageSizeKib);
}
// These params, taken from options, are assumed to be correct and verified
bool MessageStoreImpl::init(const std::string& storeDir_,
- /*uint16_t jfiles,
- uint32_t jfileSizePgs,*/
qpid::qls_jrnl::efpPartitionNumber_t efpPartition_,
- qpid::qls_jrnl::efpFileSizeKib_t efpFileSizeKib_,
+ qpid::qls_jrnl::efpDataSize_kib_t efpFileSize_kib_,
const bool truncateFlag_,
uint32_t wCachePageSizeKib_,
- /*uint16_t tplJfiles,
- uint32_t tplJfileSizePgs,*/
uint32_t tplWCachePageSizeKib_)
- /*bool autoJExpand,
- uint16_t autoJExpandMaxFiles)*/
{
if (isInit) return true;
// Set geometry members (converting to correct units where req'd)
defaultEfpPartitionNumber = efpPartition_;
- defaultEfpFileSizeKib = efpFileSizeKib_;
- wCachePgSizeSblks = wCachePageSizeKib_ * 1024 / JRNL_SBLK_SIZE; // convert from KiB to number sblks
+ defaultEfpFileSize_kib = efpFileSize_kib_;
+ wCachePgSizeSblks = wCachePageSizeKib_ / JRNL_SBLK_SIZE_KIB; // convert from KiB to number sblks
wCacheNumPages = getJrnlWrNumPages(wCachePageSizeKib_);
- tplWCachePgSizeSblks = tplWCachePageSizeKib_ * 1024 / JRNL_SBLK_SIZE; // convert from KiB to number sblks
+ tplWCachePgSizeSblks = tplWCachePageSizeKib_ / JRNL_SBLK_SIZE_KIB; // convert from KiB to number sblks
tplWCacheNumPages = getJrnlWrNumPages(tplWCachePageSizeKib_);
if (storeDir_.size()>0) storeDir = storeDir_;
@@ -212,13 +206,13 @@ bool MessageStoreImpl::init(const std::s
QLS_LOG(notice, "Store module initialized; store-dir=" << storeDir_);
QLS_LOG(info, "> Default EFP partition: " << defaultEfpPartitionNumber);
- QLS_LOG(info, "> Default EFP file size: " << defaultEfpFileSizeKib << " (KiB)");
+ QLS_LOG(info, "> Default EFP file size: " << defaultEfpFileSize_kib << " (KiB)");
QLS_LOG(info, "> Default write cache page size: " << wCachePageSizeKib_ << " (KiB)");
QLS_LOG(info, "> Default number of write cache pages: " << wCacheNumPages);
QLS_LOG(info, "> TPL write cache page size: " << tplWCachePageSizeKib_ << " (KiB)");
QLS_LOG(info, "> TPL number of write cache pages: " << tplWCacheNumPages);
QLS_LOG(info, "> EFP partition: " << defaultEfpPartitionNumber);
- QLS_LOG(info, "> EFP file size pool: " << defaultEfpFileSizeKib << " (KiB)");
+ QLS_LOG(info, "> EFP file size pool: " << defaultEfpFileSize_kib << " (KiB)");
return isInit;
}
@@ -273,7 +267,7 @@ void MessageStoreImpl::init()
// NOTE: during normal initialization, agent == 0 because the store is initialized before the management infrastructure.
// However during a truncated initialization in a cluster, agent != 0. We always pass 0 as the agent for the
// TplStore to keep things consistent in a cluster. See https://bugzilla.redhat.com/show_bug.cgi?id=681026
- tplStorePtr.reset(new TplJournalImpl(broker->getTimer(), "TplStore", getTplBaseDir(), /*"tpl",*/ defJournalGetEventsTimeout, defJournalFlushTimeout, 0));
+ tplStorePtr.reset(new TplJournalImpl(broker->getTimer(), "TplStore", getTplBaseDir(), defJournalGetEventsTimeout, defJournalFlushTimeout, 0));
isInit = true;
} catch (const DbException& e) {
if (e.get_errno() == DB_VERSION_MISMATCH)
@@ -350,7 +344,7 @@ void MessageStoreImpl::chkTplStoreInit()
qpid::sys::Mutex::ScopedLock sl(tplInitLock);
if (!tplStorePtr->is_ready()) {
qpid::qls_jrnl::jdir::create_dir(getTplBaseDir());
- tplStorePtr->initialize(/*tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks,*/ getEmptyFilePool(defaultEfpPartitionNumber, defaultEfpFileSizeKib), tplWCacheNumPages, tplWCachePgSizeSblks);
+ tplStorePtr->initialize(/*tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks,*/ getEmptyFilePool(defaultEfpPartitionNumber, defaultEfpFileSize_kib), tplWCacheNumPages, tplWCachePgSizeSblks);
if (mgmtObject.get() != 0) mgmtObject->set_tplIsInitialized(true);
}
}
@@ -403,26 +397,13 @@ void MessageStoreImpl::create(qpid::brok
}
JournalImpl* jQueue = 0;
-// uint16_t localFileCount = numJrnlFiles;
-// bool localAutoExpandFlag = autoJrnlExpand;
-// uint16_t localAutoExpandMaxFileCount = autoJrnlExpandMaxFiles;
-// uint32_t localFileSizeSblks = jrnlFsizeSblks;
-//
-// value = args.get("qpid.file_count");
-// if (value.get() != 0 && !value->empty() && value->convertsTo<int>())
-// localFileCount = chkJrnlNumFilesParam((uint16_t) value->get<int>(), "qpid.file_count");
-//
-// value = args.get("qpid.file_size");
-// if (value.get() != 0 && !value->empty() && value->convertsTo<int>())
-// localFileSizeSblks = chkJrnlFileSizeParam((uint32_t) value->get<int>(), "qpid.file_size", wCachePgSizeSblks) * JRNL_RMGR_PAGE_SIZE;
-
if (queue_.getName().size() == 0)
{
QLS_LOG(error, "Cannot create store for empty (null) queue name - ignoring and attempting to continue.");
return;
}
- jQueue = new JournalImpl(broker->getTimer(), queue_.getName(), getJrnlDir(queue_), /*std::string("JournalData"),*/
+ jQueue = new JournalImpl(broker->getTimer(), queue_.getName(), getJrnlDir(queue_.getName()),
defJournalGetEventsTimeout, defJournalFlushTimeout, agent,
boost::bind(&MessageStoreImpl::journalDeleted, this, _1));
{
@@ -430,32 +411,8 @@ void MessageStoreImpl::create(qpid::brok
journalList[queue_.getName()]=jQueue;
}
-// value = args.get("qpid.auto_expand");
-// if (value.get() != 0 && !value->empty() && value->convertsTo<bool>())
-// localAutoExpandFlag = (bool) value->get<bool>();
-//
-// value = args.get("qpid.auto_expand_max_jfiles");
-// if (value.get() != 0 && !value->empty() && value->convertsTo<int>())
-// localAutoExpandMaxFileCount = (uint16_t) value->get<int>();
-/*
- qpid::framing::FieldTable::ValuePtr value;
- qpid::qls_jrnl::efpPartitionNumber_t localEfpPartition = efpPartition;
- value = args_.get("qpid.efp_partition");
- if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) {
- localEfpPartition = chkEfpPartition((uint32_t)value->get<int>(), "qpid.efp_partition");
- }
-
- qpid::qls_jrnl::efpFileSizeKib_t localEfpFileSizeKib = efpFileSizeKib;
- value = args_.get("qpid.efp_file_size");
- if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) {
- localEfpFileSizeKib = chkEfpFileSizeKiB((uint32_t)value->get<int>(),"qpid.efp_file_size" );
- }
-*/
-
queue_.setExternalQueueStore(dynamic_cast<qpid::broker::ExternalQueueStore*>(jQueue));
try {
- // init will create the deque's for the init...
-// jQueue->initialize(localFileCount, localAutoExpandFlag, localAutoExpandMaxFileCount, localFileSizeSblks, wCacheNumPages, wCachePgSizeSblks);
jQueue->initialize(getEmptyFilePool(args_), wCacheNumPages, wCachePgSizeSblks);
} catch (const qpid::qls_jrnl::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queue_.getName() + ": create() failed: " + e.what());
@@ -471,7 +428,7 @@ void MessageStoreImpl::create(qpid::brok
qpid::qls_jrnl::EmptyFilePool*
MessageStoreImpl::getEmptyFilePool(const qpid::qls_jrnl::efpPartitionNumber_t efpPartitionNumber_,
- const qpid::qls_jrnl::efpFileSizeKib_t efpFileSizeKib_) {
+ const qpid::qls_jrnl::efpDataSize_kib_t efpFileSizeKib_) {
qpid::qls_jrnl::EmptyFilePool* efpp = efpMgr->getEmptyFilePool(efpPartitionNumber_, efpFileSizeKib_);
if (efpp == 0) {
std::ostringstream oss;
@@ -490,7 +447,7 @@ MessageStoreImpl::getEmptyFilePool(const
localEfpPartition = chkEfpPartition((uint32_t)value->get<int>(), "qpid.efp_partition");
}
- qpid::qls_jrnl::efpFileSizeKib_t localEfpFileSizeKib = defaultEfpFileSizeKib;
+ qpid::qls_jrnl::efpDataSize_kib_t localEfpFileSizeKib = defaultEfpFileSize_kib;
value = args_.get("qpid.efp_file_size");
if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) {
localEfpFileSizeKib = chkEfpFileSizeKiB((uint32_t)value->get<int>(),"qpid.efp_file_size" );
@@ -501,21 +458,19 @@ MessageStoreImpl::getEmptyFilePool(const
void MessageStoreImpl::destroy(qpid::broker::PersistableQueue& queue_)
{
QLS_LOG(info, "*** MessageStoreImpl::destroy() queue=\"" << queue_.getName() << "\"");
-/*
checkInit();
- destroy(queueDb, queue);
- deleteBindingsForQueue(queue);
- qpid::broker::ExternalQueueStore* eqs = queue.getExternalQueueStore();
+ destroy(queueDb, queue_);
+ deleteBindingsForQueue(queue_);
+ qpid::broker::ExternalQueueStore* eqs = queue_.getExternalQueueStore();
if (eqs) {
JournalImpl* jQueue = static_cast<JournalImpl*>(eqs);
jQueue->delete_jrnl_files();
- queue.setExternalQueueStore(0); // will delete the journal if exists
+ queue_.setExternalQueueStore(0); // will delete the journal if exists
{
qpid::sys::Mutex::ScopedLock sl(journalListLock);
- journalList.erase(queue.getName());
+ journalList.erase(queue_.getName());
}
}
-*/
}
void MessageStoreImpl::create(const qpid::broker::PersistableExchange& exchange_,
@@ -727,14 +682,13 @@ void MessageStoreImpl::recover(qpid::bro
registry_.recoveryComplete();
}
-void MessageStoreImpl::recoverQueues(TxnCtxt& /*txn*/,
- qpid::broker::RecoveryManager& /*registry*/,
- queue_index& /*queue_index*/,
- txn_list& /*prepared*/,
- message_index& /*messages*/)
+void MessageStoreImpl::recoverQueues(TxnCtxt& txn,
+ qpid::broker::RecoveryManager& registry,
+ queue_index& queue_index,
+ txn_list& prepared,
+ message_index& messages)
{
QLS_LOG(info, "*** MessageStoreImpl::recoverQueues()");
-/*
Cursor queues;
queues.open(queueDb, txn.get());
@@ -757,8 +711,8 @@ void MessageStoreImpl::recoverQueues(Txn
QLS_LOG(error, "Cannot recover empty (null) queue name - ignoring and attempting to continue.");
break;
}
- jQueue = new JournalImpl(broker->getTimer(), queueName, getJrnlHashDir(queueName), std::string("JournalData"),
- defJournalGetEventsTimeout, defJournalFlushTimeout, agent,
+ jQueue = new JournalImpl(broker->getTimer(), queueName, getJrnlDir(queueName), defJournalGetEventsTimeout,
+ defJournalFlushTimeout, agent,
boost::bind(&MessageStoreImpl::journalDeleted, this, _1));
{
qpid::sys::Mutex::ScopedLock sl(journalListLock);
@@ -771,22 +725,23 @@ void MessageStoreImpl::recoverQueues(Txn
long rcnt = 0L; // recovered msg count
long idcnt = 0L; // in-doubt msg count
uint64_t thisHighestRid = 0ULL;
- jQueue->recover(numJrnlFiles, autoJrnlExpand, autoJrnlExpandMaxFiles, jrnlFsizeSblks, wCacheNumPages,
- wCachePgSizeSblks, &prepared, thisHighestRid, key.id); // start recovery
+ jQueue->recover(boost::dynamic_pointer_cast<qpid::qls_jrnl::EmptyFilePoolManager>(efpMgr), wCacheNumPages, wCachePgSizeSblks, &prepared, thisHighestRid, key.id);
-// // Check for changes to queue store settings qpid.file_count and qpid.file_size resulting
-// // from recovery of a store that has had its size changed externally by the resize utility.
-// // If so, update the queue store settings so that QMF queries will reflect the new values.
-// const qpid::framing::FieldTable& storeargs = queue->getSettings().storeSettings;
-// qpid::framing::FieldTable::ValuePtr value;
-// value = storeargs.get("qpid.file_count");
-// if (value.get() != 0 && !value->empty() && value->convertsTo<int>() && (uint16_t)value->get<int>() != jQueue->num_jfiles()) {
-// queue->addArgument("qpid.file_count", jQueue->num_jfiles());
-// }
-// value = storeargs.get("qpid.file_size");
-// if (value.get() != 0 && !value->empty() && value->convertsTo<int>() && (uint32_t)value->get<int>() != jQueue->jfsize_sblks()/JRNL_RMGR_PAGE_SIZE) {
-// queue->addArgument("qpid.file_size", jQueue->jfsize_sblks()/JRNL_RMGR_PAGE_SIZE);
-// }
+ // Check for changes to queue store settings qpid.file_count and qpid.file_size resulting
+ // from recovery of a store that has had its size changed externally by the resize utility.
+ // If so, update the queue store settings so that QMF queries will reflect the new values.
+/*
+ const qpid::framing::FieldTable& storeargs = queue->getSettings().storeSettings;
+ qpid::framing::FieldTable::ValuePtr value;
+ value = storeargs.get("qpid.file_count");
+ if (value.get() != 0 && !value->empty() && value->convertsTo<int>() && (uint16_t)value->get<int>() != jQueue->num_jfiles()) {
+ queue->addArgument("qpid.file_count", jQueue->num_jfiles());
+ }
+ value = storeargs.get("qpid.file_size");
+ if (value.get() != 0 && !value->empty() && value->convertsTo<int>() && (uint32_t)value->get<int>() != jQueue->jfsize_sblks()/JRNL_RMGR_PAGE_SIZE) {
+ queue->addArgument("qpid.file_size", jQueue->jfsize_sblks()/JRNL_RMGR_PAGE_SIZE);
+ }
+*/
if (highestRid == 0ULL)
highestRid = thisHighestRid;
@@ -810,7 +765,6 @@ void MessageStoreImpl::recoverQueues(Txn
QLS_LOG(info, "Most recent persistence id found: 0x" << std::hex << highestRid << std::dec);
queueIdSequence.reset(maxQueueId + 1);
-*/
}
@@ -901,16 +855,15 @@ void MessageStoreImpl::recoverGeneral(Tx
}
void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/,
- qpid::broker::RecoveryManager& /*recovery*/,
- qpid::broker::RecoverableQueue::shared_ptr& queue_,
- txn_list& /*prepared*/,
- message_index& /*messages*/,
- long& /*rcnt*/,
- long& /*idcnt*/)
+ qpid::broker::RecoveryManager& recovery,
+ qpid::broker::RecoverableQueue::shared_ptr& queue,
+ txn_list& prepared,
+ message_index& messages,
+ long& rcnt,
+ long& idcnt)
{
- QLS_LOG(info, "*** MessageStoreImpl::recoverMessages() queue=\"" << queue_->getName() << "\"");
-/*
- size_t preambleLength = sizeof(uint32_t)header size;
+ QLS_LOG(info, "*** MessageStoreImpl::recoverMessages() queue=\"" << queue->getName() << "\"");
+ size_t preambleLength = sizeof(uint32_t)/*header size*/;
JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
DataTokenImpl dtok;
@@ -1035,7 +988,6 @@ void MessageStoreImpl::recoverMessages(T
} catch (const qpid::qls_jrnl::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": recoverMessages() failed: " + e.what());
}
-*/
}
qpid::broker::RecoverableMessage::shared_ptr MessageStoreImpl::getExternMessage(qpid::broker::RecoveryManager& /*recovery*/,
@@ -1079,7 +1031,6 @@ int MessageStoreImpl::enqueueMessage(Txn
void MessageStoreImpl::readTplStore()
{
QLS_LOG(info, "*** MessageStoreImpl::readTplStore()");
-/*
tplRecoverMap.clear();
qpid::qls_jrnl::txn_map& tmap = tplStorePtr->get_txn_map();
DataTokenImpl dtok;
@@ -1148,7 +1099,6 @@ void MessageStoreImpl::readTplStore()
} catch (const qpid::qls_jrnl::jexception& e) {
THROW_STORE_EXCEPTION(std::string("TPL recoverTplStore() failed: ") + e.what());
}
-*/
}
void MessageStoreImpl::recoverTplStore()
@@ -1171,10 +1121,9 @@ void MessageStoreImpl::recoverTplStore()
*/
}
-void MessageStoreImpl::recoverLockedMappings(txn_list& /*txns*/)
+void MessageStoreImpl::recoverLockedMappings(txn_list& txns)
{
QLS_LOG(info, "*** MessageStoreImpl::recoverLockedMappings()");
-/*
if (!tplStorePtr->is_ready())
recoverTplStore();
@@ -1186,7 +1135,6 @@ void MessageStoreImpl::recoverLockedMapp
deq_ptr.reset(new LockedMappings);
txns.push_back(new PreparedTransaction(i->first, enq_ptr, deq_ptr));
}
-*/
}
void MessageStoreImpl::collectPreparedXids(std::set<std::string>& /*xids*/)
@@ -1260,55 +1208,46 @@ void MessageStoreImpl::loadContent(const
void MessageStoreImpl::flush(const qpid::broker::PersistableQueue& queue_)
{
QLS_LOG(info, "*** MessageStoreImpl::flush() queue=\"" << queue_.getName() << "\"");
-/*
- if (queue.getExternalQueueStore() == 0) return;
+ if (queue_.getExternalQueueStore() == 0) return;
checkInit();
- std::string qn = queue.getName();
+ std::string qn = queue_.getName();
try {
- JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
+ JournalImpl* jc = static_cast<JournalImpl*>(queue_.getExternalQueueStore());
if (jc) {
// TODO: check if this result should be used...
- mrg::journal::iores res = jc->flush();
+ /*mrg::journal::iores res =*/ jc->flush();
}
} catch (const qpid::qls_jrnl::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + qn + ": flush() failed: " + e.what() );
}
-*/
}
-void MessageStoreImpl::enqueue(qpid::broker::TransactionContext* /*ctxt*/,
+void MessageStoreImpl::enqueue(qpid::broker::TransactionContext* ctxt_,
const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg_,
- const qpid::broker::PersistableQueue& /*queue*/)
+ const qpid::broker::PersistableQueue& queue_)
{
-// QLS_LOG(info, "*** MessageStoreImpl::enqueue() queue=\"" << queue.getName() << "\"");
-/*
+ //QLS_LOG(info, "*** MessageStoreImpl::enqueue() queue=\"" << queue_.getName() << "\"");
checkInit();
- uint64_t queueId (queue.getPersistenceId());
- uint64_t messageId (msg->getPersistenceId());
+ uint64_t queueId (queue_.getPersistenceId());
if (queueId == 0) {
- THROW_STORE_EXCEPTION("Queue not created: " + queue.getName());
+ THROW_STORE_EXCEPTION("Queue not created: " + queue_.getName());
}
TxnCtxt implicit;
TxnCtxt* txn = 0;
- if (ctxt) {
- txn = check(ctxt);
+ if (ctxt_) {
+ txn = check(ctxt_);
} else {
txn = &implicit;
}
- bool newId = false;
- if (messageId == 0) {
- messageId = messageIdSequence.next();
- msg->setPersistenceId(messageId);
- newId = true;
+ if (msg_->getPersistenceId() == 0) {
+ msg_->setPersistenceId(messageIdSequence.next());
}
- store(&queue, txn, msg, newId);
+ store(&queue_, txn, msg_);
// add queue* to the txn map..
- if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
-*/
- msg_->enqueueComplete();// DEBUG: only while null fns in use
+ if (ctxt_) txn->addXidRecord(queue_.getExternalQueueStore());
}
uint64_t MessageStoreImpl::msgEncode(std::vector<char>& buff_,
@@ -1329,91 +1268,85 @@ uint64_t MessageStoreImpl::msgEncode(std
}
void MessageStoreImpl::store(const qpid::broker::PersistableQueue* queue_,
- TxnCtxt* /*txn*/,
- const boost::intrusive_ptr<qpid::broker::PersistableMessage>& /*message*/,
- bool /*newId*/)
+ TxnCtxt* txn_,
+ const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message_)
{
- QLS_LOG(info, "*** MessageStoreImpl::store() queue=\"" << queue_->getName() << "\"");
-/*
+ //QLS_LOG(info, "*** MessageStoreImpl::store() queue=\"" << queue_->getName() << "\"");
std::vector<char> buff;
- uint64_t size = msgEncode(buff, message);
+ uint64_t size = msgEncode(buff, message_);
try {
- if (queue) {
+ if (queue_) {
boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
dtokp->addRef();
- dtokp->setSourceMessage(message);
+ dtokp->setSourceMessage(message_);
dtokp->set_external_rid(true);
- dtokp->set_rid(message->getPersistenceId()); // set the messageID into the Journal header (record-id)
+ dtokp->set_rid(message_->getPersistenceId()); // set the messageID into the Journal header (record-id)
- JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
- if (txn->getXid().empty()) {
- jc->enqueue_data_record(&buff[0], size, size, dtokp.get(), !message->isPersistent());
+ JournalImpl* jc = static_cast<JournalImpl*>(queue_->getExternalQueueStore());
+ if (txn_->getXid().empty()) {
+ jc->enqueue_data_record(&buff[0], size, size, dtokp.get(), !message_->isPersistent());
} else {
- jc->enqueue_txn_data_record(&buff[0], size, size, dtokp.get(), txn->getXid(), !message->isPersistent());
+ jc->enqueue_txn_data_record(&buff[0], size, size, dtokp.get(), txn_->getXid(), !message_->isPersistent());
}
} else {
THROW_STORE_EXCEPTION(std::string("MessageStoreImpl::store() failed: queue NULL."));
}
} catch (const qpid::qls_jrnl::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": MessageStoreImpl::store() failed: " +
+ THROW_STORE_EXCEPTION(std::string("Queue ") + queue_->getName() + ": MessageStoreImpl::store() failed: " +
e.what());
}
-*/
}
-void MessageStoreImpl::dequeue(qpid::broker::TransactionContext* /*ctxt*/,
+void MessageStoreImpl::dequeue(qpid::broker::TransactionContext* ctxt_,
const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg_,
- const qpid::broker::PersistableQueue& /*queue*/)
+ const qpid::broker::PersistableQueue& queue_)
{
-// QLS_LOG(info, "*** MessageStoreImpl::dequeue() queue=\"" << queue.getName() << "\"");
-/*
+ //QLS_LOG(info, "*** MessageStoreImpl::dequeue() queue=\"" << queue_.getName() << "\"");
checkInit();
- uint64_t queueId (queue.getPersistenceId());
- uint64_t messageId (msg->getPersistenceId());
+ uint64_t queueId (queue_.getPersistenceId());
+ uint64_t messageId (msg_->getPersistenceId());
if (queueId == 0) {
- THROW_STORE_EXCEPTION("Queue \"" + queue.getName() + "\" has null queue Id (has not been created)");
+ THROW_STORE_EXCEPTION("Queue \"" + queue_.getName() + "\" has null queue Id (has not been created)");
}
if (messageId == 0) {
- THROW_STORE_EXCEPTION("Queue \"" + queue.getName() + "\": Dequeuing message with null persistence Id.");
+ THROW_STORE_EXCEPTION("Queue \"" + queue_.getName() + "\": Dequeuing message with null persistence Id.");
}
TxnCtxt implicit;
TxnCtxt* txn = 0;
- if (ctxt) {
- txn = check(ctxt);
+ if (ctxt_) {
+ txn = check(ctxt_);
} else {
txn = &implicit;
}
// add queue* to the txn map..
- if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
- async_dequeue(ctxt, msg, queue);
-*/
+ if (ctxt_) txn->addXidRecord(queue_.getExternalQueueStore());
+ async_dequeue(ctxt_, msg_, queue_);
msg_->dequeueComplete();
}
-void MessageStoreImpl::async_dequeue(qpid::broker::TransactionContext* /*ctxt*/,
- const boost::intrusive_ptr<qpid::broker::PersistableMessage>& /*msg*/,
+void MessageStoreImpl::async_dequeue(qpid::broker::TransactionContext* ctxt_,
+ const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg_,
const qpid::broker::PersistableQueue& queue_)
{
- QLS_LOG(info, "*** MessageStoreImpl::async_dequeue() queue=\"" << queue_.getName() << "\"");
-/*
+ //QLS_LOG(info, "*** MessageStoreImpl::async_dequeue() queue=\"" << queue_.getName() << "\"");
boost::intrusive_ptr<DataTokenImpl> ddtokp(new DataTokenImpl);
- ddtokp->setSourceMessage(msg);
+ ddtokp->setSourceMessage(msg_);
ddtokp->set_external_rid(true);
ddtokp->set_rid(messageIdSequence.next());
- ddtokp->set_dequeue_rid(msg->getPersistenceId());
+ ddtokp->set_dequeue_rid(msg_->getPersistenceId());
ddtokp->set_wstate(DataTokenImpl::ENQ);
std::string tid;
- if (ctxt) {
- TxnCtxt* txn = check(ctxt);
+ if (ctxt_) {
+ TxnCtxt* txn = check(ctxt_);
tid = txn->getXid();
}
// Manually increase the ref count, as raw pointers are used beyond this point
ddtokp->addRef();
try {
- JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
+ JournalImpl* jc = static_cast<JournalImpl*>(queue_.getExternalQueueStore());
if (tid.empty()) {
jc->dequeue_data_record(ddtokp.get());
} else {
@@ -1421,9 +1354,8 @@ void MessageStoreImpl::async_dequeue(qpi
}
} catch (const qpid::qls_jrnl::jexception& e) {
ddtokp->release();
- THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": async_dequeue() failed: " + e.what());
+ THROW_STORE_EXCEPTION(std::string("Queue ") + queue_.getName() + ": async_dequeue() failed: " + e.what());
}
-*/
}
uint32_t MessageStoreImpl::outstandingQueueAIO(const qpid::broker::PersistableQueue& /*queue_*/)
@@ -1661,11 +1593,10 @@ std::string MessageStoreImpl::getTplBase
return dir.str();
}
-std::string MessageStoreImpl::getJrnlDir(const qpid::broker::PersistableQueue& queue_) //for exmaple /var/rhm/ + queueDir/
+std::string MessageStoreImpl::getJrnlDir(const std::string& queueName_)
{
- /*return getJrnlHashDir(queue_.getName().c_str());*/
std::ostringstream oss;
- oss << getJrnlBaseDir() << queue_.getName();
+ oss << getJrnlBaseDir() << queueName_;
return oss.str();
}
@@ -1679,8 +1610,8 @@ void MessageStoreImpl::journalDeleted(Jo
MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name_) :
qpid::Options(name_),
truncateFlag(defTruncateFlag),
- wCachePageSizeKib(defWCachePageSize),
- tplWCachePageSizeKib(defTplWCachePageSize),
+ wCachePageSizeKib(defWCachePageSizeKib),
+ tplWCachePageSizeKib(defTplWCachePageSizeKib),
efpPartition(defEfpPartition),
efpFileSizeKib(defEfpFileSizeKib)
{
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h Mon Oct 7 18:39:24 2013
@@ -101,10 +101,10 @@ class MessageStoreImpl : public qpid::br
// Default store settings
static const bool defTruncateFlag = false;
- static const uint32_t defWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_SBLK_SIZE / 1024;
- static const uint32_t defTplWCachePageSize = defWCachePageSize / 8;
+ static const uint32_t defWCachePageSizeKib = JRNL_WMGR_DEF_PAGE_SIZE_KIB;
+ static const uint32_t defTplWCachePageSizeKib = defWCachePageSizeKib / 8;
static const uint16_t defEfpPartition = 1;
- static const uint64_t defEfpFileSizeKib = 512 * JRNL_SBLK_SIZE / 1024;
+ static const uint64_t defEfpFileSizeKib = 512 * JRNL_SBLK_SIZE_KIB;
static const std::string storeTopLevelDir;
static qpid::sys::Duration defJournalGetEventsTimeout;
@@ -133,7 +133,7 @@ class MessageStoreImpl : public qpid::br
IdSequence messageIdSequence;
std::string storeDir;
qpid::qls_jrnl::efpPartitionNumber_t defaultEfpPartitionNumber;
- qpid::qls_jrnl::efpFileSizeKib_t defaultEfpFileSizeKib;
+ qpid::qls_jrnl::efpDataSize_kib_t defaultEfpFileSize_kib;
bool truncateFlag;
uint32_t wCachePgSizeSblks;
uint16_t wCacheNumPages;
@@ -156,7 +156,7 @@ class MessageStoreImpl : public qpid::br
static uint16_t getJrnlWrNumPages(const uint32_t wrPageSizeKiB);
static qpid::qls_jrnl::efpPartitionNumber_t chkEfpPartition(const qpid::qls_jrnl::efpPartitionNumber_t partition,
const std::string& paramName);
- static qpid::qls_jrnl::efpFileSizeKib_t chkEfpFileSizeKiB(const qpid::qls_jrnl::efpFileSizeKib_t efpFileSizeKiB,
+ static qpid::qls_jrnl::efpDataSize_kib_t chkEfpFileSizeKiB(const qpid::qls_jrnl::efpDataSize_kib_t efpFileSizeKiB,
const std::string& paramName);
void init();
@@ -202,8 +202,7 @@ class MessageStoreImpl : public qpid::br
uint64_t msgEncode(std::vector<char>& buff, const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message);
void store(const qpid::broker::PersistableQueue* queue,
TxnCtxt* txn,
- const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message,
- bool newId);
+ const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message);
void async_dequeue(qpid::broker::TransactionContext* ctxt,
const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
const qpid::broker::PersistableQueue& queue);
@@ -231,8 +230,8 @@ class MessageStoreImpl : public qpid::br
// journal functions
void createJrnlQueue(const qpid::broker::PersistableQueue& queue);
- std::string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple /var/rhm/ + queueDir/
- qpid::qls_jrnl::EmptyFilePool* getEmptyFilePool(const qpid::qls_jrnl::efpPartitionNumber_t p, const qpid::qls_jrnl::efpFileSizeKib_t s);
+ std::string getJrnlDir(const std::string& queueName);
+ qpid::qls_jrnl::EmptyFilePool* getEmptyFilePool(const qpid::qls_jrnl::efpPartitionNumber_t p, const qpid::qls_jrnl::efpDataSize_kib_t s);
qpid::qls_jrnl::EmptyFilePool* getEmptyFilePool(const qpid::framing::FieldTable& args);
std::string getStoreTopLevelDir();
std::string getJrnlBaseDir();
@@ -268,10 +267,10 @@ class MessageStoreImpl : public qpid::br
bool init(const std::string& dir,
qpid::qls_jrnl::efpPartitionNumber_t efpPartition = defEfpPartition,
- qpid::qls_jrnl::efpFileSizeKib_t efpFileSizeKib = defEfpFileSizeKib,
+ qpid::qls_jrnl::efpDataSize_kib_t efpFileSizeKib = defEfpFileSizeKib,
const bool truncateFlag = false,
- uint32_t wCachePageSize = defWCachePageSize,
- uint32_t tplWCachePageSize = defTplWCachePageSize);
+ uint32_t wCachePageSize = defWCachePageSizeKib,
+ uint32_t tplWCachePageSize = defTplWCachePageSizeKib);
void truncateInit();
@@ -279,6 +278,8 @@ class MessageStoreImpl : public qpid::br
void finalize();
+ // --- Implementation of qpid::broker::MessageStore ---
+
void create(qpid::broker::PersistableQueue& queue,
const qpid::framing::FieldTable& args);
@@ -344,6 +345,8 @@ class MessageStoreImpl : public qpid::br
void abort(qpid::broker::TransactionContext& ctxt);
+ // --- Implementation of qpid::management::Managable ---
+
qpid::management::ManagementObject::shared_ptr GetManagementObject (void) const
{ return mgmtObject; }
Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h?rev=1530024&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h Mon Oct 7 18:39:24 2013
@@ -0,0 +1,137 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LINEARSTORE_ATOMICCOUNTER_H_
+#define QPID_LINEARSTORE_ATOMICCOUNTER_H_
+
+#include "qpid/linearstore/jrnl/slock.h"
+
+namespace qpid {
+namespace qls_jrnl {
+
+template <class T>
+class AtomicCounter
+{
+private:
+ T count;
+ mutable smutex countMutex;
+
+public:
+ AtomicCounter(const T& initValue = T(0)) : count(initValue) {}
+
+ virtual ~AtomicCounter() {}
+
+ T get() const {
+ slock l(countMutex);
+ return count;
+ }
+
+ T increment() {
+ slock l(countMutex);
+ return ++count;
+ }
+
+ T add(const T& a) {
+ slock l(countMutex);
+ count += a;
+ return count;
+ }
+
+ T addLimit(const T& a, const T& limit, const uint32_t jerr) {
+ slock l(countMutex);
+ if (count + a > limit) throw jexception(jerr, "AtomicCounter", "addLimit");
+ count += a;
+ return count;
+ }
+
+ T decrement() {
+ slock l(countMutex);
+ return --count;
+ }
+
+ T decrementLimit(const T& limit = T(0), const uint32_t jerr = jerrno::JERR__UNDERFLOW) {
+ slock l(countMutex);
+ if (count < limit + 1) {
+ throw jexception(jerr, "AtomicCounter", "decrementLimit");
+ }
+ return --count;
+ }
+
+ T subtract(const T& s) {
+ slock l(countMutex);
+ count -= s;
+ return count;
+ }
+
+ T subtractLimit(const T& s, const T& limit = T(0), const uint32_t jerr = jerrno::JERR__UNDERFLOW) {
+ slock l(countMutex);
+ if (count < limit + s) throw jexception(jerr, "AtomicCounter", "subtractLimit");
+ count -= s;
+ return count;
+ }
+
+ bool operator==(const T& o) const {
+ slock l(countMutex);
+ return count == o;
+ }
+
+ bool operator<(const T& o) const {
+ slock l(countMutex);
+ return count < o;
+ }
+
+ bool operator<=(const T& o) const {
+ slock l(countMutex);
+ return count <= o;
+ }
+
+ friend T operator-(const T& a, const AtomicCounter& b) {
+ slock l(b.countMutex);
+ return a - b.count;
+ }
+
+ friend T operator-(const AtomicCounter& a, const T& b) {
+ slock l(a.countMutex);
+ return a.count - b;
+ }
+
+ friend T operator-(const AtomicCounter&a, const AtomicCounter& b) {
+ slock l1(a.countMutex);
+ slock l2(b.countMutex);
+ return a.count - b.count;
+ }
+
+/*
+ friend std::ostream& operator<<(std::ostream& out, const AtomicCounter& a) {
+ T temp; // Use temp so lock is not held while streaming to out.
+ {
+ slock l(a.countMutex);
+ temp = a.count;
+ }
+ out << temp;
+ return out;
+ }
+*/
+};
+
+}} // namespace qpid::qls_jrnl
+
+#endif // QPID_LINEARSTORE_ATOMICCOUNTER_H_
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp Mon Oct 7 18:39:24 2013
@@ -40,7 +40,7 @@ namespace qls_jrnl {
EmptyFilePool::EmptyFilePool(const std::string& efpDirectory_,
const EmptyFilePoolPartition* partitionPtr_) :
efpDirectory(efpDirectory_),
- efpFileSizeKib(fileSizeKbFromDirName(efpDirectory_, partitionPtr_->partitionNumber())),
+ efpDataSize_kib(fileSizeKbFromDirName(efpDirectory_, partitionPtr_->partitionNumber())),
partitionPtr(partitionPtr_)
{}
@@ -50,7 +50,7 @@ void
EmptyFilePool::initialize() {
//std::cout << "Reading " << efpDirectory << std::endl; // DEBUG
std::vector<std::string> dirList;
- jdir::read_dir(efpDirectory, dirList, false, true, false);
+ jdir::read_dir(efpDirectory, dirList, false, true, false, false);
for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
size_t dotPos = i->rfind(".");
if (dotPos != std::string::npos) {
@@ -65,9 +65,24 @@ EmptyFilePool::initialize() {
//std::cout << "Found " << emptyFileList.size() << " files" << std::endl; // DEBUG
}
-efpFileSizeKib_t
-EmptyFilePool::fileSizeKib() const {
- return efpFileSizeKib;
+efpDataSize_kib_t
+EmptyFilePool::dataSize_kib() const {
+ return efpDataSize_kib;
+}
+
+efpFileSize_kib_t
+EmptyFilePool::fileSize_kib() const {
+ return efpDataSize_kib + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_KIB);
+}
+
+efpDataSize_sblks_t
+EmptyFilePool::dataSize_sblks() const {
+ return efpDataSize_kib / JRNL_SBLK_SIZE_KIB;
+}
+
+efpFileSize_sblks_t
+EmptyFilePool::fileSize_sblks() const {
+ return (efpDataSize_kib / JRNL_SBLK_SIZE_KIB) + QLS_JRNL_FHDR_RES_SIZE_SBLKS;
}
efpFileCount_t
@@ -76,10 +91,10 @@ EmptyFilePool::numEmptyFiles() const {
return efpFileCount_t(emptyFileList.size());
}
-efpFileSizeKib_t
-EmptyFilePool::cumFileSizeKib() const {
+efpDataSize_kib_t
+EmptyFilePool::cumFileSize_kib() const {
slock l(emptyFileListMutex);
- return efpFileSizeKib_t(emptyFileList.size()) * efpFileSizeKib;
+ return efpDataSize_kib_t(emptyFileList.size()) * efpDataSize_kib;
}
efpPartitionNumber_t
@@ -94,7 +109,7 @@ EmptyFilePool::getPartition() const {
const efpIdentity_t
EmptyFilePool::getIdentity() const {
- return efpIdentity_t(partitionPtr->partitionNumber(), efpFileSizeKib);
+ return efpIdentity_t(partitionPtr->partitionNumber(), efpDataSize_kib);
}
std::string
@@ -112,9 +127,9 @@ EmptyFilePool::takeEmptyFile(const std::
bool
EmptyFilePool::returnEmptyFile(const JournalFile* srcFile) {
- std::string emptyFileName(efpDirectory + srcFile->fileName());
+ std::string emptyFileName(efpDirectory + srcFile->getFileName());
// TODO: reset file here
- if (::rename(srcFile->fqFileName().c_str(), emptyFileName.c_str())) {
+ if (::rename(srcFile->getFqFileName().c_str(), emptyFileName.c_str())) {
std::ostringstream oss;
oss << "file=\"" << srcFile << "\" dest=\"" << emptyFileName << "\"" << FORMAT_SYSERR(errno);
throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", "returnEmptyFile");
@@ -152,14 +167,13 @@ EmptyFilePool::popEmptyFile() {
void
EmptyFilePool::createEmptyFile() {
- file_hdr_t fh;
- ::file_hdr_create(&fh, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDRSIZESBLKS, partitionPtr->partitionNumber(),
- efpFileSizeKib);
+ ::file_hdr_t fh;
+ ::file_hdr_create(&fh, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDR_RES_SIZE_SBLKS, partitionPtr->partitionNumber(), efpDataSize_kib);
std::string efpfn = getEfpFileName();
std::ofstream ofs(efpfn.c_str(), std::ofstream::out | std::ofstream::binary);
if (ofs.good()) {
- ofs.write((char*)&fh, sizeof(file_hdr_t));
- uint64_t rem = ((efpFileSizeKib + (QLS_JRNL_FHDRSIZESBLKS * JRNL_SBLK_SIZE_KIB)) * 1024) - sizeof(file_hdr_t);
+ ofs.write((char*)&fh, sizeof(::file_hdr_t));
+ uint64_t rem = ((efpDataSize_kib + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_KIB)) * 1024) - sizeof(::file_hdr_t);
while (rem--)
ofs.put('\0');
ofs.close();
@@ -180,8 +194,8 @@ EmptyFilePool::validateEmptyFile(const s
oss << "stat: file=\"" << emptyFileName_ << "\"" << FORMAT_SYSERR(errno);
throw jexception(jerrno::JERR_JDIR_STAT, oss.str(), "EmptyFilePool", "validateEmptyFile");
}
- efpFileSizeKib_t expectedSize = (JRNL_SBLK_SIZE_KIB + efpFileSizeKib) * 1024;
- if ((efpFileSizeKib_t)s.st_size != expectedSize) {
+ efpDataSize_kib_t expectedSize = (JRNL_SBLK_SIZE_KIB + efpDataSize_kib) * 1024;
+ if ((efpDataSize_kib_t)s.st_size != expectedSize) {
//std::cout << "ERROR: File " << emptyFileName << ": Incorrect size: Expected=" << expectedSize << "; actual=" << s.st_size << std::endl; // DEBUG
return false;
}
@@ -194,8 +208,8 @@ EmptyFilePool::validateEmptyFile(const s
const uint8_t fhFileNameBuffLen = 50;
char fhFileNameBuff[fhFileNameBuffLen];
- file_hdr_t fh;
- ifs.read((char*)&fh, sizeof(file_hdr_t));
+ ::file_hdr_t fh;
+ ifs.read((char*)&fh, sizeof(::file_hdr_t));
uint16_t fhFileNameLen = fh._queue_name_len > fhFileNameBuffLen ? fhFileNameBuffLen : fh._queue_name_len;
ifs.read(fhFileNameBuff, fhFileNameLen);
std::string fhFileName(fhFileNameBuff, fhFileNameLen);
@@ -204,7 +218,7 @@ EmptyFilePool::validateEmptyFile(const s
if (fh._rhdr._magic != QLS_FILE_MAGIC ||
fh._rhdr._version != QLS_JRNL_VERSION ||
fh._efp_partition != partitionPtr->partitionNumber() ||
- fh._file_size_kib != efpFileSizeKib ||
+ fh._file_size_kib != efpDataSize_kib ||
!::is_file_hdr_reset(&fh))
{
//std::cout << "ERROR: File " << emptyFileName << ": Invalid file header" << std::endl;
@@ -227,7 +241,7 @@ EmptyFilePool::getEfpFileName() {
// protected
// static
-efpFileSizeKib_t
+efpDataSize_kib_t
EmptyFilePool::fileSizeKbFromDirName(const std::string& dirName_,
const efpPartitionNumber_t partitionNumber_) {
// Check for dirName format 'NNNk', where NNN is a number, convert NNN into an integer. NNN cannot be 0.
@@ -243,7 +257,7 @@ EmptyFilePool::fileSizeKbFromDirName(con
valid = n[charNum] == 'k';
}
}
- efpFileSizeKib_t s = ::atol(n.c_str());
+ efpDataSize_kib_t s = ::atol(n.c_str());
if (!valid || s == 0 || s % JRNL_SBLK_SIZE_KIB != 0) {
std::ostringstream oss;
oss << "Partition: " << partitionNumber_ << "; EFP directory: \'" << n << "\'";
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h Mon Oct 7 18:39:24 2013
@@ -47,7 +47,7 @@ protected:
typedef emptyFileList_t::iterator emptyFileListItr_t;
const std::string efpDirectory;
- const efpFileSizeKib_t efpFileSizeKib;
+ const efpDataSize_kib_t efpDataSize_kib;
const EmptyFilePoolPartition* partitionPtr;
private:
@@ -60,9 +60,12 @@ public:
virtual ~EmptyFilePool();
void initialize();
- efpFileSizeKib_t fileSizeKib() const;
+ efpDataSize_kib_t dataSize_kib() const;
+ efpFileSize_kib_t fileSize_kib() const;
+ efpDataSize_sblks_t dataSize_sblks() const;
+ efpFileSize_sblks_t fileSize_sblks() const;
efpFileCount_t numEmptyFiles() const;
- efpFileSizeKib_t cumFileSizeKib() const;
+ efpDataSize_kib_t cumFileSize_kib() const;
efpPartitionNumber_t getPartitionNumber() const;
const EmptyFilePoolPartition* getPartition() const;
const efpIdentity_t getIdentity() const;
@@ -76,8 +79,8 @@ protected:
void createEmptyFile();
bool validateEmptyFile(const std::string& emptyFileName_) const;
std::string getEfpFileName();
- static efpFileSizeKib_t fileSizeKbFromDirName(const std::string& dirName_,
- const efpPartitionNumber_t partitionNumber_);
+ static efpDataSize_kib_t fileSizeKbFromDirName(const std::string& dirName_,
+ const efpPartitionNumber_t partitionNumber_);
};
}} // namespace qpid::qls_jrnl
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp Mon Oct 7 18:39:24 2013
@@ -49,7 +49,7 @@ void
EmptyFilePoolManager::findEfpPartitions() {
//std::cout << "*** Reading " << qlsStorePath << std::endl; // DEBUG
std::vector<std::string> dirList;
- jdir::read_dir(qlsStorePath, dirList, true, false, true);
+ jdir::read_dir(qlsStorePath, dirList, true, false, true, false);
for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
if ((*i)[0] == 'p' && i->length() == 4) { // Filter: look only at names pNNN
efpPartitionNumber_t pn = ::atoi(i->c_str() + 1);
@@ -90,15 +90,15 @@ EmptyFilePoolManager::getEfpPartition(co
void
EmptyFilePoolManager::getEfpPartitionNumbers(std::vector<efpPartitionNumber_t>& partitionNumberList,
- const efpFileSizeKib_t efpFileSizeKb) const {
+ const efpDataSize_kib_t efpFileSizeKb) const {
slock l(partitionMapMutex);
for (partitionMapConstItr_t i=partitionMap.begin(); i!=partitionMap.end(); ++i) {
if (efpFileSizeKb == 0) {
partitionNumberList.push_back(i->first);
} else {
- std::vector<efpFileSizeKib_t> efpFileSizeList;
+ std::vector<efpDataSize_kib_t> efpFileSizeList;
i->second->getEmptyFilePoolSizesKb(efpFileSizeList);
- for (std::vector<efpFileSizeKib_t>::iterator j=efpFileSizeList.begin(); j!=efpFileSizeList.end(); ++j) {
+ for (std::vector<efpDataSize_kib_t>::iterator j=efpFileSizeList.begin(); j!=efpFileSizeList.end(); ++j) {
if (*j == efpFileSizeKb) {
partitionNumberList.push_back(i->first);
break;
@@ -110,15 +110,15 @@ EmptyFilePoolManager::getEfpPartitionNum
void
EmptyFilePoolManager::getEfpPartitions(std::vector<EmptyFilePoolPartition*>& partitionList,
- const efpFileSizeKib_t efpFileSizeKb) {
+ const efpDataSize_kib_t efpFileSizeKb) {
slock l(partitionMapMutex);
for (partitionMapConstItr_t i=partitionMap.begin(); i!=partitionMap.end(); ++i) {
if (efpFileSizeKb == 0) {
partitionList.push_back(i->second);
} else {
- std::vector<efpFileSizeKib_t> efpFileSizeList;
+ std::vector<efpDataSize_kib_t> efpFileSizeList;
i->second->getEmptyFilePoolSizesKb(efpFileSizeList);
- for (std::vector<efpFileSizeKib_t>::iterator j=efpFileSizeList.begin(); j!=efpFileSizeList.end(); ++j) {
+ for (std::vector<efpDataSize_kib_t>::iterator j=efpFileSizeList.begin(); j!=efpFileSizeList.end(); ++j) {
if (*j == efpFileSizeKb) {
partitionList.push_back(i->second);
break;
@@ -129,7 +129,7 @@ EmptyFilePoolManager::getEfpPartitions(s
}
void
-EmptyFilePoolManager::getEfpFileSizes(std::vector<efpFileSizeKib_t>& efpFileSizeList,
+EmptyFilePoolManager::getEfpFileSizes(std::vector<efpDataSize_kib_t>& efpFileSizeList,
const efpPartitionNumber_t efpPartitionNumber) const {
if (efpPartitionNumber == 0) {
for (partitionMapConstItr_t i=partitionMap.begin(); i!=partitionMap.end(); ++i) {
@@ -160,7 +160,7 @@ EmptyFilePoolManager::getEmptyFilePools(
EmptyFilePool*
EmptyFilePoolManager::getEmptyFilePool(const efpPartitionNumber_t partitionNumber,
- const efpFileSizeKib_t efpFileSizeKib) {
+ const efpDataSize_kib_t efpFileSizeKib) {
EmptyFilePoolPartition* efppp = getEfpPartition(partitionNumber);
if (efppp != 0)
return efppp->getEmptyFilePool(efpFileSizeKib);
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h Mon Oct 7 18:39:24 2013
@@ -48,13 +48,13 @@ public:
uint16_t getNumEfpPartitions() const;
EmptyFilePoolPartition* getEfpPartition(const efpPartitionNumber_t partitionNumber);
- void getEfpPartitionNumbers(std::vector<efpPartitionNumber_t>& partitionNumberList, const efpFileSizeKib_t efpFileSizeKb = 0) const;
- void getEfpPartitions(std::vector<EmptyFilePoolPartition*>& partitionList, const efpFileSizeKib_t efpFileSizeKb = 0);
+ void getEfpPartitionNumbers(std::vector<efpPartitionNumber_t>& partitionNumberList, const efpDataSize_kib_t efpFileSizeKb = 0) const;
+ void getEfpPartitions(std::vector<EmptyFilePoolPartition*>& partitionList, const efpDataSize_kib_t efpFileSizeKb = 0);
- void getEfpFileSizes(std::vector<efpFileSizeKib_t>& efpFileSizeList, const efpPartitionNumber_t efpPartitionNumber = 0) const;
+ void getEfpFileSizes(std::vector<efpDataSize_kib_t>& efpFileSizeList, const efpPartitionNumber_t efpPartitionNumber = 0) const;
void getEmptyFilePools(std::vector<EmptyFilePool*>& emptyFilePoolList, const efpPartitionNumber_t efpPartitionNumber = 0);
- EmptyFilePool* getEmptyFilePool(const efpPartitionNumber_t partitionNumber, const efpFileSizeKib_t efpFileSizeKb);
+ EmptyFilePool* getEmptyFilePool(const efpPartitionNumber_t partitionNumber, const efpDataSize_kib_t efpFileSizeKb);
EmptyFilePool* getEmptyFilePool(const efpIdentity_t efpIdentity);
};
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp Mon Oct 7 18:39:24 2013
@@ -63,7 +63,7 @@ void
EmptyFilePoolPartition::findEmptyFilePools() {
//std::cout << "Reading " << partitionDir << std::endl; // DEBUG
std::vector<std::string> dirList;
- jdir::read_dir(partitionDir, dirList, true, false, false);
+ jdir::read_dir(partitionDir, dirList, true, false, false, false);
bool foundEfpDir = false;
for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
if (i->compare(efpTopLevelDir) == 0) {
@@ -75,15 +75,14 @@ EmptyFilePoolPartition::findEmptyFilePoo
std::string efpDir(partitionDir + "/" + efpTopLevelDir);
//std::cout << "Reading " << efpDir << std::endl; // DEBUG
dirList.clear();
- jdir::read_dir(efpDir, dirList, true, false, false);
+ jdir::read_dir(efpDir, dirList, true, false, false, true);
for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
- std::string efpSizeDir(efpDir + "/" + (*i));
EmptyFilePool* efpp = 0;
try {
- efpp = new EmptyFilePool(efpSizeDir, this);
+ efpp = new EmptyFilePool(*i, this);
{
slock l(efpMapMutex);
- efpMap[efpp->fileSizeKib()] = efpp;
+ efpMap[efpp->dataSize_kib()] = efpp;
}
}
catch (const std::exception& e) {
@@ -110,7 +109,7 @@ EmptyFilePoolPartition::partitionDirecto
}
EmptyFilePool*
-EmptyFilePoolPartition::getEmptyFilePool(const efpFileSizeKib_t efpFileSizeKb) {
+EmptyFilePoolPartition::getEmptyFilePool(const efpDataSize_kib_t efpFileSizeKb) {
efpMapItr_t i = efpMap.find(efpFileSizeKb);
if (i == efpMap.end())
return 0;
@@ -118,7 +117,7 @@ EmptyFilePoolPartition::getEmptyFilePool
}
void
-EmptyFilePoolPartition::getEmptyFilePoolSizesKb(std::vector<efpFileSizeKib_t>& efpFileSizesKbList) const {
+EmptyFilePoolPartition::getEmptyFilePoolSizesKb(std::vector<efpDataSize_kib_t>& efpFileSizesKbList) const {
for (efpMapConstItr_t i=efpMap.begin(); i!=efpMap.end(); ++i) {
efpFileSizesKbList.push_back(i->first);
}
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h Mon Oct 7 18:39:24 2013
@@ -44,7 +44,7 @@ class EmptyFilePoolPartition
public:
static const std::string efpTopLevelDir;
protected:
- typedef std::map<efpFileSizeKib_t, EmptyFilePool*> efpMap_t;
+ typedef std::map<efpDataSize_kib_t, EmptyFilePool*> efpMap_t;
typedef efpMap_t::iterator efpMapItr_t;
typedef efpMap_t::const_iterator efpMapConstItr_t;
@@ -62,8 +62,8 @@ public:
efpPartitionNumber_t partitionNumber() const;
std::string partitionDirectory() const;
- EmptyFilePool* getEmptyFilePool(const efpFileSizeKib_t efpFileSizeKb);
- void getEmptyFilePoolSizesKb(std::vector<efpFileSizeKib_t>& efpFileSizesKbList) const;
+ EmptyFilePool* getEmptyFilePool(const efpDataSize_kib_t efpFileSizeKb);
+ void getEmptyFilePoolSizesKb(std::vector<efpDataSize_kib_t>& efpFileSizesKbList) const;
void getEmptyFilePools(std::vector<EmptyFilePool*>& efpList);
};
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h Mon Oct 7 18:39:24 2013
@@ -23,14 +23,18 @@
#define QPID_QLS_JRNL_EMPTYFILEPOOLTYPES_H_
#include <stdint.h>
+#include <utility> // std::pair
namespace qpid {
namespace qls_jrnl {
- typedef uint64_t efpFileSizeKib_t;
+ typedef uint64_t efpDataSize_kib_t; // Size of data part of file (excluding file header) in kib
+ typedef uint64_t efpFileSize_kib_t; // Size of file (header + data) in kib
+ typedef uint32_t efpDataSize_sblks_t; // Size of data part of file (excluding file header) in sblks
+ typedef uint32_t efpFileSize_sblks_t; // Size of file (header + data) in sblks
typedef uint32_t efpFileCount_t;
typedef uint16_t efpPartitionNumber_t;
- typedef std::pair<efpPartitionNumber_t, efpFileSizeKib_t> efpIdentity_t;
+ typedef std::pair<efpPartitionNumber_t, efpDataSize_kib_t> efpIdentity_t;
}} // namespace qpid::qls_jrnl
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org