You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ch...@apache.org on 2013/01/07 21:17:19 UTC
svn commit: r1429990 - in /qpid/trunk/qpid/cpp/src/qpid/legacystore:
JournalImpl.cpp JournalImpl.h MessageStoreImpl.cpp MessageStoreImpl.h
StorePlugin.cpp
Author: chug
Date: Mon Jan 7 20:17:19 2013
New Revision: 1429990
URL: http://svn.apache.org/viewvc?rev=1429990&view=rev
Log:
QPID-1726 ASF licensed Qpid store
Update legacystore with the changes from source svn revisions 4514 through 4528.
Modified:
qpid/trunk/qpid/cpp/src/qpid/legacystore/JournalImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/legacystore/JournalImpl.h
qpid/trunk/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.h
qpid/trunk/qpid/cpp/src/qpid/legacystore/StorePlugin.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/legacystore/JournalImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/legacystore/JournalImpl.cpp?rev=1429990&r1=1429989&r2=1429990&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/legacystore/JournalImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/legacystore/JournalImpl.cpp Mon Jan 7 20:17:19 2013
@@ -68,7 +68,6 @@ JournalImpl::JournalImpl(qpid::sys::Time
_dlen(0),
_dtok(),
_external(false),
- _mgmtObject(),
deleteCallback(onDelete)
{
getEventsFireEventsPtr = new GetEventsFireEvent(this, getEventsTimeout);
@@ -97,8 +96,9 @@ JournalImpl::~JournalImpl()
inactivityFireEventPtr->cancel();
free_read_buffers();
- if (_mgmtObject != 0) {
+ if (_mgmtObject.get() != 0) {
_mgmtObject->resourceDestroy();
+ _mgmtObject.reset();
}
log(LOG_NOTICE, "Destroyed");
@@ -148,7 +148,7 @@ JournalImpl::initialize(const u_int16_t
jcntl::initialize(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks, cbp);
log(LOG_DEBUG, "Initialization complete");
- if (_mgmtObject != 0)
+ if (_mgmtObject.get() != 0)
{
_mgmtObject->set_initialFileCount(_lpmgr.num_jfiles());
_mgmtObject->set_autoExpand(_lpmgr.is_ae());
@@ -182,7 +182,7 @@ JournalImpl::recover(const u_int16_t num
oss1 << " wcache_num_pages=" << wcache_num_pages;
log(LOG_DEBUG, oss1.str());
- if (_mgmtObject != 0)
+ if (_mgmtObject.get() != 0)
{
_mgmtObject->set_initialFileCount(_lpmgr.num_jfiles());
_mgmtObject->set_autoExpand(_lpmgr.is_ae());
@@ -227,7 +227,7 @@ JournalImpl::recover(const u_int16_t num
oss2 << "; journal now read-only.";
log(LOG_DEBUG, oss2.str());
- if (_mgmtObject != 0)
+ if (_mgmtObject.get() != 0)
{
_mgmtObject->inc_recordDepth(_emap.size());
_mgmtObject->inc_enqueues(_emap.size());
@@ -348,7 +348,7 @@ JournalImpl::enqueue_data_record(const v
{
handleIoResult(jcntl::enqueue_data_record(data_buff, tot_data_len, this_data_len, dtokp, transient));
- if (_mgmtObject != 0)
+ if (_mgmtObject.get() != 0)
{
_mgmtObject->inc_enqueues();
_mgmtObject->inc_recordDepth();
@@ -361,7 +361,7 @@ JournalImpl::enqueue_extern_data_record(
{
handleIoResult(jcntl::enqueue_extern_data_record(tot_data_len, dtokp, transient));
- if (_mgmtObject != 0)
+ if (_mgmtObject.get() != 0)
{
_mgmtObject->inc_enqueues();
_mgmtObject->inc_recordDepth();
@@ -372,11 +372,11 @@ void
JournalImpl::enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len,
const size_t this_data_len, data_tok* dtokp, const std::string& xid, const bool transient)
{
- bool txn_incr = _mgmtObject != 0 ? _tmap.in_map(xid) : false;
+ bool txn_incr = _mgmtObject.get() != 0 ? _tmap.in_map(xid) : false;
handleIoResult(jcntl::enqueue_txn_data_record(data_buff, tot_data_len, this_data_len, dtokp, xid, transient));
- if (_mgmtObject != 0)
+ if (_mgmtObject.get() != 0)
{
if (!txn_incr) // If this xid was not in _tmap, it will be now...
_mgmtObject->inc_txn();
@@ -390,11 +390,11 @@ void
JournalImpl::enqueue_extern_txn_data_record(const size_t tot_data_len, data_tok* dtokp,
const std::string& xid, const bool transient)
{
- bool txn_incr = _mgmtObject != 0 ? _tmap.in_map(xid) : false;
+ bool txn_incr = _mgmtObject.get() != 0 ? _tmap.in_map(xid) : false;
handleIoResult(jcntl::enqueue_extern_txn_data_record(tot_data_len, dtokp, xid, transient));
- if (_mgmtObject != 0)
+ if (_mgmtObject.get() != 0)
{
if (!txn_incr) // If this xid was not in _tmap, it will be now...
_mgmtObject->inc_txn();
@@ -409,7 +409,7 @@ JournalImpl::dequeue_data_record(data_to
{
handleIoResult(jcntl::dequeue_data_record(dtokp, txn_coml_commit));
- if (_mgmtObject != 0)
+ if (_mgmtObject.get() != 0)
{
_mgmtObject->inc_dequeues();
_mgmtObject->inc_txnDequeues();
@@ -420,11 +420,11 @@ JournalImpl::dequeue_data_record(data_to
void
JournalImpl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit)
{
- bool txn_incr = _mgmtObject != 0 ? _tmap.in_map(xid) : false;
+ bool txn_incr = _mgmtObject.get() != 0 ? _tmap.in_map(xid) : false;
handleIoResult(jcntl::dequeue_txn_data_record(dtokp, xid, txn_coml_commit));
- if (_mgmtObject != 0)
+ if (_mgmtObject.get() != 0)
{
if (!txn_incr) // If this xid was not in _tmap, it will be now...
_mgmtObject->inc_txn();
@@ -439,7 +439,7 @@ JournalImpl::txn_abort(data_tok* const d
{
handleIoResult(jcntl::txn_abort(dtokp, xid));
- if (_mgmtObject != 0)
+ if (_mgmtObject.get() != 0)
{
_mgmtObject->dec_txn();
_mgmtObject->inc_txnAborts();
@@ -451,7 +451,7 @@ JournalImpl::txn_commit(data_tok* const
{
handleIoResult(jcntl::txn_commit(dtokp, xid));
- if (_mgmtObject != 0)
+ if (_mgmtObject.get() != 0)
{
_mgmtObject->dec_txn();
_mgmtObject->inc_txnCommits();
@@ -466,8 +466,9 @@ JournalImpl::stop(bool block_till_aio_cm
ifep->cancel();
jcntl::stop(block_till_aio_cmpl);
- if (_mgmtObject != 0) {
+ if (_mgmtObject.get() != 0) {
_mgmtObject->resourceDestroy();
+ _mgmtObject.reset();
}
}
Modified: qpid/trunk/qpid/cpp/src/qpid/legacystore/JournalImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/legacystore/JournalImpl.h?rev=1429990&r1=1429989&r2=1429990&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/legacystore/JournalImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/legacystore/JournalImpl.h Mon Jan 7 20:17:19 2013
@@ -166,7 +166,7 @@ class JournalImpl : public qpid::broker:
bool loadMsgContent(u_int64_t rid, std::string& data, size_t length, size_t offset = 0);
// Overrides for write inactivity timer
- void enqueue_data_record(const void* const data_buffGetManagementObject, const size_t tot_data_len,
+ void enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
const size_t this_data_len, mrg::journal::data_tok* dtokp,
const bool transient = false);
@@ -227,10 +227,10 @@ class JournalImpl : public qpid::broker:
// Management instrumentation callbacks overridden from jcntl
inline void instr_incr_outstanding_aio_cnt() {
- if (_mgmtObject != 0) _mgmtObject->inc_outstandingAIOs();
+ if (_mgmtObject.get() != 0) _mgmtObject->inc_outstandingAIOs();
}
inline void instr_decr_outstanding_aio_cnt() {
- if (_mgmtObject != 0) _mgmtObject->dec_outstandingAIOs();
+ if (_mgmtObject.get() != 0) _mgmtObject->dec_outstandingAIOs();
}
}; // class JournalImpl
Modified: qpid/trunk/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp?rev=1429990&r1=1429989&r2=1429990&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp Mon Jan 7 20:17:19 2013
@@ -57,7 +57,7 @@ MessageStoreImpl::TplRecoverStruct::TplR
tpc_flag(_tpc_flag)
{}
-MessageStoreImpl::MessageStoreImpl(qpid::sys::Timer& timer_, const char* envpath) :
+MessageStoreImpl::MessageStoreImpl(qpid::broker::Broker* broker_, const char* envpath) :
numJrnlFiles(0),
autoJrnlExpand(false),
autoJrnlExpandMaxFiles(0),
@@ -72,7 +72,7 @@ MessageStoreImpl::MessageStoreImpl(qpid:
highestRid(0),
isInit(false),
envPath(envpath),
- timer(timer_),
+ broker(broker_),
mgmtObject(),
agent(0)
{}
@@ -218,7 +218,7 @@ void MessageStoreImpl::chkJrnlAutoExpand
autoJrnlExpandMaxFiles = p;
}
-void MessageStoreImpl::initManagement (qpid::broker::Broker* broker)
+void MessageStoreImpl::initManagement ()
{
if (broker != 0) {
agent = broker->getManagementAgent();
@@ -364,7 +364,7 @@ void MessageStoreImpl::init()
// NOTE: during normal initialization, agent == 0 because the store is initialized before the management infrastructure.
// However during a truncated initialization in a cluster, agent != 0. We always pass 0 as the agent for the
// TplStore to keep things consistent in a cluster. See https://bugzilla.redhat.com/show_bug.cgi?id=681026
- tplStorePtr.reset(new TplJournalImpl(timer, "TplStore", getTplBaseDir(), "tpl", defJournalGetEventsTimeout, defJournalFlushTimeout, 0));
+ tplStorePtr.reset(new TplJournalImpl(broker->getTimer(), "TplStore", getTplBaseDir(), "tpl", defJournalGetEventsTimeout, defJournalFlushTimeout, 0));
isInit = true;
} catch (const DbException& e) {
if (e.get_errno() == DB_VERSION_MISMATCH)
@@ -402,8 +402,9 @@ void MessageStoreImpl::finalize()
}
}
- if (mgmtObject != 0) {
+ if (mgmtObject.get() != 0) {
mgmtObject->resourceDestroy();
+ mgmtObject.reset();
}
}
@@ -443,7 +444,7 @@ void MessageStoreImpl::chkTplStoreInit()
if (!tplStorePtr->is_ready()) {
journal::jdir::create_dir(getTplBaseDir());
tplStorePtr->initialize(tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks, tplWCacheNumPages, tplWCachePgSizeSblks);
- if (mgmtObject != 0) mgmtObject->set_tplIsInitialized(true);
+ if (mgmtObject.get() != 0) mgmtObject->set_tplIsInitialized(true);
}
}
@@ -479,8 +480,9 @@ MessageStoreImpl::~MessageStoreImpl()
QPID_LOG(error, "Unknown error in MessageStoreImpl::~MessageStoreImpl()");
}
- if (mgmtObject != 0) {
+ if (mgmtObject.get() != 0) {
mgmtObject->resourceDestroy();
+ mgmtObject.reset();
}
}
@@ -513,7 +515,7 @@ void MessageStoreImpl::create(qpid::brok
return;
}
- jQueue = new JournalImpl(timer, queue.getName(), getJrnlDir(queue), std::string("JournalData"),
+ jQueue = new JournalImpl(broker->getTimer(), queue.getName(), getJrnlDir(queue), std::string("JournalData"),
defJournalGetEventsTimeout, defJournalFlushTimeout, agent,
boost::bind(&MessageStoreImpl::journalDeleted, this, _1));
{
@@ -705,7 +707,7 @@ void MessageStoreImpl::recover(qpid::bro
//recover transactions:
for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) {
const PreparedTransaction pt = *i;
- if (mgmtObject != 0) {
+ if (mgmtObject.get() != 0) {
mgmtObject->inc_tplTransactionDepth();
mgmtObject->inc_tplTxnPrepares();
}
@@ -799,7 +801,7 @@ void MessageStoreImpl::recoverQueues(Txn
QPID_LOG(error, "Cannot recover empty (null) queue name - ignoring and attempting to continue.");
break;
}
- jQueue = new JournalImpl(timer, queueName, getJrnlHashDir(queueName), std::string("JournalData"),
+ jQueue = new JournalImpl(broker->getTimer(), queueName, getJrnlHashDir(queueName), std::string("JournalData"),
defJournalGetEventsTimeout, defJournalFlushTimeout, agent,
boost::bind(&MessageStoreImpl::journalDeleted, this, _1));
{
@@ -979,6 +981,8 @@ void MessageStoreImpl::recoverMessages(T
// At some future point if delivery attempts are stored, then this call would
// become optional depending on that information.
msg->setRedelivered();
+ // Reset the TTL for the recovered message
+ msg->computeExpiration(broker->getExpiryPolicy());
u_int32_t contentOffset = headerSize + preambleLength;
u_int64_t contentSize = readSize - contentOffset;
@@ -1436,7 +1440,7 @@ void MessageStoreImpl::completed(TxnCtxt
tplStorePtr->dequeue_txn_data_record(txn.getDtok(), txn.getXid(), commit);
}
txn.complete(commit);
- if (mgmtObject != 0) {
+ if (mgmtObject.get() != 0) {
mgmtObject->dec_tplTransactionDepth();
if (commit)
mgmtObject->inc_tplTxnCommits();
@@ -1490,7 +1494,7 @@ void MessageStoreImpl::localPrepare(TxnC
ctxt->prepare(tplStorePtr.get());
// make sure all the data is written to disk before returning
ctxt->sync();
- if (mgmtObject != 0) {
+ if (mgmtObject.get() != 0) {
mgmtObject->inc_tplTransactionDepth();
mgmtObject->inc_tplTxnPrepares();
}
Modified: qpid/trunk/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.h?rev=1429990&r1=1429989&r2=1429990&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.h Mon Jan 7 20:17:19 2013
@@ -150,7 +150,7 @@ class MessageStoreImpl : public qpid::br
u_int64_t highestRid;
bool isInit;
const char* envPath;
- qpid::sys::Timer& timer;
+ qpid::broker::Broker* broker;
qmf::org::apache::qpid::legacystore::Store::shared_ptr mgmtObject;
qpid::management::ManagementAgent* agent;
@@ -273,7 +273,7 @@ class MessageStoreImpl : public qpid::br
public:
typedef boost::shared_ptr<MessageStoreImpl> shared_ptr;
- MessageStoreImpl(qpid::sys::Timer& timer, const char* envpath = 0);
+ MessageStoreImpl(qpid::broker::Broker* broker, const char* envpath = 0);
virtual ~MessageStoreImpl();
@@ -292,7 +292,7 @@ class MessageStoreImpl : public qpid::br
void truncateInit(const bool saveStoreContent = false);
- void initManagement (qpid::broker::Broker* broker);
+ void initManagement ();
void finalize();
@@ -364,9 +364,7 @@ class MessageStoreImpl : public qpid::br
qpid::management::ManagementObject::shared_ptr GetManagementObject (void) const
{ return mgmtObject; }
- using qpid::management::Manageable::ManagementMethod;
-
- inline qpid::management::Manageable::status_t ManagementMethod (u_int32_t, qpid::management::Args&)
+ inline qpid::management::Manageable::status_t ManagementMethod (u_int32_t, qpid::management::Args&, std::string&)
{ return qpid::management::Manageable::STATUS_OK; }
std::string getStoreDir() const;
Modified: qpid/trunk/qpid/cpp/src/qpid/legacystore/StorePlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/legacystore/StorePlugin.cpp?rev=1429990&r1=1429989&r2=1429990&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/legacystore/StorePlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/legacystore/StorePlugin.cpp Mon Jan 7 20:17:19 2013
@@ -44,7 +44,7 @@ struct StorePlugin : public Plugin {
{
Broker* broker = dynamic_cast<Broker*>(&target);
if (!broker) return;
- store.reset(new MessageStoreImpl(broker->getTimer()));
+ store.reset(new MessageStoreImpl(broker));
DataDir& dataDir = broker->getDataDir ();
if (options.storeDir.empty ())
{
@@ -65,7 +65,7 @@ struct StorePlugin : public Plugin {
if (!broker) return;
if (!store) return;
QPID_LOG(info, "Enabling management instrumentation for the store.");
- store->initManagement(broker);
+ store->initManagement();
}
void finalize()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org