You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2014/01/09 18:26:11 UTC

svn commit: r1556892 - in /qpid/trunk/qpid/cpp/src/qpid/linearstore: ./ journal/ journal/utils/

Author: kpvdr
Date: Thu Jan  9 17:26:10 2014
New Revision: 1556892

URL: http://svn.apache.org/r1556892
Log:
QPID-5460: [linearstore] Recovery of store which contains prepared but incomplete transactions results in message loss

Modified:
    qpid/trunk/qpid/cpp/src/qpid/linearstore/ISSUES
    qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/jcfg.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/txn_map.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/utils/deq_hdr.c
    qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.c
    qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/ISSUES
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/ISSUES?rev=1556892&r1=1556891&r2=1556892&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/ISSUES (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/ISSUES Thu Jan  9 17:26:10 2014
@@ -58,8 +58,9 @@ Current bugs and performance issues:
 4. (UNABLE TO REPRODUCE) BZ 1036026 - Unable to create durable queue - framing error - possibly caused by running both stores at the same time
 5. (UNABLE TO REPRODUCE) BZ 1038599 - Abort when deleting used queue after restart - may be dup of QPID-5387 (BZ 1036071)
 6. BZ 1039522 - Crash during recovery - JournalFile::getFqFileName() -JERR_JREC_BADRECTAIL
-6. BZ 1039525 - Crash during recovery - journal::jexception - JERR_JREC_BADRECTAIL
-7. (FIXED) QPID-5442 (BZ 1039949) - DTX test failure - missing XIDs
+7. BZ 1039525 - Crash during recovery - journal::jexception - JERR_JREC_BADRECTAIL
+8. (FIXED) QPID-5442 (BZ 1039949) - DTX test failure - missing XIDs
+9. (FIXED) QPID-5460 (BZ 1051097) - Transactional messages lost during recovery
 
 Code tidy-up
 ------------

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp?rev=1556892&r1=1556891&r2=1556892&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp Thu Jan  9 17:26:10 2014
@@ -193,6 +193,7 @@ JournalImpl::recover(boost::shared_ptr< 
     }
 */
 
+    // TODO: This is ugly, find a way for RecoveryManager to use boost::ptr_list<PreparedTransaction>* directly
     if (prep_tx_list_ptr) {
         // Create list of prepared xids
         std::vector<std::string> prep_xid_list;
@@ -209,8 +210,8 @@ JournalImpl::recover(boost::shared_ptr< 
     if (prep_tx_list_ptr)
     {
         for (PreparedTransaction::list::iterator i = prep_tx_list_ptr->begin(); i != prep_tx_list_ptr->end(); i++) {
-            ::qpid::linearstore::journal::txn_data_list tdl = _tmap.get_tdata_list(i->xid); // tdl will be empty if xid not found
-            for (::qpid::linearstore::journal::tdl_itr tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++) {
+            ::qpid::linearstore::journal::txn_data_list_t tdl = _tmap.get_tdata_list(i->xid); // tdl will be empty if xid not found
+            for (::qpid::linearstore::journal::tdl_itr_t tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++) {
                 if (tdl_itr->enq_flag_) { // enqueue op
                     i->enqueues->add(queue_id, tdl_itr->rid_);
                 } else { // dequeue op

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp?rev=1556892&r1=1556891&r2=1556892&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp Thu Jan  9 17:26:10 2014
@@ -341,7 +341,7 @@ void MessageStoreImpl::chkTplStoreInit()
     qpid::sys::Mutex::ScopedLock sl(tplInitLock);
     if (!tplStorePtr->is_ready()) {
         qpid::linearstore::journal::jdir::create_dir(getTplBaseDir());
-        tplStorePtr->initialize(/*tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks,*/ getEmptyFilePool(defaultEfpPartitionNumber, defaultEfpFileSize_kib), tplWCacheNumPages, tplWCachePgSizeSblks);
+        tplStorePtr->initialize(getEmptyFilePool(defaultEfpPartitionNumber, defaultEfpFileSize_kib), tplWCacheNumPages, tplWCachePgSizeSblks);
         if (mgmtObject.get() != 0) mgmtObject->set_tplIsInitialized(true);
     }
 }
@@ -584,6 +584,13 @@ void MessageStoreImpl::recover(qpid::bro
     txn_list prepared;
     recoverLockedMappings(prepared);
 
+    std::ostringstream oss;
+    oss << "Recovered transaction prepared list:";
+    for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) {
+        oss << std::endl << "     " << str2hexnum(i->xid);
+    }
+    QLS_LOG(debug, oss.str());
+
     queue_index queues;//id->queue
     exchange_index exchanges;//id->exchange
     message_index messages;//id->message
@@ -620,39 +627,20 @@ void MessageStoreImpl::recover(qpid::bro
         }
 
         std::string xid = pt.xid;
-        qpid::linearstore::journal::txn_data_list tdl = txn_map_ref.get_tdata_list(xid);
+        qpid::linearstore::journal::txn_data_list_t tdl = txn_map_ref.get_tdata_list(xid);
         if (tdl.size() == 0) THROW_STORE_EXCEPTION("XID not found in txn_map");
-        uint16_t enqCnt = 0UL;
-        uint16_t deqCnt = 0UL;
-        uint16_t tpcCnt = 0UL;
-        uint16_t abortCnt = 0UL;
-        uint64_t rid = 0ULL;
-        for (qpid::linearstore::journal::tdl_itr j=tdl.begin(); j!=tdl.end(); ++j) {
-            if (j->enq_flag_) {
-                ++enqCnt;
-                rid = j->rid_;
-            } else {
-                ++deqCnt;
-            }
-            if (!j->commit_flag_) {
-                ++abortCnt;
-            }
-            if (j->tpc_flag_) {
-                ++tpcCnt;
-            }
-        }
-        if (tpcCnt > 0 && tpcCnt != tdl.size()) THROW_STORE_EXCEPTION("MessageStoreImpl::recover(): Inconsistent TPL 2PC count");
-        bool commitFlag = abortCnt == 0;
+        qpid::linearstore::journal::txn_op_stats_t txn_op_stats(tdl);
+        bool commitFlag = txn_op_stats.abortCnt == 0;
 
         // If a record is found that is dequeued but not committed/aborted from tplStore, then a complete() call
         // was interrupted part way through committing/aborting the impacted queues. Complete this process.
-        bool incomplTplTxnFlag = deqCnt > 0;
+        bool incomplTplTxnFlag = txn_op_stats.deqCnt > 0;
 
-        if (tpcCnt > 0) {
+        if (txn_op_stats.tpcCnt > 0) {
             // Dtx (2PC) transaction
             TPCTxnCtxt* tpcc = new TPCTxnCtxt(xid, &messageIdSequence);
             std::auto_ptr<qpid::broker::TPCTransactionContext> txn(tpcc);
-            tpcc->recoverDtok(rid, xid);
+            tpcc->recoverDtok(txn_op_stats.rid, xid);
             tpcc->prepare(tplStorePtr.get());
 
             qpid::broker::RecoverableTransaction::shared_ptr dtx;
@@ -676,7 +664,7 @@ void MessageStoreImpl::recover(qpid::bro
         } else {
             // Local (1PC) transaction
             boost::shared_ptr<TxnCtxt> opcc(new TxnCtxt(xid, &messageIdSequence));
-            opcc->recoverDtok(rid, xid);
+            opcc->recoverDtok(txn_op_stats.rid, xid);
             opcc->prepare(tplStorePtr.get());
 
             if (pt.enqueues.get()) {
@@ -919,7 +907,7 @@ void MessageStoreImpl::recoverMessages(T
                     msg = getExternMessage(recovery, dtok.rid(), headerSize); // large message external to jrnl
                 } else {
                     headerSize = qpid::framing::Buffer(data, preambleLength).getLong();
-                    qpid::framing::Buffer headerBuff(data+ preambleLength, headerSize); /// do we want read size or header size ????
+                    qpid::framing::Buffer headerBuff(data+ preambleLength, headerSize);
                     msg = recovery.recoverMessage(headerBuff);
                 }
                 msg->setPersistenceId(dtok.rid());
@@ -944,45 +932,30 @@ void MessageStoreImpl::recoverMessages(T
                 } else {
                     uint64_t rid = dtok.rid();
                     std::string xid(i->xid);
-                    qpid::linearstore::journal::txn_data_list tdl = txn_map_ref.get_tdata_list(xid);
+                    qpid::linearstore::journal::txn_data_list_t tdl = txn_map_ref.get_tdata_list(xid);
                     if (tdl.size() == 0) THROW_STORE_EXCEPTION("XID not found in txn_map");
-                    uint16_t enqCnt = 0UL;
-                    uint16_t deqCnt = 0UL;
-                    uint16_t tpcCnt = 0UL;
-                    uint16_t abortCnt = 0UL;
-                    for (qpid::linearstore::journal::tdl_itr j=tdl.begin(); j!=tdl.end(); ++j) {
-                        if (j->enq_flag_) {
-                            ++enqCnt;
-                        } else {
-                            ++deqCnt;
-                        }
-                        if (!j->commit_flag_) {
-                            ++abortCnt;
-                        }
-                        if (j->tpc_flag_) {
-                            ++tpcCnt;
-                        }
-                    }
-                    if (tpcCnt > 0 && tpcCnt != tdl.size()) THROW_STORE_EXCEPTION("MessageStoreImpl::recoverMessages(): Inconsistent TPL 2PC count");
-                    if (deqCnt > 0 || tpcCnt == 0) {
+                    qpid::linearstore::journal::txn_op_stats_t txn_op_stats(tdl);
+                    if (txn_op_stats.deqCnt > 0 || txn_op_stats.tpcCnt == 0) {
                         if (jc->is_enqueued(rid, true)) {
                             // Enqueue is non-tx, dequeue tx
                             assert(jc->is_locked(rid)); // This record MUST be locked by a txn dequeue
-                            if (abortCnt > 0) {
+                            if (txn_op_stats.abortCnt > 0) {
                                 rcnt++;
                                 queue->recover(msg); // recover message in abort case only
                             }
                         } else {
                             // Enqueue and/or dequeue tx
                             qpid::linearstore::journal::txn_map& tmap = jc->get_txn_map();
-                            qpid::linearstore::journal::txn_data_list txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found
+                            qpid::linearstore::journal::txn_data_list_t txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found
                             bool enq = false;
                             bool deq = false;
-                            for (qpid::linearstore::journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
-                                if (j->enq_flag_ && j->rid_ == rid) enq = true;
-                                else if (!j->enq_flag_ && j->drid_ == rid) deq = true;
+                            for (qpid::linearstore::journal::tdl_itr_t j = txnList.begin(); j<txnList.end(); j++) {
+                                if (j->enq_flag_ && j->rid_ == rid)
+                                    enq = true;
+                                else if (!j->enq_flag_ && j->drid_ == rid)
+                                    deq = true;
                             }
-                            if (enq && !deq && abortCnt == 0) {
+                            if (enq && !deq && txn_op_stats.abortCnt == 0) {
                                 rcnt++;
                                 queue->recover(msg); // recover txn message in commit case only
                             }
@@ -1101,27 +1074,10 @@ void MessageStoreImpl::collectPreparedXi
     std::vector<std::string> xidList;
     tplStorePtr->get_txn_map().xid_list(xidList);
     for (std::vector<std::string>::const_iterator i=xidList.begin(); i!=xidList.end(); ++i) {
-        qpid::linearstore::journal::txn_data_list tdl = tplStorePtr->get_txn_map().get_tdata_list(*i);
-        uint16_t enqCnt = 0UL;
-        uint16_t deqCnt = 0UL;
-        uint16_t tpcCnt = 0UL;
-        uint16_t abortCnt = 0UL;
-        for (qpid::linearstore::journal::tdl_itr j=tdl.begin(); j!=tdl.end(); ++j) {
-            if (j->enq_flag_) {
-                ++enqCnt;
-            } else {
-                ++deqCnt;
-            }
-            if (!j->commit_flag_) {
-                ++abortCnt;
-            }
-            if (j->tpc_flag_) {
-                ++tpcCnt;
-            }
-        }
-        if (tpcCnt > 0) {
-            if (tpcCnt != tdl.size()) THROW_STORE_EXCEPTION("MessageStoreImpl::collectPreparedXids: Inconsistent TPL 2PC count");
-            if (enqCnt - deqCnt > 0) {
+        qpid::linearstore::journal::txn_data_list_t tdl = tplStorePtr->get_txn_map().get_tdata_list(*i);
+        qpid::linearstore::journal::txn_op_stats_t txn_op_stats(tdl);
+        if (txn_op_stats.tpcCnt > 0) {
+            if (txn_op_stats.enqCnt - txn_op_stats.deqCnt > 0) {
                 xids.insert(*i);
             }
         }
@@ -1554,6 +1510,15 @@ void MessageStoreImpl::journalDeleted(Jo
     journalList.erase(j_.id());
 }
 
+std::string MessageStoreImpl::str2hexnum(const std::string& str) {
+    std::ostringstream oss;
+    oss << "(" << str.size() << ")0x" << std::hex;
+    for (unsigned i=str.size(); i>0; --i) {
+        oss << std::setfill('0') << std::setw(2) << (uint16_t)(uint8_t)str[i-1];
+    }
+    return oss.str();
+}
+
 MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name_) :
                                              qpid::Options(name_),
                                              truncateFlag(defTruncateFlag),

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h?rev=1556892&r1=1556891&r2=1556892&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h Thu Jan  9 17:26:10 2014
@@ -235,18 +235,7 @@ class MessageStoreImpl : public qpid::br
     }
     void chkTplStoreInit();
 
-    // debug aid for printing XIDs that may contain non-printable chars
-    static std::string xid2str(const std::string xid) {
-        std::ostringstream oss;
-        oss << std::hex << std::setfill('0');
-        for (unsigned i=0; i<xid.size(); i++) {
-            if (isprint(xid[i]))
-                oss << xid[i];
-            else
-                oss << "/" << std::setw(2) << (int)((char)xid[i]);
-        }
-        return oss.str();
-    }
+    static std::string str2hexnum(const std::string& str);
 
   public:
     typedef boost::shared_ptr<MessageStoreImpl> shared_ptr;

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp?rev=1556892&r1=1556891&r2=1556892&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp Thu Jan  9 17:26:10 2014
@@ -117,14 +117,13 @@ void RecoveryManager::analyzeJournals(co
                 std::vector<std::string>::const_iterator pitr =
                         std::find(preparedTransactionListPtr->begin(), preparedTransactionListPtr->end(), *itr);
                 if (pitr == preparedTransactionListPtr->end()) { // not found in prepared list
-                    txn_data_list tdl = transactionMapRef_.get_remove_tdata_list(*itr); // tdl will be empty if xid not found
+                    txn_data_list_t tdl = transactionMapRef_.get_remove_tdata_list(*itr); // tdl will be empty if xid not found
                     // Unlock any affected enqueues in emap
-                    for (tdl_itr i=tdl.begin(); i<tdl.end(); i++) {
+                    for (tdl_itr_t i=tdl.begin(); i<tdl.end(); i++) {
                         if (i->enq_flag_) { // enq op - decrement enqueue count
                             fileNumberMap_[i->pfid_]->decrEnqueuedRecordCount();
                         } else if (enqueueMapRef_.is_enqueued(i->drid_, true)) { // deq op - unlock enq record
-                            int16_t ret = enqueueMapRef_.unlock(i->drid_);
-                            if (ret < enq_map::EMAP_OK) { // fail
+                            if (enqueueMapRef_.unlock(i->drid_) < enq_map::EMAP_OK) { // fail
                                 // enq_map::unlock()'s only error is enq_map::EMAP_RID_NOT_FOUND
                                 std::ostringstream oss;
                                 oss << std::hex << "_emap.unlock(): drid=0x\"" << i->drid_;
@@ -669,8 +668,8 @@ bool RecoveryManager::getNextRecordHeade
                     throw jexception(jerrno::JERR_RCVM_NULLXID, "ABT", "RecoveryManager", "getNextRecordHeader");
                 }
                 std::string xid((char*)xidp, ar.xid_size());
-                txn_data_list tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found
-                for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++) {
+                txn_data_list_t tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found
+                for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++) {
                     if (itr->enq_flag_) {
                         fileNumberMap_[itr->pfid_]->decrEnqueuedRecordCount();
                     } else {
@@ -697,8 +696,8 @@ bool RecoveryManager::getNextRecordHeade
                     throw jexception(jerrno::JERR_RCVM_NULLXID, "CMT", "RecoveryManager", "getNextRecordHeader");
                 }
                 std::string xid((char*)xidp, cr.xid_size());
-                txn_data_list tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found
-                for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++) {
+                txn_data_list_t tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found
+                for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++) {
                     if (itr->enq_flag_) { // txn enqueue
 //std::cout << "[rid=0x" << std::hex << itr->rid_ << std::dec << " fid=" << itr->pfid_ << " fpos=0x" << std::hex << itr->foffs_ << "]" << std::dec << std::flush; // DEBUG
                         if (enqueueMapRef_.insert_pfid(itr->rid_, itr->pfid_, itr->foffs_) < enq_map::EMAP_OK) { // fail
@@ -766,8 +765,8 @@ void RecoveryManager::prepareRecordList(
     std::vector<std::string> xidList;
     transactionMapRef_.xid_list(xidList);
     for (std::vector<std::string>::const_iterator j=xidList.begin(); j!=xidList.end(); ++j) {
-        qpid::linearstore::journal::txn_data_list tdsl = transactionMapRef_.get_tdata_list(*j);
-        for (qpid::linearstore::journal::tdl_itr k=tdsl.begin(); k!=tdsl.end(); ++k) {
+        qpid::linearstore::journal::txn_data_list_t tdsl = transactionMapRef_.get_tdata_list(*j);
+        for (qpid::linearstore::journal::tdl_itr_t k=tdsl.begin(); k!=tdsl.end(); ++k) {
             if (k->enq_flag_) {
                 recordIdList_.push_back(RecoveredRecordData_t(k->rid_, k->pfid_, k->foffs_, true));
             }

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/jcfg.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/jcfg.h?rev=1556892&r1=1556891&r2=1556892&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/jcfg.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/jcfg.h Thu Jan  9 17:26:10 2014
@@ -43,8 +43,8 @@
 #define QLS_WMGR_MAXWAITUS              100         /**< Max. wait time (us) before submitting AIO */
 
 #define QLS_JRNL_FILE_EXTENSION         ".jrnl"     /**< Extension for journal data files */
-#define QLS_TXA_MAGIC                   0x61534c51  /**< ("RHMa" in little endian) Magic for dtx abort hdrs */
-#define QLS_TXC_MAGIC                   0x63534c51  /**< ("RHMc" in little endian) Magic for dtx commit hdrs */
+#define QLS_TXA_MAGIC                   0x61534c51  /**< ("QLSa" in little endian) Magic for dtx abort hdrs */
+#define QLS_TXC_MAGIC                   0x63534c51  /**< ("QLSc" in little endian) Magic for dtx commit hdrs */
 #define QLS_DEQ_MAGIC                   0x64534c51  /**< ("QLSd" in little endian) Magic for deq rec hdrs */
 #define QLS_ENQ_MAGIC                   0x65534c51  /**< ("QLSe" in little endian) Magic for enq rec hdrs */
 #define QLS_FILE_MAGIC                  0x66534c51  /**< ("QLSf" in little endian) Magic for file hdrs */

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp?rev=1556892&r1=1556891&r2=1556892&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp Thu Jan  9 17:26:10 2014
@@ -51,6 +51,38 @@ txn_data_t::txn_data_t(const uint64_t ri
         aio_compl_(false)
 {}
 
+txn_op_stats_t::txn_op_stats_t(const txn_data_list_t& tdl) :
+        enqCnt(0U),
+        deqCnt(0U),
+        tpcCnt(0U),
+        abortCnt(0U),
+        commitCnt(0U),
+        rid(0ULL)
+{
+    for (tdl_const_itr_t i=tdl.begin(); i!=tdl.end(); ++i) {
+        if (i->enq_flag_) {
+            ++enqCnt;
+            rid = i->rid_;
+        } else {
+            ++deqCnt;
+            if (i->commit_flag_) {
+                ++commitCnt;
+            } else {
+                ++abortCnt;
+            }
+        }
+        if (i->tpc_flag_) {
+            ++tpcCnt;
+        }
+    }
+    if (tpcCnt > 0 && tpcCnt != tdl.size()) {
+        throw jexception("Inconsistent 2PC count"); // TODO: complete exception details
+    }
+    if (abortCnt > 0 && commitCnt > 0) {
+        throw jexception("Both abort and commit in same transaction"); // TODO: complete exception details
+    }
+}
+
 txn_map::txn_map():
         _map()/*,
         _pfid_txn_cnt()*/
@@ -66,7 +98,7 @@ txn_map::insert_txn_data(const std::stri
     xmap_itr itr = _map.find(xid);
     if (itr == _map.end()) // not found in map
     {
-        txn_data_list list;
+        txn_data_list_t list;
         list.push_back(td);
         std::pair<xmap_itr, bool> ret = _map.insert(xmap_param(xid, list));
         if (!ret.second) // duplicate
@@ -77,14 +109,14 @@ txn_map::insert_txn_data(const std::stri
     return ok;
 }
 
-const txn_data_list
+const txn_data_list_t
 txn_map::get_tdata_list(const std::string& xid)
 {
     slock s(_mutex);
     return get_tdata_list_nolock(xid);
 }
 
-const txn_data_list
+const txn_data_list_t
 txn_map::get_tdata_list_nolock(const std::string& xid)
 {
     xmap_itr itr = _map.find(xid);
@@ -93,14 +125,14 @@ txn_map::get_tdata_list_nolock(const std
     return itr->second;
 }
 
-const txn_data_list
+const txn_data_list_t
 txn_map::get_remove_tdata_list(const std::string& xid)
 {
     slock s(_mutex);
     xmap_itr itr = _map.find(xid);
     if (itr == _map.end()) // not found in map
         return _empty_data_list;
-    txn_data_list list = itr->second;
+    txn_data_list_t list = itr->second;
     _map.erase(itr);
     return list;
 }
@@ -132,7 +164,7 @@ txn_map::cnt(const bool enq_flag)
     uint32_t c = 0;
     for (xmap_itr i = _map.begin(); i != _map.end(); i++)
     {
-        for (tdl_itr j = i->second.begin(); j < i->second.end(); j++)
+        for (tdl_itr_t j = i->second.begin(); j < i->second.end(); j++)
         {
             if (j->enq_flag_ == enq_flag)
                 c++;
@@ -149,7 +181,7 @@ txn_map::is_txn_synced(const std::string
     if (itr == _map.end()) // not found in map
         return TMAP_XID_NOT_FOUND;
     bool is_synced = true;
-    for (tdl_itr litr = itr->second.begin(); litr < itr->second.end(); litr++)
+    for (tdl_itr_t litr = itr->second.begin(); litr < itr->second.end(); litr++)
     {
         if (!litr->aio_compl_)
         {
@@ -167,7 +199,7 @@ txn_map::set_aio_compl(const std::string
     xmap_itr itr = _map.find(xid);
     if (itr == _map.end()) // xid not found in map
         return TMAP_XID_NOT_FOUND;
-    for (tdl_itr litr = itr->second.begin(); litr < itr->second.end(); litr++)
+    for (tdl_itr_t litr = itr->second.begin(); litr < itr->second.end(); litr++)
     {
         if (litr->rid_ == rid)
         {
@@ -185,8 +217,8 @@ txn_map::data_exists(const std::string& 
     bool found = false;
     {
         slock s(_mutex);
-        txn_data_list tdl = get_tdata_list_nolock(xid);
-        tdl_itr itr = tdl.begin();
+        txn_data_list_t tdl = get_tdata_list_nolock(xid);
+        tdl_itr_t itr = tdl.begin();
         while (itr != tdl.end() && !found)
         {
             found = itr->rid_ == rid;
@@ -204,8 +236,8 @@ txn_map::is_enq(const uint64_t rid)
         slock s(_mutex);
         for (xmap_itr i = _map.begin(); i != _map.end() && !found; i++)
         {
-            txn_data_list list = i->second;
-            for (tdl_itr j = list.begin(); j < list.end() && !found; j++)
+            txn_data_list_t list = i->second;
+            for (tdl_itr_t j = list.begin(); j < list.end() && !found; j++)
             {
                 if (j->enq_flag_)
                     found = j->rid_ == rid;

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/txn_map.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/txn_map.h?rev=1556892&r1=1556891&r2=1556892&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/txn_map.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/txn_map.h Thu Jan  9 17:26:10 2014
@@ -53,8 +53,20 @@ namespace journal {
                    const bool tpc_flag,
                    const bool commit_flag);
     } txn_data_t;
-    typedef std::vector<txn_data_t> txn_data_list;
-    typedef txn_data_list::iterator tdl_itr;
+    typedef std::vector<txn_data_t> txn_data_list_t;
+    typedef txn_data_list_t::iterator tdl_itr_t;
+    typedef txn_data_list_t::const_iterator tdl_const_itr_t;
+
+    typedef struct txn_op_stats_t
+    {
+        uint16_t enqCnt;
+        uint16_t deqCnt;
+        uint16_t tpcCnt;
+        uint16_t abortCnt;
+        uint16_t commitCnt;
+        uint64_t rid;
+        txn_op_stats_t(const txn_data_list_t& tdl);
+    } txn_op_stats_t;
 
     /**
     * \class txn_map
@@ -102,21 +114,21 @@ namespace journal {
         static int16_t TMAP_SYNCED;
 
     private:
-        typedef std::pair<std::string, txn_data_list> xmap_param;
-        typedef std::map<std::string, txn_data_list> xmap;
+        typedef std::pair<std::string, txn_data_list_t> xmap_param;
+        typedef std::map<std::string, txn_data_list_t> xmap;
         typedef xmap::iterator xmap_itr;
 
         xmap _map;
         smutex _mutex;
-        const txn_data_list _empty_data_list;
+        const txn_data_list_t _empty_data_list;
 
     public:
         txn_map();
         virtual ~txn_map();
 
         bool insert_txn_data(const std::string& xid, const txn_data_t& td);
-        const txn_data_list get_tdata_list(const std::string& xid);
-        const txn_data_list get_remove_tdata_list(const std::string& xid);
+        const txn_data_list_t get_tdata_list(const std::string& xid);
+        const txn_data_list_t get_remove_tdata_list(const std::string& xid);
         bool in_map(const std::string& xid);
         uint32_t enq_cnt();
         uint32_t deq_cnt();
@@ -130,7 +142,7 @@ namespace journal {
         void xid_list(std::vector<std::string>& xv);
     private:
         uint32_t cnt(const bool enq_flag);
-        const txn_data_list get_tdata_list_nolock(const std::string& xid);
+        const txn_data_list_t get_tdata_list_nolock(const std::string& xid);
     };
 
 }}}

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/utils/deq_hdr.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/utils/deq_hdr.c?rev=1556892&r1=1556891&r2=1556892&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/utils/deq_hdr.c (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/utils/deq_hdr.c Thu Jan  9 17:26:10 2014
@@ -41,6 +41,6 @@ bool is_txn_coml_commit(const deq_hdr_t 
 }
 
 void set_txn_coml_commit(deq_hdr_t *dh, const bool commit) {
-    dh->_rhdr._uflag = commit ? dh->_rhdr._uflag | DEQ_HDR_TXNCMPLCOMMIT_MASK :
-                                dh->_rhdr._uflag & (~DEQ_HDR_TXNCMPLCOMMIT_MASK);
+    dh->_rhdr._uflag = commit ? dh->_rhdr._uflag | DEQ_HDR_TXNCMPLCOMMIT_MASK : // set flag bit
+                                dh->_rhdr._uflag & (~DEQ_HDR_TXNCMPLCOMMIT_MASK); // unset flag bit
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.c?rev=1556892&r1=1556891&r2=1556892&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.c (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.c Thu Jan  9 17:26:10 2014
@@ -75,6 +75,7 @@ void file_hdr_copy(file_hdr_t* dest, con
 
 void file_hdr_reset(file_hdr_t* target) {
     target->_rhdr._uflag = 0;
+    target->_rhdr._serial = 0;
     target->_rhdr._rid = 0;
     target->_fro = 0;
     target->_ts_sec = 0;
@@ -85,6 +86,7 @@ void file_hdr_reset(file_hdr_t* target) 
 
 int is_file_hdr_reset(file_hdr_t* target) {
     return target->_rhdr._uflag == 0 &&
+           target->_rhdr._serial == 0 &&
            target->_rhdr._rid == 0 &&
            target->_ts_sec == 0 &&
            target->_ts_nsec == 0 &&

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.h?rev=1556892&r1=1556891&r2=1556892&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.h Thu Jan  9 17:26:10 2014
@@ -51,7 +51,7 @@ extern "C"{
  * +---+---+---+---+---+---+---+---+   |
  * |              rid              |   |
  * +---+---+---+---+---+---+---+---+  -+
- * |  fs   | partn |   reserved    |
+ * |  fhs  | partn |   reserved    |
  * +---+---+---+---+---+---+---+---+
  * |           data-size           |
  * +---+---+---+---+---+---+---+---+
@@ -70,7 +70,7 @@ extern "C"{
  *
  * ver = Journal version
  * rid = Record ID
- * fs = File header size in sblks (defined by JRNL_SBLK_SIZE)
+ * fhs = File header size in sblks (defined by JRNL_SBLK_SIZE)
  * partn = EFP partition from which this file came
  * fro = First Record Offset
  * qnl = Length of the queue name in octets.

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp?rev=1556892&r1=1556891&r2=1556892&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp Thu Jan  9 17:26:10 2014
@@ -426,8 +426,8 @@ wmgr::abort(data_tok* dtokp,
 
             // Delete this txn from tmap, unlock any locked records in emap
             std::string xid((const char*)xid_ptr, xid_len);
-            txn_data_list tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
-            for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+            txn_data_list_t tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
+            for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++)
             {
 				if (!itr->enq_flag_)
 				    _emap.unlock(itr->drid_); // ignore rid not found error
@@ -525,8 +525,8 @@ wmgr::commit(data_tok* dtokp,
 
             // Delete this txn from tmap, process records into emap
             std::string xid((const char*)xid_ptr, xid_len);
-            txn_data_list tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
-            for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+            txn_data_list_t tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
+            for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++)
             {
                 if (itr->enq_flag_) // txn enqueue
                 {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org