You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2017/02/08 21:05:48 UTC
qpid-cpp git commit: QPID-7666: Added wcache-num-pages (and
tpl-wcache-num-pages) to linearstore. Changed defaults for regular queues to
wcache-page-size=16 (kiB) and set wcache-num-pages default to 16. Added
wcache-page-size and wcache-num-pages to qpid
Repository: qpid-cpp
Updated Branches:
refs/heads/master a619d4549 -> 34cfb7f0a
QPID-7666: Added wcache-num-pages (and tpl-wcache-num-pages) to linearstore. Changed defaults for regular queues to wcache-page-size=16 (kiB) and set wcache-num-pages default to 16. Added wcache-page-size and wcache-num-pages to qpid-config.
Project: http://git-wip-us.apache.org/repos/asf/qpid-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-cpp/commit/34cfb7f0
Tree: http://git-wip-us.apache.org/repos/asf/qpid-cpp/tree/34cfb7f0
Diff: http://git-wip-us.apache.org/repos/asf/qpid-cpp/diff/34cfb7f0
Branch: refs/heads/master
Commit: 34cfb7f0a82283fce6cea7b882712ed0929de629
Parents: a619d45
Author: Kim van der Riet <kp...@apache.org>
Authored: Wed Feb 8 16:05:10 2017 -0500
Committer: Kim van der Riet <kp...@apache.org>
Committed: Wed Feb 8 16:05:10 2017 -0500
----------------------------------------------------------------------
management/python/bin/qpid-config | 17 +++++
src/qpid/linearstore/JournalImpl.cpp | 19 ++---
src/qpid/linearstore/JournalImpl.h | 10 ++-
src/qpid/linearstore/JournalLogImpl.h | 2 +-
src/qpid/linearstore/MessageStoreImpl.cpp | 100 +++++++++++++++----------
src/qpid/linearstore/MessageStoreImpl.h | 16 ++--
src/qpid/linearstore/journal/jcfg.h | 7 +-
7 files changed, 109 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/34cfb7f0/management/python/bin/qpid-config
----------------------------------------------------------------------
diff --git a/management/python/bin/qpid-config b/management/python/bin/qpid-config
index 793b15a..3055419 100755
--- a/management/python/bin/qpid-config
+++ b/management/python/bin/qpid-config
@@ -115,6 +115,8 @@ class Config:
self._fileSize = None
self._efp_partition_num = None
self._efp_pool_file_size = None
+ self._wcache_page_size = None
+ self._wcache_num_pages = None
self._maxQueueSize = None
self._maxQueueCount = None
self._limitPolicy = None
@@ -149,6 +151,8 @@ FILECOUNT = "qpid.file_count"
FILESIZE = "qpid.file_size"
EFP_PARTITION_NUM = "qpid.efp_partition_num"
EFP_POOL_FILE_SIZE = "qpid.efp_pool_file_size"
+WCACHE_PAGE_SIZE = "qpid.wcache-page-size"
+WCACHE_NUM_PAGES = "qpid.wcache-num-pages"
MAX_QUEUE_SIZE = "qpid.max_size"
MAX_QUEUE_COUNT = "qpid.max_count"
POLICY_TYPE = "qpid.policy_type"
@@ -170,6 +174,7 @@ REPLICATE = "qpid.replicate"
#list
SPECIAL_ARGS=[
FILECOUNT,FILESIZE,EFP_PARTITION_NUM,EFP_POOL_FILE_SIZE,
+ WCACHE_PAGE_SIZE,WCACHE_NUM_PAGES,
MAX_QUEUE_SIZE,MAX_QUEUE_COUNT,POLICY_TYPE,
LVQ_KEY,MSG_SEQUENCE,IVE,
FLOW_STOP_COUNT,FLOW_RESUME_COUNT,FLOW_STOP_SIZE,FLOW_RESUME_SIZE,
@@ -230,6 +235,8 @@ def OptionsAndArguments(argv):
group3.add_option("--file-size", action="store", type="int", metavar="<n>", help="[legactystore] File size in pages (64KiB/page)")
group3.add_option("--efp-partition-num", action="store", type="int", metavar="<n>", help="[linearstore] EFP partition number")
group3.add_option("--efp-pool-file-size", action="store", type="int", metavar="<n>", help="[linearstore] EFP file size (KiB)")
+ group3.add_option("--wcache-page-size", action="store", type="int", metavar="<n>", help="[linearstore] Per-queue buffer page size (kiB), value may be powers of 2 starting at 4 (4, 8, 16, 32...)")
+ group3.add_option("--wcache-num-pages", action="store", type="int", metavar="<n>", help="[linearstore] Per-queue num buffer pages")
group3.add_option("--max-queue-size", action="store", type="int", metavar="<n>", help="Maximum in-memory queue size as bytes")
group3.add_option("--max-queue-count", action="store", type="int", metavar="<n>", help="Maximum in-memory queue size as a number of messages")
group3.add_option("--limit-policy", action="store", choices=["none", "reject", "ring", "ring-strict"], metavar="<policy>", help="Action to take when queue limit is reached")
@@ -312,6 +319,10 @@ def OptionsAndArguments(argv):
config._efp_partition_num = opts.efp_partition_num
if opts.efp_pool_file_size is not None:
config._efp_pool_file_size = opts.efp_pool_file_size
+ if opts.wcache_page_size is not None:
+ config._wcache_page_size = opts.wcache_page_size
+ if opts.wcache_num_pages is not None:
+ config._wcache_num_pages = opts.wcache_num_pages
if opts.max_queue_size is not None:
config._maxQueueSize = opts.max_queue_size
if opts.max_queue_count is not None:
@@ -544,6 +555,8 @@ class BrokerManager:
if FILECOUNT in args: print "--file-count=%s" % args[FILECOUNT],
if EFP_PARTITION_NUM in args: print "--efp-partition-num=%s" % args[EFP_PARTITION_NUM],
if EFP_POOL_FILE_SIZE in args: print "--efp-pool-file-size=%s" % args[EFP_POOL_FILE_SIZE],
+ if WCACHE_PAGE_SIZE in args: print "--wcache-page-size=%s" % args[WCACHE_PAGE_SIZE],
+ if WCACHE_NUM_PAGES in args: print "--wcache-num-pages=%s" % args[WCACHE_NUM_PAGES],
if MAX_QUEUE_SIZE in args: print "--max-queue-size=%s" % args[MAX_QUEUE_SIZE],
if MAX_QUEUE_COUNT in args: print "--max-queue-count=%s" % args[MAX_QUEUE_COUNT],
if POLICY_TYPE in args: print "--limit-policy=%s" % args[POLICY_TYPE].replace("_", "-"),
@@ -629,6 +642,10 @@ class BrokerManager:
declArgs[EFP_PARTITION_NUM] = config._efp_partition_num
if config._efp_pool_file_size:
declArgs[EFP_POOL_FILE_SIZE] = config._efp_pool_file_size
+ if config._wcache_page_size:
+ declArgs[WCACHE_PAGE_SIZE] = config._wcache_page_size
+ if config._wcache_num_pages:
+ declArgs[WCACHE_NUM_PAGES] = config._wcache_num_pages
if config._maxQueueSize is not None:
declArgs[MAX_QUEUE_SIZE] = config._maxQueueSize
http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/34cfb7f0/src/qpid/linearstore/JournalImpl.cpp
----------------------------------------------------------------------
diff --git a/src/qpid/linearstore/JournalImpl.cpp b/src/qpid/linearstore/JournalImpl.cpp
index b2d4127..5539f39 100644
--- a/src/qpid/linearstore/JournalImpl.cpp
+++ b/src/qpid/linearstore/JournalImpl.cpp
@@ -78,7 +78,6 @@ JournalImpl::JournalImpl(::qpid::sys::Timer& timer_,
initManagement(a);
- QLS_LOG2(info, _jid, "Created");
std::ostringstream oss;
oss << "Journal directory = \"" << journalDirectory << "\"";
QLS_LOG2(debug, _jid, oss.str());
@@ -99,7 +98,7 @@ JournalImpl::~JournalImpl()
_mgmtObject.reset();
}
- QLS_LOG2(info, _jid, "Destroyed");
+ QLS_LOG2(info, _jid, "Stopped");
}
void
@@ -132,19 +131,17 @@ void
JournalImpl::initialize(::qpid::linearstore::journal::EmptyFilePool* efpp_,
const uint16_t wcache_num_pages,
const uint32_t wcache_pgsize_sblks,
- ::qpid::linearstore::journal::aio_callback* const cbp)
+ ::qpid::linearstore::journal::aio_callback* const cbp,
+ const std::string& nonDefaultParamsMsg)
{
// efpp->createJournal(_jdir);
// QLS_LOG2(info, _jid, "Initialized");
-// std::ostringstream oss;
-//// oss << "Initialize; num_jfiles=" << num_jfiles << " jfsize_sblks=" << jfsize_sblks;
-// oss << "Initialize; efpPartitionNumber=" << efpp_->getPartitionNumber();
-// oss << " efpFileSizeKb=" << efpp_->fileSizeKib();
-// oss << " wcache_pgsize_sblks=" << wcache_pgsize_sblks;
-// oss << " wcache_num_pages=" << wcache_num_pages;
-// QLS_LOG2(debug, _jid, oss.str());
jcntl::initialize(efpp_, wcache_num_pages, wcache_pgsize_sblks, cbp);
-// QLS_LOG2(debug, _jid, "Initialization complete");
+ if (nonDefaultParamsMsg.size() > 0) {
+ QLS_LOG2(info, _jid, "Created, parameters:" << nonDefaultParamsMsg);
+ } else {
+ QLS_LOG2(info, _jid, "Created");
+ }
// TODO: replace for linearstore: _lpmgr
/*
if (_mgmtObject.get() != 0)
http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/34cfb7f0/src/qpid/linearstore/JournalImpl.h
----------------------------------------------------------------------
diff --git a/src/qpid/linearstore/JournalImpl.h b/src/qpid/linearstore/JournalImpl.h
index 6675792..7c9d28e 100644
--- a/src/qpid/linearstore/JournalImpl.h
+++ b/src/qpid/linearstore/JournalImpl.h
@@ -108,15 +108,17 @@ class JournalImpl : public ::qpid::broker::ExternalQueueStore,
void initManagement(::qpid::management::ManagementAgent* agent);
- void initialize(::qpid::linearstore::journal::EmptyFilePool* efp,
+ void initialize(::qpid::linearstore::journal::EmptyFilePool* efpp,
const uint16_t wcache_num_pages,
const uint32_t wcache_pgsize_sblks,
- ::qpid::linearstore::journal::aio_callback* const cbp);
+ ::qpid::linearstore::journal::aio_callback* const cbp,
+ const std::string& nonDefaultParamsMsg);
inline void initialize(::qpid::linearstore::journal::EmptyFilePool* efpp,
const uint16_t wcache_num_pages,
- const uint32_t wcache_pgsize_sblks) {
- initialize(efpp, wcache_num_pages, wcache_pgsize_sblks, this);
+ const uint32_t wcache_pgsize_sblks,
+ const std::string& nonDefaultParamsMsg) {
+ initialize(efpp, wcache_num_pages, wcache_pgsize_sblks, this, nonDefaultParamsMsg);
}
void recover(boost::shared_ptr< ::qpid::linearstore::journal::EmptyFilePoolManager> efpm,
http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/34cfb7f0/src/qpid/linearstore/JournalLogImpl.h
----------------------------------------------------------------------
diff --git a/src/qpid/linearstore/JournalLogImpl.h b/src/qpid/linearstore/JournalLogImpl.h
index 846eaac..90e914c 100644
--- a/src/qpid/linearstore/JournalLogImpl.h
+++ b/src/qpid/linearstore/JournalLogImpl.h
@@ -25,7 +25,7 @@
#include "qpid/linearstore/journal/JournalLog.h"
#define QLS_LOG(level, msg) QPID_LOG(level, "Linear Store: " << msg)
-#define QLS_LOG2(level, queue, msg) QPID_LOG(level, "Linear Store: Journal \"" << queue << "\":" << msg)
+#define QLS_LOG2(level, queue, msg) QPID_LOG(level, "Linear Store: Journal \"" << queue << "\": " << msg)
namespace qpid {
namespace linearstore {
http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/34cfb7f0/src/qpid/linearstore/MessageStoreImpl.cpp
----------------------------------------------------------------------
diff --git a/src/qpid/linearstore/MessageStoreImpl.cpp b/src/qpid/linearstore/MessageStoreImpl.cpp
index 4aa0b40..e3a8b4c 100644
--- a/src/qpid/linearstore/MessageStoreImpl.cpp
+++ b/src/qpid/linearstore/MessageStoreImpl.cpp
@@ -21,6 +21,7 @@
#include "qpid/linearstore/MessageStoreImpl.h"
+#include <cmath>
#include "qpid/broker/Broker.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/linearstore/BindingDbt.h"
@@ -72,44 +73,29 @@ MessageStoreImpl::MessageStoreImpl(qpid::broker::Broker* broker_, const char* en
::srand(::time(NULL));
}
-uint32_t MessageStoreImpl::chkJrnlWrPageCacheSize(const uint32_t param_, const std::string& paramName_)
-{
+uint32_t MessageStoreImpl::chkJrnlWrPageCacheSize(const uint32_t param_, const std::string& paramName_) {
uint32_t p = param_;
if (p == 0) {
// For zero value, use default
p = QLS_WMGR_DEF_PAGE_SIZE_KIB;
- QLS_LOG(warning, "parameter " << paramName_ << " (" << param_ << ") must be a power of 2 between 1 and 128; changing this parameter to default value (" << p << ")");
- } else if ( p > 128 || p < 4 || (p & (p-1)) ) {
- // For any positive value that is not a power of 2, use closest value
- if (p < 6) p = 4;
- else if (p < 12) p = 8;
- else if (p < 24) p = 16;
- else if (p < 48) p = 32;
- else if (p < 96) p = 64;
- else p = 128;
+ } else if ( p < 4 ) {
+ p = 4;
+ } else if (p & (p-1)) {
+ p = std::pow(2, std::round(std::log2(p)));
QLS_LOG(warning, "parameter " << paramName_ << " (" << param_ << ") must be a power of 2 between 4 and 128; changing this parameter to closest allowable value (" << p << ")");
}
return p;
}
-uint16_t MessageStoreImpl::getJrnlWrNumPages(const uint32_t wrPageSizeKib_)
-{
- uint32_t wrPageSizeSblks = wrPageSizeKib_ / QLS_SBLK_SIZE_KIB; // convert from KiB to number sblks
- uint32_t defTotWCacheSizeSblks = QLS_WMGR_DEF_PAGE_SIZE_SBLKS * QLS_WMGR_DEF_PAGES;
- switch (wrPageSizeKib_)
- {
- case 4:
- // 256 KiB total cache
- return defTotWCacheSizeSblks / wrPageSizeSblks / 4;
- case 8:
- case 16:
- // 512 KiB total cache
- return defTotWCacheSizeSblks / wrPageSizeSblks / 2;
- default: // 32, 64, 128
- // 1 MiB total cache
- return defTotWCacheSizeSblks / wrPageSizeSblks;
- }
+uint16_t MessageStoreImpl::chkJrnlWrCacheNumPages(const uint16_t param_, const std::string& paramName_) {
+ uint16_t p = param_;
+
+ if (p >= 4)
+ return p;
+ p = 4;
+ QLS_LOG(warning, "parameter " << paramName_ << " must have a minimum value of 4. Changing this parameter from " << param_ << " to " << p << ".");
+ return p;
}
qpid::linearstore::journal::efpPartitionNumber_t MessageStoreImpl::chkEfpPartition(const qpid::linearstore::journal::efpPartitionNumber_t partition_,
@@ -167,12 +153,21 @@ bool MessageStoreImpl::init(const qpid::Options* options_)
qpid::linearstore::journal::efpPartitionNumber_t efpPartition = chkEfpPartition(opts->efpPartition, "efp-partition");
qpid::linearstore::journal::efpDataSize_kib_t efpFilePoolSize_kib = chkEfpFileSizeKiB(opts->efpFileSizeKib, "efp-file-size");
uint32_t jrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->wCachePageSizeKib, "wcache-page-size");
+ uint16_t jrnlWrCacheNumPages = chkJrnlWrCacheNumPages(opts->wCacheNumPages, "wcache-num-pages");
uint32_t tplJrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->tplWCachePageSizeKib, "tpl-wcache-page-size");
+ uint16_t tplJrnlWrCacheNumPages = chkJrnlWrCacheNumPages(opts->tplWCacheNumPages, "tpl-wcache-num-pages");
journalFlushTimeout = opts->journalFlushTimeout;
// Pass option values to init()
- return init(opts->storeDir, efpPartition, efpFilePoolSize_kib, opts->truncateFlag, jrnlWrCachePageSizeKib,
- tplJrnlWrCachePageSizeKib, opts->overwriteBeforeReturnFlag);
+ return init(opts->storeDir,
+ efpPartition,
+ efpFilePoolSize_kib,
+ opts->truncateFlag,
+ jrnlWrCachePageSizeKib,
+ jrnlWrCacheNumPages,
+ tplJrnlWrCachePageSizeKib,
+ tplJrnlWrCacheNumPages,
+ opts->overwriteBeforeReturnFlag);
}
// These params, taken from options, are assumed to be correct and verified
@@ -181,7 +176,9 @@ bool MessageStoreImpl::init(const std::string& storeDir_,
qpid::linearstore::journal::efpDataSize_kib_t efpFileSize_kib_,
const bool truncateFlag_,
uint32_t wCachePageSizeKib_,
+ uint16_t wCacheNumPages_,
uint32_t tplWCachePageSizeKib_,
+ uint16_t tplWCacheNumPages_,
const bool overwriteBeforeReturnFlag_)
{
if (isInit) return true;
@@ -191,9 +188,9 @@ bool MessageStoreImpl::init(const std::string& storeDir_,
defaultEfpPartitionNumber = efpPartition_;
defaultEfpFileSize_kib = efpFileSize_kib_;
wCachePgSizeSblks = wCachePageSizeKib_ / QLS_SBLK_SIZE_KIB; // convert from KiB to number sblks
- wCacheNumPages = getJrnlWrNumPages(wCachePageSizeKib_);
+ wCacheNumPages = wCacheNumPages_;
tplWCachePgSizeSblks = tplWCachePageSizeKib_ / QLS_SBLK_SIZE_KIB; // convert from KiB to number sblks
- tplWCacheNumPages = getJrnlWrNumPages(tplWCachePageSizeKib_);
+ tplWCacheNumPages = tplWCacheNumPages_;
if (storeDir_.size()>0) storeDir = storeDir_;
if (truncateFlag_)
@@ -349,7 +346,7 @@ void MessageStoreImpl::chkTplStoreInit()
qpid::sys::Mutex::ScopedLock sl(tplInitLock);
if (!tplStorePtr->is_ready()) {
qpid::linearstore::journal::jdir::create_dir(getTplBaseDir());
- tplStorePtr->initialize(getEmptyFilePool(defaultEfpPartitionNumber, defaultEfpFileSize_kib), tplWCacheNumPages, tplWCachePgSizeSblks);
+ tplStorePtr->initialize(getEmptyFilePool(defaultEfpPartitionNumber, defaultEfpFileSize_kib), tplWCacheNumPages, tplWCachePgSizeSblks, "");
if (mgmtObject.get() != 0) mgmtObject->set_tplIsInitialized(true);
}
}
@@ -395,7 +392,7 @@ MessageStoreImpl::~MessageStoreImpl()
void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue_,
const qpid::framing::FieldTable& args_)
{
- QLS_LOG(info, "*** MessageStoreImpl::create() queue=\"" << queue_.getName() << "\""); // DEBUG
+ QLS_LOG(debug, "*** MessageStoreImpl::create() queue=\"" << queue_.getName() << "\"");
checkInit();
if (queue_.getPersistenceId()) {
THROW_STORE_EXCEPTION("Queue already created: " + queue_.getName());
@@ -408,6 +405,23 @@ void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue_,
return;
}
+ // Check if QMF has changed default values for write cache parameters for this queue
+ std::ostringstream oss; // Used to report non-standard options as they are discovered
+
+ uint16_t localWCacheNumPages = wCacheNumPages;
+ qpid::framing::FieldTable::ValuePtr value = args_.get("qpid.wcache-num-pages");
+ if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) {
+ localWCacheNumPages = chkJrnlWrCacheNumPages(u_int16_t(value->get<int>()), "qpid.wcache-num-pages");
+ oss << " qpid.wcache-num-pages=" << localWCacheNumPages;
+ }
+
+ uint32_t localWCachePgSizeSblks = wCachePgSizeSblks;
+ value = args_.get("qpid.wcache-page-size");
+ if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) {
+ localWCachePgSizeSblks = chkJrnlWrPageCacheSize(u_int32_t(value->get<int>()), "qpid.wcache-page-size") / QLS_SBLK_SIZE_KIB;
+ oss << " qpid.wcache-page-size=" << localWCachePgSizeSblks;
+ }
+
jQueue = new JournalImpl(broker->getTimer(), queue_.getName(), getJrnlDir(queue_.getName()), jrnlLog,
defJournalGetEventsTimeoutNs, journalFlushTimeout, agent,
boost::bind(&MessageStoreImpl::journalDeleted, this, _1));
@@ -418,7 +432,8 @@ void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue_,
queue_.setExternalQueueStore(dynamic_cast<qpid::broker::ExternalQueueStore*>(jQueue));
try {
- jQueue->initialize(getEmptyFilePool(args_), wCacheNumPages, wCachePgSizeSblks);
+ journal::EmptyFilePool* efpp = getEmptyFilePool(args_, oss);
+ jQueue->initialize(efpp, localWCacheNumPages, localWCachePgSizeSblks, oss.str());
} catch (const qpid::linearstore::journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queue_.getName() + ": create() failed: " + e.what());
}
@@ -444,18 +459,20 @@ MessageStoreImpl::getEmptyFilePool(const qpid::linearstore::journal::efpPartitio
}
qpid::linearstore::journal::EmptyFilePool*
-MessageStoreImpl::getEmptyFilePool(const qpid::framing::FieldTable& args_) {
+MessageStoreImpl::getEmptyFilePool(const qpid::framing::FieldTable& args_, std::ostringstream& oss) {
qpid::framing::FieldTable::ValuePtr value;
qpid::linearstore::journal::efpPartitionNumber_t localEfpPartition = defaultEfpPartitionNumber;
value = args_.get("qpid.efp_partition_num");
if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) {
localEfpPartition = chkEfpPartition((uint32_t)value->get<int>(), "qpid.efp_partition_num");
+ oss << " qpid.efp_partition_num=" << localEfpPartition;
}
qpid::linearstore::journal::efpDataSize_kib_t localEfpFileSizeKib = defaultEfpFileSize_kib;
value = args_.get("qpid.efp_pool_file_size");
if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) {
localEfpFileSizeKib = chkEfpFileSizeKiB((uint32_t)value->get<int>(), "qpid.efp_pool_file_size");
+ oss << " qpid.efp_pool_file_size=" << localEfpFileSizeKib;
}
return getEmptyFilePool(localEfpPartition, localEfpFileSizeKib);
}
@@ -1522,7 +1539,9 @@ MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name_) :
qpid::Options(name_),
truncateFlag(defTruncateFlag),
wCachePageSizeKib(defWCachePageSizeKib),
+ wCacheNumPages(defWCacheNumPages),
tplWCachePageSizeKib(defTplWCachePageSizeKib),
+ tplWCacheNumPages(defTplWCacheNumPages),
efpPartition(defEfpPartition),
efpFileSizeKib(defEfpFileSizeKib),
overwriteBeforeReturnFlag(defOverwriteBeforeReturnFlag),
@@ -1537,12 +1556,17 @@ MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name_) :
"the existing store files for recovery.")
("wcache-page-size", qpid::optValue(wCachePageSizeKib, "N"),
"Size of the pages in the write page cache in KiB. "
- "Allowable values - powers of 2: 1, 2, 4, ... , 128. "
+ "Allowable values - powers of 2 starting at 4 (4, 8, 16, 32...) "
"Lower values decrease latency at the expense of throughput.")
+ ("wcache-num-pages", qpid::optValue(wCacheNumPages, "N"),
+ "Number of pages in the write page cache. Minimum value: 4.")
("tpl-wcache-page-size", qpid::optValue(tplWCachePageSizeKib, "N"),
"Size of the pages in the transaction prepared list write page cache in KiB. "
- "Allowable values - powers of 2: 4, 8, 16, 32, 64, 128. "
+ "Allowable values - powers of 2 starting at: 4 (4, 8, 16, 32...) "
"Lower values decrease latency at the expense of throughput.")
+ ("tpl-wcache-num-pages", qpid::optValue(tplWCacheNumPages, "N"),
+ "Number of pages in the transaction prepared list write page cache. "
+ "Minimum value: 4.")
("efp-partition", qpid::optValue(efpPartition, "N"),
"Empty File Pool partition to use for finding empty journal files")
("efp-file-size", qpid::optValue(efpFileSizeKib, "N"),
http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/34cfb7f0/src/qpid/linearstore/MessageStoreImpl.h
----------------------------------------------------------------------
diff --git a/src/qpid/linearstore/MessageStoreImpl.h b/src/qpid/linearstore/MessageStoreImpl.h
index 8b11e11..0025100 100644
--- a/src/qpid/linearstore/MessageStoreImpl.h
+++ b/src/qpid/linearstore/MessageStoreImpl.h
@@ -80,7 +80,9 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
std::string storeDir;
bool truncateFlag;
uint32_t wCachePageSizeKib;
+ uint16_t wCacheNumPages;
uint32_t tplWCachePageSizeKib;
+ uint16_t tplWCacheNumPages;
uint16_t efpPartition;
uint64_t efpFileSizeKib;
bool overwriteBeforeReturnFlag;
@@ -101,7 +103,9 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
// Default store settings
static const bool defTruncateFlag = false;
static const uint32_t defWCachePageSizeKib = QLS_WMGR_DEF_PAGE_SIZE_KIB;
- static const uint32_t defTplWCachePageSizeKib = defWCachePageSizeKib / 8;
+ static const uint16_t defWCacheNumPages = QLS_WMGR_DEF_NUM_PAGES;
+ static const uint32_t defTplWCachePageSizeKib = QLS_WMGR_DEF_TPL_PAGE_SIZE_KIB;
+ static const uint16_t defTplWCacheNumPages = QLS_WMGR_DEF_TPL_NUM_PAGES;
static const uint16_t defEfpPartition = 1;
static const uint64_t defEfpFileSizeKib = 512 * QLS_SBLK_SIZE_KIB;
static const bool defOverwriteBeforeReturnFlag = false;
@@ -153,9 +157,9 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
// Parameter validation and calculation
static uint32_t chkJrnlWrPageCacheSize(const uint32_t param,
- const std::string& paramName/*,
- const uint16_t jrnlFsizePgs*/);
- static uint16_t getJrnlWrNumPages(const uint32_t wrPageSizeKiB);
+ const std::string& paramName);
+ static uint16_t chkJrnlWrCacheNumPages(const uint16_t param,
+ const std::string& paramName);
static qpid::linearstore::journal::efpPartitionNumber_t chkEfpPartition(const qpid::linearstore::journal::efpPartitionNumber_t partition,
const std::string& paramName);
static qpid::linearstore::journal::efpDataSize_kib_t chkEfpFileSizeKiB(const qpid::linearstore::journal::efpDataSize_kib_t efpFileSizeKiB,
@@ -233,7 +237,7 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
void createJrnlQueue(const qpid::broker::PersistableQueue& queue);
std::string getJrnlDir(const std::string& queueName);
qpid::linearstore::journal::EmptyFilePool* getEmptyFilePool(const qpid::linearstore::journal::efpPartitionNumber_t p, const qpid::linearstore::journal::efpDataSize_kib_t s);
- qpid::linearstore::journal::EmptyFilePool* getEmptyFilePool(const qpid::framing::FieldTable& args);
+ qpid::linearstore::journal::EmptyFilePool* getEmptyFilePool(const qpid::framing::FieldTable& args, std::ostringstream& oss);
std::string getStoreTopLevelDir();
std::string getJrnlBaseDir();
std::string getBdbBaseDir();
@@ -258,7 +262,9 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
qpid::linearstore::journal::efpDataSize_kib_t efpFileSizeKib = defEfpFileSizeKib,
const bool truncateFlag = false,
uint32_t wCachePageSize = defWCachePageSizeKib,
+ uint16_t wCacheNumPages = defWCacheNumPages,
uint32_t tplWCachePageSize = defTplWCachePageSizeKib,
+ uint16_t tplWCacheNumPages = defTplWCacheNumPages,
const bool overwriteBeforeReturnFlag_ = false);
void truncateInit();
http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/34cfb7f0/src/qpid/linearstore/journal/jcfg.h
----------------------------------------------------------------------
diff --git a/src/qpid/linearstore/journal/jcfg.h b/src/qpid/linearstore/journal/jcfg.h
index b33a419..322b45d 100644
--- a/src/qpid/linearstore/journal/jcfg.h
+++ b/src/qpid/linearstore/journal/jcfg.h
@@ -38,9 +38,10 @@
#define QLS_SBLK_SIZE_DBLKS (QLS_SBLK_SIZE_BYTES / QLS_DBLK_SIZE_BYTES) /**< Disk softblock size in multiples of QLS_DBLK_SIZE_BYTES */
#define QLS_SBLK_SIZE_KIB (QLS_SBLK_SIZE_BYTES / 1024) /**< Disk softblock size in KiB */
-#define QLS_WMGR_DEF_PAGE_SIZE_KIB 32 /**< Journal write page size in KiB (default) */
-#define QLS_WMGR_DEF_PAGE_SIZE_SBLKS (QLS_WMGR_DEF_PAGE_SIZE_KIB / QLS_SBLK_SIZE_KIB) /**< Journal write page size in softblocks (default) */
-#define QLS_WMGR_DEF_PAGES 32 /**< Number of pages to use in wmgr (default) */
+#define QLS_WMGR_DEF_PAGE_SIZE_KIB 16 /**< Journal buffer page size in KiB (default) */
+#define QLS_WMGR_DEF_NUM_PAGES 16 /**< Number of buffer pages to use in wmgr (default) */
+#define QLS_WMGR_DEF_TPL_PAGE_SIZE_KIB 4 /**< TPL buffer page size in KiB (default) */
+#define QLS_WMGR_DEF_TPL_NUM_PAGES 16 /**< Number of TPL buffer pages to use in wmgr (default) */
#define QLS_WMGR_MAXDTOKPP 1024 /**< Max. dtoks (data blocks) per page in wmgr */
#define QLS_WMGR_MAXWAITUS 100 /**< Max. wait time (us) before submitting AIO */
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org