You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2014/01/09 18:26:11 UTC
svn commit: r1556892 - in /qpid/trunk/qpid/cpp/src/qpid/linearstore: ./
journal/ journal/utils/
Author: kpvdr
Date: Thu Jan 9 17:26:10 2014
New Revision: 1556892
URL: http://svn.apache.org/r1556892
Log:
QPID-5460: [linearstore] Recovery of store which contains prepared but incomplete transactions results in message loss
Modified:
qpid/trunk/qpid/cpp/src/qpid/linearstore/ISSUES
qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/jcfg.h
qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp
qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/txn_map.h
qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/utils/deq_hdr.c
qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.c
qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.h
qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/ISSUES
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/ISSUES?rev=1556892&r1=1556891&r2=1556892&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/ISSUES (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/ISSUES Thu Jan 9 17:26:10 2014
@@ -58,8 +58,9 @@ Current bugs and performance issues:
4. (UNABLE TO REPRODUCE) BZ 1036026 - Unable to create durable queue - framing error - possibly caused by running both stores at the same time
5. (UNABLE TO REPRODUCE) BZ 1038599 - Abort when deleting used queue after restart - may be dup of QPID-5387 (BZ 1036071)
6. BZ 1039522 - Crash during recovery - JournalFile::getFqFileName() -JERR_JREC_BADRECTAIL
-6. BZ 1039525 - Crash during recovery - journal::jexception - JERR_JREC_BADRECTAIL
-7. (FIXED) QPID-5442 (BZ 1039949) - DTX test failure - missing XIDs
+7. BZ 1039525 - Crash during recovery - journal::jexception - JERR_JREC_BADRECTAIL
+8. (FIXED) QPID-5442 (BZ 1039949) - DTX test failure - missing XIDs
+9. (FIXED) QPID-5460 (BZ 1051097) - Transactional messages lost during recovery
Code tidy-up
------------
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp?rev=1556892&r1=1556891&r2=1556892&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp Thu Jan 9 17:26:10 2014
@@ -193,6 +193,7 @@ JournalImpl::recover(boost::shared_ptr<
}
*/
+ // TODO: This is ugly, find a way for RecoveryManager to use boost::ptr_list<PreparedTransaction>* directly
if (prep_tx_list_ptr) {
// Create list of prepared xids
std::vector<std::string> prep_xid_list;
@@ -209,8 +210,8 @@ JournalImpl::recover(boost::shared_ptr<
if (prep_tx_list_ptr)
{
for (PreparedTransaction::list::iterator i = prep_tx_list_ptr->begin(); i != prep_tx_list_ptr->end(); i++) {
- ::qpid::linearstore::journal::txn_data_list tdl = _tmap.get_tdata_list(i->xid); // tdl will be empty if xid not found
- for (::qpid::linearstore::journal::tdl_itr tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++) {
+ ::qpid::linearstore::journal::txn_data_list_t tdl = _tmap.get_tdata_list(i->xid); // tdl will be empty if xid not found
+ for (::qpid::linearstore::journal::tdl_itr_t tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++) {
if (tdl_itr->enq_flag_) { // enqueue op
i->enqueues->add(queue_id, tdl_itr->rid_);
} else { // dequeue op
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp?rev=1556892&r1=1556891&r2=1556892&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp Thu Jan 9 17:26:10 2014
@@ -341,7 +341,7 @@ void MessageStoreImpl::chkTplStoreInit()
qpid::sys::Mutex::ScopedLock sl(tplInitLock);
if (!tplStorePtr->is_ready()) {
qpid::linearstore::journal::jdir::create_dir(getTplBaseDir());
- tplStorePtr->initialize(/*tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks,*/ getEmptyFilePool(defaultEfpPartitionNumber, defaultEfpFileSize_kib), tplWCacheNumPages, tplWCachePgSizeSblks);
+ tplStorePtr->initialize(getEmptyFilePool(defaultEfpPartitionNumber, defaultEfpFileSize_kib), tplWCacheNumPages, tplWCachePgSizeSblks);
if (mgmtObject.get() != 0) mgmtObject->set_tplIsInitialized(true);
}
}
@@ -584,6 +584,13 @@ void MessageStoreImpl::recover(qpid::bro
txn_list prepared;
recoverLockedMappings(prepared);
+ std::ostringstream oss;
+ oss << "Recovered transaction prepared list:";
+ for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) {
+ oss << std::endl << " " << str2hexnum(i->xid);
+ }
+ QLS_LOG(debug, oss.str());
+
queue_index queues;//id->queue
exchange_index exchanges;//id->exchange
message_index messages;//id->message
@@ -620,39 +627,20 @@ void MessageStoreImpl::recover(qpid::bro
}
std::string xid = pt.xid;
- qpid::linearstore::journal::txn_data_list tdl = txn_map_ref.get_tdata_list(xid);
+ qpid::linearstore::journal::txn_data_list_t tdl = txn_map_ref.get_tdata_list(xid);
if (tdl.size() == 0) THROW_STORE_EXCEPTION("XID not found in txn_map");
- uint16_t enqCnt = 0UL;
- uint16_t deqCnt = 0UL;
- uint16_t tpcCnt = 0UL;
- uint16_t abortCnt = 0UL;
- uint64_t rid = 0ULL;
- for (qpid::linearstore::journal::tdl_itr j=tdl.begin(); j!=tdl.end(); ++j) {
- if (j->enq_flag_) {
- ++enqCnt;
- rid = j->rid_;
- } else {
- ++deqCnt;
- }
- if (!j->commit_flag_) {
- ++abortCnt;
- }
- if (j->tpc_flag_) {
- ++tpcCnt;
- }
- }
- if (tpcCnt > 0 && tpcCnt != tdl.size()) THROW_STORE_EXCEPTION("MessageStoreImpl::recover(): Inconsistent TPL 2PC count");
- bool commitFlag = abortCnt == 0;
+ qpid::linearstore::journal::txn_op_stats_t txn_op_stats(tdl);
+ bool commitFlag = txn_op_stats.abortCnt == 0;
// If a record is found that is dequeued but not committed/aborted from tplStore, then a complete() call
// was interrupted part way through committing/aborting the impacted queues. Complete this process.
- bool incomplTplTxnFlag = deqCnt > 0;
+ bool incomplTplTxnFlag = txn_op_stats.deqCnt > 0;
- if (tpcCnt > 0) {
+ if (txn_op_stats.tpcCnt > 0) {
// Dtx (2PC) transaction
TPCTxnCtxt* tpcc = new TPCTxnCtxt(xid, &messageIdSequence);
std::auto_ptr<qpid::broker::TPCTransactionContext> txn(tpcc);
- tpcc->recoverDtok(rid, xid);
+ tpcc->recoverDtok(txn_op_stats.rid, xid);
tpcc->prepare(tplStorePtr.get());
qpid::broker::RecoverableTransaction::shared_ptr dtx;
@@ -676,7 +664,7 @@ void MessageStoreImpl::recover(qpid::bro
} else {
// Local (1PC) transaction
boost::shared_ptr<TxnCtxt> opcc(new TxnCtxt(xid, &messageIdSequence));
- opcc->recoverDtok(rid, xid);
+ opcc->recoverDtok(txn_op_stats.rid, xid);
opcc->prepare(tplStorePtr.get());
if (pt.enqueues.get()) {
@@ -919,7 +907,7 @@ void MessageStoreImpl::recoverMessages(T
msg = getExternMessage(recovery, dtok.rid(), headerSize); // large message external to jrnl
} else {
headerSize = qpid::framing::Buffer(data, preambleLength).getLong();
- qpid::framing::Buffer headerBuff(data+ preambleLength, headerSize); /// do we want read size or header size ????
+ qpid::framing::Buffer headerBuff(data+ preambleLength, headerSize);
msg = recovery.recoverMessage(headerBuff);
}
msg->setPersistenceId(dtok.rid());
@@ -944,45 +932,30 @@ void MessageStoreImpl::recoverMessages(T
} else {
uint64_t rid = dtok.rid();
std::string xid(i->xid);
- qpid::linearstore::journal::txn_data_list tdl = txn_map_ref.get_tdata_list(xid);
+ qpid::linearstore::journal::txn_data_list_t tdl = txn_map_ref.get_tdata_list(xid);
if (tdl.size() == 0) THROW_STORE_EXCEPTION("XID not found in txn_map");
- uint16_t enqCnt = 0UL;
- uint16_t deqCnt = 0UL;
- uint16_t tpcCnt = 0UL;
- uint16_t abortCnt = 0UL;
- for (qpid::linearstore::journal::tdl_itr j=tdl.begin(); j!=tdl.end(); ++j) {
- if (j->enq_flag_) {
- ++enqCnt;
- } else {
- ++deqCnt;
- }
- if (!j->commit_flag_) {
- ++abortCnt;
- }
- if (j->tpc_flag_) {
- ++tpcCnt;
- }
- }
- if (tpcCnt > 0 && tpcCnt != tdl.size()) THROW_STORE_EXCEPTION("MessageStoreImpl::recoverMessages(): Inconsistent TPL 2PC count");
- if (deqCnt > 0 || tpcCnt == 0) {
+ qpid::linearstore::journal::txn_op_stats_t txn_op_stats(tdl);
+ if (txn_op_stats.deqCnt > 0 || txn_op_stats.tpcCnt == 0) {
if (jc->is_enqueued(rid, true)) {
// Enqueue is non-tx, dequeue tx
assert(jc->is_locked(rid)); // This record MUST be locked by a txn dequeue
- if (abortCnt > 0) {
+ if (txn_op_stats.abortCnt > 0) {
rcnt++;
queue->recover(msg); // recover message in abort case only
}
} else {
// Enqueue and/or dequeue tx
qpid::linearstore::journal::txn_map& tmap = jc->get_txn_map();
- qpid::linearstore::journal::txn_data_list txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found
+ qpid::linearstore::journal::txn_data_list_t txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found
bool enq = false;
bool deq = false;
- for (qpid::linearstore::journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
- if (j->enq_flag_ && j->rid_ == rid) enq = true;
- else if (!j->enq_flag_ && j->drid_ == rid) deq = true;
+ for (qpid::linearstore::journal::tdl_itr_t j = txnList.begin(); j<txnList.end(); j++) {
+ if (j->enq_flag_ && j->rid_ == rid)
+ enq = true;
+ else if (!j->enq_flag_ && j->drid_ == rid)
+ deq = true;
}
- if (enq && !deq && abortCnt == 0) {
+ if (enq && !deq && txn_op_stats.abortCnt == 0) {
rcnt++;
queue->recover(msg); // recover txn message in commit case only
}
@@ -1101,27 +1074,10 @@ void MessageStoreImpl::collectPreparedXi
std::vector<std::string> xidList;
tplStorePtr->get_txn_map().xid_list(xidList);
for (std::vector<std::string>::const_iterator i=xidList.begin(); i!=xidList.end(); ++i) {
- qpid::linearstore::journal::txn_data_list tdl = tplStorePtr->get_txn_map().get_tdata_list(*i);
- uint16_t enqCnt = 0UL;
- uint16_t deqCnt = 0UL;
- uint16_t tpcCnt = 0UL;
- uint16_t abortCnt = 0UL;
- for (qpid::linearstore::journal::tdl_itr j=tdl.begin(); j!=tdl.end(); ++j) {
- if (j->enq_flag_) {
- ++enqCnt;
- } else {
- ++deqCnt;
- }
- if (!j->commit_flag_) {
- ++abortCnt;
- }
- if (j->tpc_flag_) {
- ++tpcCnt;
- }
- }
- if (tpcCnt > 0) {
- if (tpcCnt != tdl.size()) THROW_STORE_EXCEPTION("MessageStoreImpl::collectPreparedXids: Inconsistent TPL 2PC count");
- if (enqCnt - deqCnt > 0) {
+ qpid::linearstore::journal::txn_data_list_t tdl = tplStorePtr->get_txn_map().get_tdata_list(*i);
+ qpid::linearstore::journal::txn_op_stats_t txn_op_stats(tdl);
+ if (txn_op_stats.tpcCnt > 0) {
+ if (txn_op_stats.enqCnt - txn_op_stats.deqCnt > 0) {
xids.insert(*i);
}
}
@@ -1554,6 +1510,15 @@ void MessageStoreImpl::journalDeleted(Jo
journalList.erase(j_.id());
}
+std::string MessageStoreImpl::str2hexnum(const std::string& str) {
+ std::ostringstream oss;
+ oss << "(" << str.size() << ")0x" << std::hex;
+ for (unsigned i=str.size(); i>0; --i) {
+ oss << std::setfill('0') << std::setw(2) << (uint16_t)(uint8_t)str[i-1];
+ }
+ return oss.str();
+}
+
MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name_) :
qpid::Options(name_),
truncateFlag(defTruncateFlag),
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h?rev=1556892&r1=1556891&r2=1556892&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h Thu Jan 9 17:26:10 2014
@@ -235,18 +235,7 @@ class MessageStoreImpl : public qpid::br
}
void chkTplStoreInit();
- // debug aid for printing XIDs that may contain non-printable chars
- static std::string xid2str(const std::string xid) {
- std::ostringstream oss;
- oss << std::hex << std::setfill('0');
- for (unsigned i=0; i<xid.size(); i++) {
- if (isprint(xid[i]))
- oss << xid[i];
- else
- oss << "/" << std::setw(2) << (int)((char)xid[i]);
- }
- return oss.str();
- }
+ static std::string str2hexnum(const std::string& str);
public:
typedef boost::shared_ptr<MessageStoreImpl> shared_ptr;
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp?rev=1556892&r1=1556891&r2=1556892&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp Thu Jan 9 17:26:10 2014
@@ -117,14 +117,13 @@ void RecoveryManager::analyzeJournals(co
std::vector<std::string>::const_iterator pitr =
std::find(preparedTransactionListPtr->begin(), preparedTransactionListPtr->end(), *itr);
if (pitr == preparedTransactionListPtr->end()) { // not found in prepared list
- txn_data_list tdl = transactionMapRef_.get_remove_tdata_list(*itr); // tdl will be empty if xid not found
+ txn_data_list_t tdl = transactionMapRef_.get_remove_tdata_list(*itr); // tdl will be empty if xid not found
// Unlock any affected enqueues in emap
- for (tdl_itr i=tdl.begin(); i<tdl.end(); i++) {
+ for (tdl_itr_t i=tdl.begin(); i<tdl.end(); i++) {
if (i->enq_flag_) { // enq op - decrement enqueue count
fileNumberMap_[i->pfid_]->decrEnqueuedRecordCount();
} else if (enqueueMapRef_.is_enqueued(i->drid_, true)) { // deq op - unlock enq record
- int16_t ret = enqueueMapRef_.unlock(i->drid_);
- if (ret < enq_map::EMAP_OK) { // fail
+ if (enqueueMapRef_.unlock(i->drid_) < enq_map::EMAP_OK) { // fail
// enq_map::unlock()'s only error is enq_map::EMAP_RID_NOT_FOUND
std::ostringstream oss;
oss << std::hex << "_emap.unlock(): drid=0x\"" << i->drid_;
@@ -669,8 +668,8 @@ bool RecoveryManager::getNextRecordHeade
throw jexception(jerrno::JERR_RCVM_NULLXID, "ABT", "RecoveryManager", "getNextRecordHeader");
}
std::string xid((char*)xidp, ar.xid_size());
- txn_data_list tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found
- for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++) {
+ txn_data_list_t tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found
+ for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++) {
if (itr->enq_flag_) {
fileNumberMap_[itr->pfid_]->decrEnqueuedRecordCount();
} else {
@@ -697,8 +696,8 @@ bool RecoveryManager::getNextRecordHeade
throw jexception(jerrno::JERR_RCVM_NULLXID, "CMT", "RecoveryManager", "getNextRecordHeader");
}
std::string xid((char*)xidp, cr.xid_size());
- txn_data_list tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found
- for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++) {
+ txn_data_list_t tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found
+ for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++) {
if (itr->enq_flag_) { // txn enqueue
//std::cout << "[rid=0x" << std::hex << itr->rid_ << std::dec << " fid=" << itr->pfid_ << " fpos=0x" << std::hex << itr->foffs_ << "]" << std::dec << std::flush; // DEBUG
if (enqueueMapRef_.insert_pfid(itr->rid_, itr->pfid_, itr->foffs_) < enq_map::EMAP_OK) { // fail
@@ -766,8 +765,8 @@ void RecoveryManager::prepareRecordList(
std::vector<std::string> xidList;
transactionMapRef_.xid_list(xidList);
for (std::vector<std::string>::const_iterator j=xidList.begin(); j!=xidList.end(); ++j) {
- qpid::linearstore::journal::txn_data_list tdsl = transactionMapRef_.get_tdata_list(*j);
- for (qpid::linearstore::journal::tdl_itr k=tdsl.begin(); k!=tdsl.end(); ++k) {
+ qpid::linearstore::journal::txn_data_list_t tdsl = transactionMapRef_.get_tdata_list(*j);
+ for (qpid::linearstore::journal::tdl_itr_t k=tdsl.begin(); k!=tdsl.end(); ++k) {
if (k->enq_flag_) {
recordIdList_.push_back(RecoveredRecordData_t(k->rid_, k->pfid_, k->foffs_, true));
}
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/jcfg.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/jcfg.h?rev=1556892&r1=1556891&r2=1556892&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/jcfg.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/jcfg.h Thu Jan 9 17:26:10 2014
@@ -43,8 +43,8 @@
#define QLS_WMGR_MAXWAITUS 100 /**< Max. wait time (us) before submitting AIO */
#define QLS_JRNL_FILE_EXTENSION ".jrnl" /**< Extension for journal data files */
-#define QLS_TXA_MAGIC 0x61534c51 /**< ("RHMa" in little endian) Magic for dtx abort hdrs */
-#define QLS_TXC_MAGIC 0x63534c51 /**< ("RHMc" in little endian) Magic for dtx commit hdrs */
+#define QLS_TXA_MAGIC 0x61534c51 /**< ("QLSa" in little endian) Magic for dtx abort hdrs */
+#define QLS_TXC_MAGIC 0x63534c51 /**< ("QLSc" in little endian) Magic for dtx commit hdrs */
#define QLS_DEQ_MAGIC 0x64534c51 /**< ("QLSd" in little endian) Magic for deq rec hdrs */
#define QLS_ENQ_MAGIC 0x65534c51 /**< ("QLSe" in little endian) Magic for enq rec hdrs */
#define QLS_FILE_MAGIC 0x66534c51 /**< ("QLSf" in little endian) Magic for file hdrs */
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp?rev=1556892&r1=1556891&r2=1556892&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp Thu Jan 9 17:26:10 2014
@@ -51,6 +51,38 @@ txn_data_t::txn_data_t(const uint64_t ri
aio_compl_(false)
{}
+txn_op_stats_t::txn_op_stats_t(const txn_data_list_t& tdl) :
+ enqCnt(0U),
+ deqCnt(0U),
+ tpcCnt(0U),
+ abortCnt(0U),
+ commitCnt(0U),
+ rid(0ULL)
+{
+ for (tdl_const_itr_t i=tdl.begin(); i!=tdl.end(); ++i) {
+ if (i->enq_flag_) {
+ ++enqCnt;
+ rid = i->rid_;
+ } else {
+ ++deqCnt;
+ if (i->commit_flag_) {
+ ++commitCnt;
+ } else {
+ ++abortCnt;
+ }
+ }
+ if (i->tpc_flag_) {
+ ++tpcCnt;
+ }
+ }
+ if (tpcCnt > 0 && tpcCnt != tdl.size()) {
+ throw jexception("Inconsistent 2PC count"); // TODO: complete exception details
+ }
+ if (abortCnt > 0 && commitCnt > 0) {
+ throw jexception("Both abort and commit in same transaction"); // TODO: complete exception details
+ }
+}
+
txn_map::txn_map():
_map()/*,
_pfid_txn_cnt()*/
@@ -66,7 +98,7 @@ txn_map::insert_txn_data(const std::stri
xmap_itr itr = _map.find(xid);
if (itr == _map.end()) // not found in map
{
- txn_data_list list;
+ txn_data_list_t list;
list.push_back(td);
std::pair<xmap_itr, bool> ret = _map.insert(xmap_param(xid, list));
if (!ret.second) // duplicate
@@ -77,14 +109,14 @@ txn_map::insert_txn_data(const std::stri
return ok;
}
-const txn_data_list
+const txn_data_list_t
txn_map::get_tdata_list(const std::string& xid)
{
slock s(_mutex);
return get_tdata_list_nolock(xid);
}
-const txn_data_list
+const txn_data_list_t
txn_map::get_tdata_list_nolock(const std::string& xid)
{
xmap_itr itr = _map.find(xid);
@@ -93,14 +125,14 @@ txn_map::get_tdata_list_nolock(const std
return itr->second;
}
-const txn_data_list
+const txn_data_list_t
txn_map::get_remove_tdata_list(const std::string& xid)
{
slock s(_mutex);
xmap_itr itr = _map.find(xid);
if (itr == _map.end()) // not found in map
return _empty_data_list;
- txn_data_list list = itr->second;
+ txn_data_list_t list = itr->second;
_map.erase(itr);
return list;
}
@@ -132,7 +164,7 @@ txn_map::cnt(const bool enq_flag)
uint32_t c = 0;
for (xmap_itr i = _map.begin(); i != _map.end(); i++)
{
- for (tdl_itr j = i->second.begin(); j < i->second.end(); j++)
+ for (tdl_itr_t j = i->second.begin(); j < i->second.end(); j++)
{
if (j->enq_flag_ == enq_flag)
c++;
@@ -149,7 +181,7 @@ txn_map::is_txn_synced(const std::string
if (itr == _map.end()) // not found in map
return TMAP_XID_NOT_FOUND;
bool is_synced = true;
- for (tdl_itr litr = itr->second.begin(); litr < itr->second.end(); litr++)
+ for (tdl_itr_t litr = itr->second.begin(); litr < itr->second.end(); litr++)
{
if (!litr->aio_compl_)
{
@@ -167,7 +199,7 @@ txn_map::set_aio_compl(const std::string
xmap_itr itr = _map.find(xid);
if (itr == _map.end()) // xid not found in map
return TMAP_XID_NOT_FOUND;
- for (tdl_itr litr = itr->second.begin(); litr < itr->second.end(); litr++)
+ for (tdl_itr_t litr = itr->second.begin(); litr < itr->second.end(); litr++)
{
if (litr->rid_ == rid)
{
@@ -185,8 +217,8 @@ txn_map::data_exists(const std::string&
bool found = false;
{
slock s(_mutex);
- txn_data_list tdl = get_tdata_list_nolock(xid);
- tdl_itr itr = tdl.begin();
+ txn_data_list_t tdl = get_tdata_list_nolock(xid);
+ tdl_itr_t itr = tdl.begin();
while (itr != tdl.end() && !found)
{
found = itr->rid_ == rid;
@@ -204,8 +236,8 @@ txn_map::is_enq(const uint64_t rid)
slock s(_mutex);
for (xmap_itr i = _map.begin(); i != _map.end() && !found; i++)
{
- txn_data_list list = i->second;
- for (tdl_itr j = list.begin(); j < list.end() && !found; j++)
+ txn_data_list_t list = i->second;
+ for (tdl_itr_t j = list.begin(); j < list.end() && !found; j++)
{
if (j->enq_flag_)
found = j->rid_ == rid;
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/txn_map.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/txn_map.h?rev=1556892&r1=1556891&r2=1556892&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/txn_map.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/txn_map.h Thu Jan 9 17:26:10 2014
@@ -53,8 +53,20 @@ namespace journal {
const bool tpc_flag,
const bool commit_flag);
} txn_data_t;
- typedef std::vector<txn_data_t> txn_data_list;
- typedef txn_data_list::iterator tdl_itr;
+ typedef std::vector<txn_data_t> txn_data_list_t;
+ typedef txn_data_list_t::iterator tdl_itr_t;
+ typedef txn_data_list_t::const_iterator tdl_const_itr_t;
+
+ typedef struct txn_op_stats_t
+ {
+ uint16_t enqCnt;
+ uint16_t deqCnt;
+ uint16_t tpcCnt;
+ uint16_t abortCnt;
+ uint16_t commitCnt;
+ uint64_t rid;
+ txn_op_stats_t(const txn_data_list_t& tdl);
+ } txn_op_stats_t;
/**
* \class txn_map
@@ -102,21 +114,21 @@ namespace journal {
static int16_t TMAP_SYNCED;
private:
- typedef std::pair<std::string, txn_data_list> xmap_param;
- typedef std::map<std::string, txn_data_list> xmap;
+ typedef std::pair<std::string, txn_data_list_t> xmap_param;
+ typedef std::map<std::string, txn_data_list_t> xmap;
typedef xmap::iterator xmap_itr;
xmap _map;
smutex _mutex;
- const txn_data_list _empty_data_list;
+ const txn_data_list_t _empty_data_list;
public:
txn_map();
virtual ~txn_map();
bool insert_txn_data(const std::string& xid, const txn_data_t& td);
- const txn_data_list get_tdata_list(const std::string& xid);
- const txn_data_list get_remove_tdata_list(const std::string& xid);
+ const txn_data_list_t get_tdata_list(const std::string& xid);
+ const txn_data_list_t get_remove_tdata_list(const std::string& xid);
bool in_map(const std::string& xid);
uint32_t enq_cnt();
uint32_t deq_cnt();
@@ -130,7 +142,7 @@ namespace journal {
void xid_list(std::vector<std::string>& xv);
private:
uint32_t cnt(const bool enq_flag);
- const txn_data_list get_tdata_list_nolock(const std::string& xid);
+ const txn_data_list_t get_tdata_list_nolock(const std::string& xid);
};
}}}
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/utils/deq_hdr.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/utils/deq_hdr.c?rev=1556892&r1=1556891&r2=1556892&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/utils/deq_hdr.c (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/utils/deq_hdr.c Thu Jan 9 17:26:10 2014
@@ -41,6 +41,6 @@ bool is_txn_coml_commit(const deq_hdr_t
}
void set_txn_coml_commit(deq_hdr_t *dh, const bool commit) {
- dh->_rhdr._uflag = commit ? dh->_rhdr._uflag | DEQ_HDR_TXNCMPLCOMMIT_MASK :
- dh->_rhdr._uflag & (~DEQ_HDR_TXNCMPLCOMMIT_MASK);
+ dh->_rhdr._uflag = commit ? dh->_rhdr._uflag | DEQ_HDR_TXNCMPLCOMMIT_MASK : // set flag bit
+ dh->_rhdr._uflag & (~DEQ_HDR_TXNCMPLCOMMIT_MASK); // unset flag bit
}
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.c?rev=1556892&r1=1556891&r2=1556892&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.c (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.c Thu Jan 9 17:26:10 2014
@@ -75,6 +75,7 @@ void file_hdr_copy(file_hdr_t* dest, con
void file_hdr_reset(file_hdr_t* target) {
target->_rhdr._uflag = 0;
+ target->_rhdr._serial = 0;
target->_rhdr._rid = 0;
target->_fro = 0;
target->_ts_sec = 0;
@@ -85,6 +86,7 @@ void file_hdr_reset(file_hdr_t* target)
int is_file_hdr_reset(file_hdr_t* target) {
return target->_rhdr._uflag == 0 &&
+ target->_rhdr._serial == 0 &&
target->_rhdr._rid == 0 &&
target->_ts_sec == 0 &&
target->_ts_nsec == 0 &&
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.h?rev=1556892&r1=1556891&r2=1556892&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.h Thu Jan 9 17:26:10 2014
@@ -51,7 +51,7 @@ extern "C"{
* +---+---+---+---+---+---+---+---+ |
* | rid | |
* +---+---+---+---+---+---+---+---+ -+
- * | fs | partn | reserved |
+ * | fhs | partn | reserved |
* +---+---+---+---+---+---+---+---+
* | data-size |
* +---+---+---+---+---+---+---+---+
@@ -70,7 +70,7 @@ extern "C"{
*
* ver = Journal version
* rid = Record ID
- * fs = File header size in sblks (defined by JRNL_SBLK_SIZE)
+ * fhs = File header size in sblks (defined by JRNL_SBLK_SIZE)
* partn = EFP partition from which this file came
* fro = First Record Offset
* qnl = Length of the queue name in octets.
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp?rev=1556892&r1=1556891&r2=1556892&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp Thu Jan 9 17:26:10 2014
@@ -426,8 +426,8 @@ wmgr::abort(data_tok* dtokp,
// Delete this txn from tmap, unlock any locked records in emap
std::string xid((const char*)xid_ptr, xid_len);
- txn_data_list tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
- for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+ txn_data_list_t tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
+ for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++)
{
if (!itr->enq_flag_)
_emap.unlock(itr->drid_); // ignore rid not found error
@@ -525,8 +525,8 @@ wmgr::commit(data_tok* dtokp,
// Delete this txn from tmap, process records into emap
std::string xid((const char*)xid_ptr, xid_len);
- txn_data_list tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
- for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+ txn_data_list_t tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
+ for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++)
{
if (itr->enq_flag_) // txn enqueue
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org