You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2014/02/05 19:49:33 UTC

svn commit: r1564877 [2/2] - in /qpid/trunk/qpid/cpp/src/qpid/linearstore: ./ journal/

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp?rev=1564877&r1=1564876&r2=1564877&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp Wed Feb  5 18:49:32 2014
@@ -30,8 +30,6 @@
 #include "qpid/linearstore/journal/LinearFileController.h"
 #include "qpid/linearstore/journal/utils/file_hdr.h"
 
-//#include <iostream> // DEBUG
-
 namespace qpid {
 namespace linearstore {
 namespace journal {
@@ -49,7 +47,7 @@ wmgr::wmgr(jcntl* jc,
         _deq_busy(false),
         _abort_busy(false),
         _commit_busy(false),
-        _txn_pending_set()
+        _txn_pending_map()
 {}
 
 wmgr::wmgr(jcntl* jc,
@@ -67,7 +65,7 @@ wmgr::wmgr(jcntl* jc,
         _deq_busy(false),
         _abort_busy(false),
         _commit_busy(false),
-        _txn_pending_set()
+        _txn_pending_map()
 {}
 
 wmgr::~wmgr()
@@ -281,6 +279,7 @@ wmgr::dequeue(data_tok* dtokp,
         _deq_busy = true;
     }
 //std::cout << "---+++ wmgr::dequeue() DEQ rid=0x" << std::hex << rid << " drid=0x" << dequeue_rid << " " << std::dec << std::flush; // DEBUG
+    std::string xid((const char*)xid_ptr, xid_len);
     bool done = false;
     Checksum checksum;
     while (!done)
@@ -292,9 +291,27 @@ wmgr::dequeue(data_tok* dtokp,
         uint32_t ret = _deq_rec.encode(wptr, data_offs_dblks,
                 (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks, checksum);
 
-        // Remember fid which contains the record header in case record is split over several files
         if (data_offs_dblks == 0) {
-            dtokp->set_fid(_lfc.getCurrentFileSeqNum());
+            uint64_t fid;
+            short eres = _emap.get_pfid(dtokp->dequeue_rid(), fid);
+            if (eres == enq_map::EMAP_OK) {
+                dtokp->set_fid(fid);
+            } else if (xid_len > 0) {
+                txn_data_list_t tdl = _tmap.get_tdata_list(xid);
+                bool found = false;
+                for (tdl_const_itr_t i=tdl.begin(); i!=tdl.end() && !found; ++i) {
+                    if (i->rid_ == dtokp->dequeue_rid()) {
+                        found = true;
+                        dtokp->set_fid(i->pfid_);
+                        break;
+                    }
+                }
+                if (!found) {
+                    throw jexception("rid found in neither emap nor tmap, transactional");
+                }
+            } else {
+                throw jexception("rid not found in emap, non-transactional");
+            }
         }
         _pg_offset_dblks += ret;
         _cached_offset_dblks += ret;
@@ -325,7 +342,7 @@ wmgr::dequeue(data_tok* dtokp,
                     if (eres == enq_map::EMAP_RID_NOT_FOUND)
                     {
                         std::ostringstream oss;
-                        oss << std::hex << "rid=0x" << rid;
+                        oss << std::hex << "emap: rid=0x" << rid;
                         throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "dequeue");
                     }
                     if (eres == enq_map::EMAP_LOCKED)
@@ -335,10 +352,6 @@ wmgr::dequeue(data_tok* dtokp,
                         throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "dequeue");
                     }
                 }
-//std::cout << "[0x" << std::hex << _lfc.getEnqueuedRecordCount(fid) << std::dec << std::flush; // DEBUG
-//try {
-                _lfc.decrEnqueuedRecordCount(fid);
-//} catch (std::exception& e) { std::cout << "***OOPS*** " << e.what() << " cfid=" << _lfc.getCurrentFileSeqNum() << " fid=" << fid << std::flush; throw; }
             }
 
             done = true;
@@ -427,14 +440,16 @@ wmgr::abort(data_tok* dtokp,
             // Delete this txn from tmap, unlock any locked records in emap
             std::string xid((const char*)xid_ptr, xid_len);
             txn_data_list_t tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
+            fidl_t fidl;
             for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++)
             {
 				if (!itr->enq_flag_)
 				    _emap.unlock(itr->drid_); // ignore rid not found error
-                if (itr->enq_flag_)
-                    _lfc.decrEnqueuedRecordCount(itr->pfid_);
+                if (itr->enq_flag_) {
+                    fidl.push_back(itr->pfid_);
+                }
             }
-            std::pair<std::set<std::string>::iterator, bool> res = _txn_pending_set.insert(xid);
+            std::pair<pending_txn_map_itr_t, bool> res = _txn_pending_map.insert(std::pair<std::string, fidl_t>(xid, fidl));
             if (!res.second)
             {
                 std::ostringstream oss;
@@ -526,6 +541,7 @@ wmgr::commit(data_tok* dtokp,
             // Delete this txn from tmap, process records into emap
             std::string xid((const char*)xid_ptr, xid_len);
             txn_data_list_t tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
+            fidl_t fidl;
             for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++)
             {
                 if (itr->enq_flag_) // txn enqueue
@@ -547,20 +563,20 @@ wmgr::commit(data_tok* dtokp,
                         if (eres == enq_map::EMAP_RID_NOT_FOUND)
                         {
                             std::ostringstream oss;
-                            oss << std::hex << "rid=0x" << rid;
-                            throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "dequeue");
+                            oss << std::hex << "emap: rid=0x" << itr->drid_;
+                            throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "commit");
                         }
                         if (eres == enq_map::EMAP_LOCKED)
                         {
                             std::ostringstream oss;
-                            oss << std::hex << "rid=0x" << rid;
-                            throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "dequeue");
+                            oss << std::hex << "rid=0x" << itr->drid_;
+                            throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "commit");
                         }
                     }
-                    _lfc.decrEnqueuedRecordCount(fid);
+                    fidl.push_back(fid);
                 }
             }
-            std::pair<std::set<std::string>::iterator, bool> res = _txn_pending_set.insert(xid);
+            std::pair<pending_txn_map_itr_t, bool> res = _txn_pending_map.insert(std::pair<std::string, fidl_t>(xid, fidl));
             if (!res.second)
             {
                 std::ostringstream oss;
@@ -695,7 +711,7 @@ wmgr::get_next_file()
 {
     _pg_cntr = 0;
 //std::cout << "&&&&& wmgr::get_next_file(): " << status_str() << std::flush << std::endl; // DEBUG
-    _lfc.pullEmptyFileFromEfp();
+    _lfc.getNextJournalFile();
 }
 
 int32_t
@@ -757,7 +773,7 @@ wmgr::get_events(timespec* const timeout
                 data_tok* dtokp = pcbp->_pdtokl->at(k);
                 if (dtokp->decr_pg_cnt() == 0)
                 {
-                    std::set<std::string>::iterator it;
+                    pending_txn_map_itr_t it;
                     switch (dtokp->wstate())
                     {
                     case data_tok::ENQ_SUBM:
@@ -770,6 +786,9 @@ wmgr::get_events(timespec* const timeout
                             _tmap.set_aio_compl(dtokp->xid(), dtokp->rid());
                         break;
                     case data_tok::DEQ_SUBM:
+                        if (!dtokp->has_xid()) {
+                            _lfc.decrEnqueuedRecordCount(dtokp->fid());
+                        }
                         dtokl.push_back(dtokp);
                         tot_data_toks++;
                         dtokp->set_wstate(data_tok::DEQ);
@@ -781,31 +800,35 @@ wmgr::get_events(timespec* const timeout
                         dtokl.push_back(dtokp);
                         tot_data_toks++;
                         dtokp->set_wstate(data_tok::ABORTED);
-                        it = _txn_pending_set.find(dtokp->xid());
-                        if (it == _txn_pending_set.end())
+                        it = _txn_pending_map.find(dtokp->xid());
+                        if (it == _txn_pending_map.end())
                         {
                             std::ostringstream oss;
-                            oss << std::hex << "_txn_pending_set: abort xid=\"";
-                            oss << dtokp->xid() << "\"";
-                            throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr",
-                                    "get_events");
+                            oss << std::hex << "_txn_pending_set: abort xid=\""
+                                            << qpid::linearstore::journal::jcntl::str2hexnum(dtokp->xid()) << "\"";
+                            throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "get_events");
+                        }
+                        for (fidl_itr_t i=it->second.begin(); i!=it->second.end(); ++i) {
+                            _lfc.decrEnqueuedRecordCount(*i);
                         }
-                        _txn_pending_set.erase(it);
+                        _txn_pending_map.erase(it);
                         break;
                     case data_tok::COMMIT_SUBM:
                         dtokl.push_back(dtokp);
                         tot_data_toks++;
                         dtokp->set_wstate(data_tok::COMMITTED);
-                        it = _txn_pending_set.find(dtokp->xid());
-                        if (it == _txn_pending_set.end())
+                        it = _txn_pending_map.find(dtokp->xid());
+                        if (it == _txn_pending_map.end())
                         {
                             std::ostringstream oss;
-                            oss << std::hex << "_txn_pending_set: commit xid=\"";
-                            oss << dtokp->xid() << "\"";
-                            throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr",
-                                    "get_events");
+                            oss << std::hex << "_txn_pending_set: commit xid=\""
+                                            << qpid::linearstore::journal::jcntl::str2hexnum(dtokp->xid()) << "\"";
+                            throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "get_events");
+                        }
+                        for (fidl_itr_t i=it->second.begin(); i!=it->second.end(); ++i) {
+                            _lfc.decrEnqueuedRecordCount(*i);
                         }
-                        _txn_pending_set.erase(it);
+                        _txn_pending_map.erase(it);
                         break;
                     case data_tok::ENQ_PART:
                     case data_tok::DEQ_PART:
@@ -858,8 +881,8 @@ wmgr::is_txn_synced(const std::string& x
     if (_tmap.is_txn_synced(xid) == txn_map::TMAP_NOT_SYNCED)
         return false;
     // Check for outstanding commit/aborts
-    std::set<std::string>::iterator it = _txn_pending_set.find(xid);
-    return it == _txn_pending_set.end();
+    pending_txn_map_itr_t it = _txn_pending_map.find(xid);
+    return it == _txn_pending_map.end();
 }
 
 void
@@ -871,7 +894,6 @@ wmgr::initialize(aio_callback* const cbp
     pmgr::initialize(cbp, wcache_pgsize_sblks, wcache_num_pages);
     wmgr::clean();
     _page_cb_arr[0]._state = IN_USE;
-    _ddtokl.clear();
     _cached_offset_dblks = 0;
     _enq_busy = false;
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/wmgr.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/wmgr.h?rev=1564877&r1=1564876&r2=1564877&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/wmgr.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/wmgr.h Wed Feb  5 18:49:32 2014
@@ -22,9 +22,11 @@
 #ifndef QPID_LINEARSTORE_JOURNAL_WMGR_H
 #define QPID_LINEARSTORE_JOURNAL_WMGR_H
 
+#include <deque>
+#include <map>
 #include "qpid/linearstore/journal/enums.h"
 #include "qpid/linearstore/journal/pmgr.h"
-#include <set>
+#include <vector>
 
 namespace qpid {
 namespace linearstore {
@@ -51,11 +53,15 @@ class LinearFileController;
 class wmgr : public pmgr
 {
 private:
+    typedef std::vector<uint64_t> fidl_t;
+    typedef fidl_t::iterator fidl_itr_t;
+    typedef std::map<std::string, fidl_t> pending_txn_map_t;
+    typedef pending_txn_map_t::iterator pending_txn_map_itr_t;
+
     LinearFileController& _lfc;     ///< Linear File Controller ref
     uint32_t _max_dtokpp;           ///< Max data writes per page
     uint32_t _max_io_wait_us;       ///< Max wait in microseconds till submit
     uint32_t _cached_offset_dblks;  ///< Amount of unwritten data in page (dblocks)
-    std::deque<data_tok*> _ddtokl;  ///< Deferred dequeue data_tok list
 
     // TODO: Convert _enq_busy etc into a proper threadsafe lock
     // TODO: Convert to enum? Are these encodes mutually exclusive?
@@ -70,7 +76,7 @@ private:
     enq_rec _enq_rec;               ///< Enqueue record used for encoding/decoding
     deq_rec _deq_rec;               ///< Dequeue record used for encoding/decoding
     txn_rec _txn_rec;               ///< Transaction record used for encoding/decoding
-    std::set<std::string> _txn_pending_set; ///< Set containing xids of pending commits/aborts
+    pending_txn_map_t _txn_pending_map; ///< Map of pending commit/abort xids to the file ids (fids) whose enqueued record counts are decremented in get_events()
 
 public:
     wmgr(jcntl* jc,



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org