You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/10/07 20:39:25 UTC

svn commit: r1530024 [1/4] - in /qpid/branches/linearstore/qpid/cpp/src: ./ qpid/linearstore/ qpid/linearstore/jrnl/ qpid/linearstore/jrnl/utils/ tests/linearstore/

Author: kpvdr
Date: Mon Oct  7 18:39:24 2013
New Revision: 1530024

URL: http://svn.apache.org/r1530024
Log:
QPID-4984: WIP - Compiles, but functionally incomplete

Added:
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp
      - copied, changed from r1525056, qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFileController.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h
      - copied, changed from r1525056, qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFileController.h
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h
    qpid/branches/linearstore/qpid/cpp/src/tests/linearstore/
    qpid/branches/linearstore/qpid/cpp/src/tests/linearstore/linearstoredirsetup.sh   (with props)
Removed:
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFileController.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFileController.h
Modified:
    qpid/branches/linearstore/qpid/cpp/src/linearstore.cmake
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/EmptyFilePoolManagerImpl.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.h
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.h
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enums.h
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jdir.h
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rcvdat.h
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.c
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.h
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h

Modified: qpid/branches/linearstore/qpid/cpp/src/linearstore.cmake
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/linearstore.cmake?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/linearstore.cmake (original)
+++ qpid/branches/linearstore/qpid/cpp/src/linearstore.cmake Mon Oct  7 18:39:24 2013
@@ -83,27 +83,20 @@ if (BUILD_LINEARSTORE)
         qpid/linearstore/jrnl/EmptyFilePool.cpp
         qpid/linearstore/jrnl/EmptyFilePoolManager.cpp
         qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp
-        #qpid/linearstore/jrnl/fcntl.cpp
         qpid/linearstore/jrnl/jcntl.cpp
         qpid/linearstore/jrnl/jdir.cpp
         qpid/linearstore/jrnl/jerrno.cpp
         qpid/linearstore/jrnl/jexception.cpp
-        #qpid/linearstore/jrnl/jinf.cpp
 		qpid/linearstore/jrnl/JournalFile.cpp
-		qpid/linearstore/jrnl/JournalFileController.cpp
 		qpid/linearstore/jrnl/JournalLog.cpp
         qpid/linearstore/jrnl/jrec.cpp
-        #qpid/linearstore/jrnl/lp_map.cpp
-        #qpid/linearstore/jrnl/lpmgr.cpp
+        qpid/linearstore/jrnl/LinearFileController.cpp
         qpid/linearstore/jrnl/pmgr.cpp
-        qpid/linearstore/jrnl/rmgr.cpp
-        #qpid/linearstore/jrnl/rfc.cpp
-        #qpid/linearstore/jrnl/rrfc.cpp
+        qpid/linearstore/jrnl/RecoveryManager.cpp
         qpid/linearstore/jrnl/time_ns.cpp
         qpid/linearstore/jrnl/txn_map.cpp
         qpid/linearstore/jrnl/txn_rec.cpp
         qpid/linearstore/jrnl/wmgr.cpp
-        #qpid/linearstore/jrnl/wrfc.cpp
     )
 
     # linearstore source files

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/EmptyFilePoolManagerImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/EmptyFilePoolManagerImpl.cpp?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/EmptyFilePoolManagerImpl.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/EmptyFilePoolManagerImpl.cpp Mon Oct  7 18:39:24 2013
@@ -49,8 +49,8 @@ void EmptyFilePoolManagerImpl::findEfpPa
             QLS_LOG(info, "  * Partition " << (*i)->partitionNumber() << " containing " << filePoolList.size() << " pool" <<
                           (filePoolList.size()>1 ? "s" : "") << " at \'" << (*i)->partitionDirectory() << "\'");
             for (std::vector<qpid::qls_jrnl::EmptyFilePool*>::const_iterator j=filePoolList.begin(); j!=filePoolList.end(); ++j) {
-                QLS_LOG(info, "    - EFP \'" << (*j)->fileSizeKib() << "k\' containing " << (*j)->numEmptyFiles() <<
-                              " files of size " << (*j)->fileSizeKib() << " KiB totaling " << (*j)->cumFileSizeKib() << " KiB");
+                QLS_LOG(info, "    - EFP \'" << (*j)->dataSize_kib() << "k\' containing " << (*j)->numEmptyFiles() <<
+                              " files of size " << (*j)->dataSize_kib() << " KiB totaling " << (*j)->cumFileSize_kib() << " KiB");
             }
         }
     }

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp Mon Oct  7 18:39:24 2013
@@ -62,7 +62,6 @@ JournalImpl::JournalImpl(qpid::sys::Time
                          jcntl(journalId, journalDirectory/*, journalBaseFilename*/),
                          timer(timer_),
                          getEventsTimerSetFlag(false),
-                         efpp(0),
 //                         lastReadRid(0),
                          writeActivityFlag(false),
                          flushTriggeredFlag(true),
@@ -119,8 +118,8 @@ JournalImpl::initManagement(qpid::manage
         _mgmtObject->set_name(_jid);
         _mgmtObject->set_directory(_jdir.dirname());
 //        _mgmtObject->set_baseFileName(_base_filename);
-        _mgmtObject->set_readPageSize(JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE);
-        _mgmtObject->set_readPages(JRNL_RMGR_PAGES);
+//        _mgmtObject->set_readPageSize(JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE);
+//        _mgmtObject->set_readPages(JRNL_RMGR_PAGES);
 
         // The following will be set on initialize(), but being properties, these must be set to 0 in the meantime
         //_mgmtObject->set_initialFileCount(0);
@@ -140,7 +139,6 @@ JournalImpl::initialize(qpid::qls_jrnl::
                         const uint32_t wcache_pgsize_sblks,
                         qpid::qls_jrnl::aio_callback* const cbp)
 {
-    efpp = efpp_;
 //    efpp->createJournal(_jdir);
 //    QLS_LOG2(notice, _jid, "Initialized");
 //    std::ostringstream oss;
@@ -150,7 +148,7 @@ JournalImpl::initialize(qpid::qls_jrnl::
 //    oss << " wcache_pgsize_sblks=" << wcache_pgsize_sblks;
 //    oss << " wcache_num_pages=" << wcache_num_pages;
 //    QLS_LOG2(debug, _jid, oss.str());
-    jcntl::initialize(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ efpp, wcache_num_pages, wcache_pgsize_sblks, cbp);
+    jcntl::initialize(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ efpp_, wcache_num_pages, wcache_pgsize_sblks, cbp);
 //    QLS_LOG2(debug, _jid, "Initialization complete");
     // TODO: replace for linearstore: _lpmgr
 /*
@@ -175,6 +173,7 @@ JournalImpl::recover(/*const uint16_t nu
                      const bool auto_expand,
                      const uint16_t ae_max_jfiles,
                      const uint32_t jfsize_sblks,*/
+                     boost::shared_ptr<qpid::qls_jrnl::EmptyFilePoolManager> efpm,
                      const uint16_t wcache_num_pages,
                      const uint32_t wcache_pgsize_sblks,
                      qpid::qls_jrnl::aio_callback* const cbp,
@@ -210,10 +209,10 @@ JournalImpl::recover(/*const uint16_t nu
             prep_xid_list.push_back(i->xid);
         }
 
-        jcntl::recover(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ wcache_num_pages, wcache_pgsize_sblks,
+        jcntl::recover(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/efpm.get(), wcache_num_pages, wcache_pgsize_sblks,
                 cbp, &prep_xid_list, highest_rid);
     } else {
-        jcntl::recover(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ wcache_num_pages, wcache_pgsize_sblks,
+        jcntl::recover(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/efpm.get(), wcache_num_pages, wcache_pgsize_sblks,
                 cbp, 0, highest_rid);
     }
 
@@ -559,9 +558,11 @@ JournalImpl::wr_aio_cb(std::vector<data_
 		    switch (dtokp->wstate())
 		    {
  			    case data_tok::ENQ:
+ 			        std::cout << "<<<>>> JournalImpl::wr_aio_cb() ENQ dtokp rid=" << dtokp->rid() << std::endl << std::flush; // DEBUG
              	    dtokp->getSourceMessage()->enqueueComplete();
  				    break;
 			    case data_tok::DEQ:
+			        std::cout << "<<<>>> JournalImpl::wr_aio_cb() DEQ dtokp rid=" << dtokp->rid() << std::endl << std::flush; // DEBUG
 /* Don't need to signal until we have a way to ack completion of dequeue in AMQP
                     dtokp->getSourceMessage()->dequeueComplete();
                     if ( dtokp->getSourceMessage()->isDequeueComplete()  ) // clear id after last dequeue
@@ -607,16 +608,7 @@ JournalImpl::handleIoResult(const iores 
     {
         case qpid::qls_jrnl::RHM_IORES_SUCCESS:
             return;
-        case qpid::qls_jrnl::RHM_IORES_ENQCAPTHRESH:
-            {
-                std::ostringstream oss;
-                oss << "Enqueue capacity threshold exceeded.";
-                QLS_LOG2(warning, _jid, oss.str());
-                if (_agent != 0)
-                    _agent->raiseEvent(qmf::org::apache::qpid::linearstore::EventEnqThresholdExceeded(_jid, "Journal enqueue capacity threshold exceeded"),
-                                       qpid::management::ManagementAgent::SEV_WARN);
-                THROW_STORE_FULL_EXCEPTION(oss.str());
-            }
+/*
         case qpid::qls_jrnl::RHM_IORES_FULL:
             {
                 std::ostringstream oss;
@@ -626,6 +618,7 @@ JournalImpl::handleIoResult(const iores 
                     _agent->raiseEvent(qmf::org::apache::qpid::linearstore::EventFull(_jid, "Journal full"), qpid::management::ManagementAgent::SEV_ERROR);
                 THROW_STORE_FULL_EXCEPTION(oss.str());
             }
+*/
         default:
             {
                 std::ostringstream oss;

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.h?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.h Mon Oct  7 18:39:24 2013
@@ -79,30 +79,16 @@ class JournalImpl : public qpid::broker:
     typedef boost::function<void (JournalImpl&)> DeleteCallback;
 
   private:
-//    static qpid::sys::Mutex _static_lock;
-//    static uint32_t cnt;
-
     qpid::sys::Timer& timer;
     bool getEventsTimerSetFlag;
     boost::intrusive_ptr<qpid::sys::TimerTask> getEventsFireEventsPtr;
     qpid::sys::Mutex _getf_lock;
     qpid::sys::Mutex _read_lock;
-    qpid::qls_jrnl::EmptyFilePool* efpp;
-
-//    uint64_t lastReadRid; // rid of last read msg for loadMsgContent() - detects out-of-order read requests
-//    std::vector<uint64_t> oooRidList; // list of out-of-order rids (greater than current rid) encountered during read sequence
 
     bool writeActivityFlag;
     bool flushTriggeredFlag;
     boost::intrusive_ptr<qpid::sys::TimerTask> inactivityFireEventPtr;
 
-    // temp local vars for loadMsgContent below
-//    void* _xidp;
-//    void* _datap;
-//    size_t _dlen;
-//    qpid::qls_jrnl::data_tok _dtok;
-//    bool _external;
-
     qpid::management::ManagementAgent* _agent;
     qmf::org::apache::qpid::linearstore::Journal::shared_ptr _mgmtObject;
     DeleteCallback deleteCallback;
@@ -112,7 +98,6 @@ class JournalImpl : public qpid::broker:
     JournalImpl(qpid::sys::Timer& timer,
                 const std::string& journalId,
                 const std::string& journalDirectory,
-//                const std::string& journalBaseFilename,
                 const qpid::sys::Duration getEventsTimeout,
                 const qpid::sys::Duration flushTimeout,
                 qpid::management::ManagementAgent* agent,
@@ -122,29 +107,18 @@ class JournalImpl : public qpid::broker:
 
     void initManagement(qpid::management::ManagementAgent* agent);
 
-    void initialize(/*const uint16_t num_jfiles,
-                    const bool auto_expand,
-                    const uint16_t ae_max_jfiles,
-                    const uint32_t jfsize_sblks,*/
-                    qpid::qls_jrnl::EmptyFilePool* efp,
+    void initialize(qpid::qls_jrnl::EmptyFilePool* efp,
                     const uint16_t wcache_num_pages,
                     const uint32_t wcache_pgsize_sblks,
                     qpid::qls_jrnl::aio_callback* const cbp);
 
-    inline void initialize(/*const uint16_t num_jfiles,
-                           const bool auto_expand,
-                           const uint16_t ae_max_jfiles,
-                           const uint32_t jfsize_sblks,*/
-                           qpid::qls_jrnl::EmptyFilePool* efp,
+    inline void initialize(qpid::qls_jrnl::EmptyFilePool* efpp,
                            const uint16_t wcache_num_pages,
                            const uint32_t wcache_pgsize_sblks) {
-        initialize(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ efp, wcache_num_pages, wcache_pgsize_sblks, this);
+        initialize(efpp, wcache_num_pages, wcache_pgsize_sblks, this);
     }
 
-    void recover(/*const uint16_t num_jfiles,
-                 const bool auto_expand,
-                 const uint16_t ae_max_jfiles,
-                 const uint32_t jfsize_sblks,*/
+    void recover(boost::shared_ptr<qpid::qls_jrnl::EmptyFilePoolManager> efpm,
                  const uint16_t wcache_num_pages,
                  const uint32_t wcache_pgsize_sblks,
                  qpid::qls_jrnl::aio_callback* const cbp,
@@ -152,17 +126,13 @@ class JournalImpl : public qpid::broker:
                  uint64_t& highest_rid,
                  uint64_t queue_id);
 
-    inline void recover(/*const uint16_t num_jfiles,
-                        const bool auto_expand,
-                        const uint16_t ae_max_jfiles,
-                        const uint32_t jfsize_sblks,*/
+    inline void recover(boost::shared_ptr<qpid::qls_jrnl::EmptyFilePoolManager> efpm,
                         const uint16_t wcache_num_pages,
                         const uint32_t wcache_pgsize_sblks,
                         boost::ptr_list<PreparedTransaction>* prep_tx_list_ptr,
                         uint64_t& highest_rid,
                         uint64_t queue_id) {
-        recover(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ wcache_num_pages, wcache_pgsize_sblks,
-                this, prep_tx_list_ptr, highest_rid, queue_id);
+        recover(efpm, wcache_num_pages, wcache_pgsize_sblks, this, prep_tx_list_ptr, highest_rid, queue_id);
     }
 
     void recover_complete();
@@ -197,10 +167,6 @@ class JournalImpl : public qpid::broker:
 
     void stop(bool block_till_aio_cmpl = false);
 
-    // Logging
-//    void log(qpid::qls_jrnl::log_level level, const std::string& log_stmt) const;
-//    void log(qpid::qls_jrnl::log_level level, const char* const log_stmt) const;
-
     // Overrides for get_events timer
     qpid::qls_jrnl::iores flush(const bool block_till_aio_cmpl = false);
 
@@ -249,15 +215,15 @@ class TplJournalImpl : public JournalImp
     TplJournalImpl(qpid::sys::Timer& timer,
                    const std::string& journalId,
                    const std::string& journalDirectory,
-//                   const std::string& journalBaseFilename,
                    const qpid::sys::Duration getEventsTimeout,
                    const qpid::sys::Duration flushTimeout,
                    qpid::management::ManagementAgent* agent) :
-        JournalImpl(timer, journalId, journalDirectory/*, journalBaseFilename*/, getEventsTimeout, flushTimeout, agent)
+        JournalImpl(timer, journalId, journalDirectory, getEventsTimeout, flushTimeout, agent)
     {}
 
     virtual ~TplJournalImpl() {}
 
+/*
     // Special version of read_data_record that ignores transactions - needed when reading the TPL
     inline qpid::qls_jrnl::iores read_data_record(void** const datapp, std::size_t& dsize,
                                                 void** const xidpp, std::size_t& xidsize, bool& transient, bool& external,
@@ -265,6 +231,7 @@ class TplJournalImpl : public JournalImp
         return JournalImpl::read_data_record(datapp, dsize, xidpp, xidsize, transient, external, dtokp, true);
     }
     inline void read_reset() { _rmgr.invalidate(); }
+*/
 }; // class TplJournalImpl
 
 } // namespace msgstore

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp Mon Oct  7 18:39:24 2013
@@ -63,7 +63,7 @@ MessageStoreImpl::TplRecoverStruct::TplR
 
 MessageStoreImpl::MessageStoreImpl(qpid::broker::Broker* broker_, const char* envpath_) :
                                    defaultEfpPartitionNumber(0),
-                                   defaultEfpFileSizeKib(0),
+                                   defaultEfpFileSize_kib(0),
                                    truncateFlag(false),
                                    wCachePgSizeSblks(0),
                                    wCacheNumPages(0),
@@ -83,7 +83,7 @@ uint32_t MessageStoreImpl::chkJrnlWrPage
 
     if (p == 0) {
         // For zero value, use default
-        p = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_SBLK_SIZE / 1024;
+        p = JRNL_WMGR_DEF_PAGE_SIZE_KIB;
         QLS_LOG(warning, "parameter " << paramName_ << " (" << param_ << ") must be a power of 2 between 1 and 128; changing this parameter to default value (" << p << ")");
     } else if ( p > 128 || (p & (p-1)) ) {
         // For any positive value that is not a power of 2, use closest value
@@ -100,22 +100,22 @@ uint32_t MessageStoreImpl::chkJrnlWrPage
 
 uint16_t MessageStoreImpl::getJrnlWrNumPages(const uint32_t wrPageSizeKib_)
 {
-    uint32_t wrPageSizeSblks = wrPageSizeKib_ * 1024 / JRNL_SBLK_SIZE; // convert from KiB to number sblks
-    uint32_t defTotWCacheSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES; // in sblks. Currently 2014 sblks (1 MiB).
+    uint32_t wrPageSizeSblks = wrPageSizeKib_ / JRNL_SBLK_SIZE_KIB; // convert from KiB to number sblks
+    uint32_t defTotWCacheSizeSblks = JRNL_WMGR_DEF_PAGE_SIZE_SBLKS * JRNL_WMGR_DEF_PAGES;
     switch (wrPageSizeKib_)
     {
       case 1:
       case 2:
       case 4:
         // 256 KiB total cache
-        return defTotWCacheSize / wrPageSizeSblks / 4;
+        return defTotWCacheSizeSblks / wrPageSizeSblks / 4;
       case 8:
       case 16:
         // 512 KiB total cache
-        return defTotWCacheSize / wrPageSizeSblks / 2;
+        return defTotWCacheSizeSblks / wrPageSizeSblks / 2;
       default: // 32, 64, 128
         // 1 MiB total cache
-        return defTotWCacheSize / wrPageSizeSblks;
+        return defTotWCacheSizeSblks / wrPageSizeSblks;
     }
 }
 
@@ -125,7 +125,7 @@ qpid::qls_jrnl::efpPartitionNumber_t Mes
     return partition_;
 }
 
-qpid::qls_jrnl::efpFileSizeKib_t MessageStoreImpl::chkEfpFileSizeKiB(const qpid::qls_jrnl::efpFileSizeKib_t efpFileSizeKib_,
+qpid::qls_jrnl::efpDataSize_kib_t MessageStoreImpl::chkEfpFileSizeKiB(const qpid::qls_jrnl::efpDataSize_kib_t efpFileSizeKib_,
                                                                      const std::string& paramName_) {
     uint8_t rem =  efpFileSizeKib_ % uint64_t(JRNL_SBLK_SIZE_KIB);
     if (rem != 0) {
@@ -154,7 +154,7 @@ void MessageStoreImpl::initManagement ()
             mgmtObject->set_location(storeDir);
             mgmtObject->set_tplIsInitialized(false);
             mgmtObject->set_tplDirectory(getTplBaseDir());
-            mgmtObject->set_tplWritePageSize(tplWCachePgSizeSblks * JRNL_SBLK_SIZE);
+            mgmtObject->set_tplWritePageSize(tplWCachePgSizeSblks * JRNL_SBLK_SIZE_BYTES);
             mgmtObject->set_tplWritePages(tplWCacheNumPages);
 
             agent->addObject(mgmtObject, 0, true);
@@ -172,36 +172,30 @@ bool MessageStoreImpl::init(const qpid::
     // Extract and check options
     const StoreOptions* opts = static_cast<const StoreOptions*>(options_);
     qpid::qls_jrnl::efpPartitionNumber_t efpPartition = chkEfpPartition(opts->efpPartition, "efp-partition");
-    qpid::qls_jrnl::efpFileSizeKib_t efpFilePoolSize = chkEfpFileSizeKiB(opts->efpFileSizeKib, "efp-file-size");
+    qpid::qls_jrnl::efpDataSize_kib_t efpFilePoolSize_kib = chkEfpFileSizeKiB(opts->efpFileSizeKib, "efp-file-size");
     uint32_t jrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->wCachePageSizeKib, "wcache-page-size");
     uint32_t tplJrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->tplWCachePageSizeKib, "tpl-wcache-page-size");
 
     // Pass option values to init()
-    return init(opts->storeDir, efpPartition, efpFilePoolSize, opts->truncateFlag, jrnlWrCachePageSizeKib, tplJrnlWrCachePageSizeKib);
+    return init(opts->storeDir, efpPartition, efpFilePoolSize_kib, opts->truncateFlag, jrnlWrCachePageSizeKib, tplJrnlWrCachePageSizeKib);
 }
 
 // These params, taken from options, are assumed to be correct and verified
 bool MessageStoreImpl::init(const std::string& storeDir_,
-                           /*uint16_t jfiles,
-                           uint32_t jfileSizePgs,*/
                            qpid::qls_jrnl::efpPartitionNumber_t efpPartition_,
-                           qpid::qls_jrnl::efpFileSizeKib_t efpFileSizeKib_,
+                           qpid::qls_jrnl::efpDataSize_kib_t efpFileSize_kib_,
                            const bool truncateFlag_,
                            uint32_t wCachePageSizeKib_,
-                           /*uint16_t tplJfiles,
-                           uint32_t tplJfileSizePgs,*/
                            uint32_t tplWCachePageSizeKib_)
-                           /*bool      autoJExpand,
-                           uint16_t autoJExpandMaxFiles)*/
 {
     if (isInit) return true;
 
     // Set geometry members (converting to correct units where req'd)
     defaultEfpPartitionNumber = efpPartition_;
-    defaultEfpFileSizeKib = efpFileSizeKib_;
-    wCachePgSizeSblks = wCachePageSizeKib_ * 1024 / JRNL_SBLK_SIZE; // convert from KiB to number sblks
+    defaultEfpFileSize_kib = efpFileSize_kib_;
+    wCachePgSizeSblks = wCachePageSizeKib_ / JRNL_SBLK_SIZE_KIB; // convert from KiB to number sblks
     wCacheNumPages = getJrnlWrNumPages(wCachePageSizeKib_);
-    tplWCachePgSizeSblks = tplWCachePageSizeKib_ * 1024 / JRNL_SBLK_SIZE; // convert from KiB to number sblks
+    tplWCachePgSizeSblks = tplWCachePageSizeKib_ / JRNL_SBLK_SIZE_KIB; // convert from KiB to number sblks
     tplWCacheNumPages = getJrnlWrNumPages(tplWCachePageSizeKib_);
     if (storeDir_.size()>0) storeDir = storeDir_;
 
@@ -212,13 +206,13 @@ bool MessageStoreImpl::init(const std::s
 
     QLS_LOG(notice, "Store module initialized; store-dir=" << storeDir_);
     QLS_LOG(info,   "> Default EFP partition: " << defaultEfpPartitionNumber);
-    QLS_LOG(info,   "> Default EFP file size: " << defaultEfpFileSizeKib << " (KiB)");
+    QLS_LOG(info,   "> Default EFP file size: " << defaultEfpFileSize_kib << " (KiB)");
     QLS_LOG(info,   "> Default write cache page size: " << wCachePageSizeKib_ << " (KiB)");
     QLS_LOG(info,   "> Default number of write cache pages: " << wCacheNumPages);
     QLS_LOG(info,   "> TPL write cache page size: " << tplWCachePageSizeKib_ << " (KiB)");
     QLS_LOG(info,   "> TPL number of write cache pages: " << tplWCacheNumPages);
     QLS_LOG(info,   "> EFP partition: " << defaultEfpPartitionNumber);
-    QLS_LOG(info,   "> EFP file size pool: " << defaultEfpFileSizeKib << " (KiB)");
+    QLS_LOG(info,   "> EFP file size pool: " << defaultEfpFileSize_kib << " (KiB)");
 
     return isInit;
 }
@@ -273,7 +267,7 @@ void MessageStoreImpl::init()
             // NOTE: during normal initialization, agent == 0 because the store is initialized before the management infrastructure.
             // However during a truncated initialization in a cluster, agent != 0. We always pass 0 as the agent for the
             // TplStore to keep things consistent in a cluster. See https://bugzilla.redhat.com/show_bug.cgi?id=681026
-            tplStorePtr.reset(new TplJournalImpl(broker->getTimer(), "TplStore", getTplBaseDir(), /*"tpl",*/ defJournalGetEventsTimeout, defJournalFlushTimeout, 0));
+            tplStorePtr.reset(new TplJournalImpl(broker->getTimer(), "TplStore", getTplBaseDir(), defJournalGetEventsTimeout, defJournalFlushTimeout, 0));
             isInit = true;
         } catch (const DbException& e) {
             if (e.get_errno() == DB_VERSION_MISMATCH)
@@ -350,7 +344,7 @@ void MessageStoreImpl::chkTplStoreInit()
     qpid::sys::Mutex::ScopedLock sl(tplInitLock);
     if (!tplStorePtr->is_ready()) {
         qpid::qls_jrnl::jdir::create_dir(getTplBaseDir());
-        tplStorePtr->initialize(/*tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks,*/ getEmptyFilePool(defaultEfpPartitionNumber, defaultEfpFileSizeKib), tplWCacheNumPages, tplWCachePgSizeSblks);
+        tplStorePtr->initialize(/*tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks,*/ getEmptyFilePool(defaultEfpPartitionNumber, defaultEfpFileSize_kib), tplWCacheNumPages, tplWCachePgSizeSblks);
         if (mgmtObject.get() != 0) mgmtObject->set_tplIsInitialized(true);
     }
 }
@@ -403,26 +397,13 @@ void MessageStoreImpl::create(qpid::brok
     }
     JournalImpl* jQueue = 0;
 
-//    uint16_t localFileCount = numJrnlFiles;
-//    bool      localAutoExpandFlag = autoJrnlExpand;
-//    uint16_t localAutoExpandMaxFileCount = autoJrnlExpandMaxFiles;
-//    uint32_t localFileSizeSblks  = jrnlFsizeSblks;
-//
-//    value = args.get("qpid.file_count");
-//    if (value.get() != 0 && !value->empty() && value->convertsTo<int>())
-//        localFileCount = chkJrnlNumFilesParam((uint16_t) value->get<int>(), "qpid.file_count");
-//
-//    value = args.get("qpid.file_size");
-//    if (value.get() != 0 && !value->empty() && value->convertsTo<int>())
-//        localFileSizeSblks = chkJrnlFileSizeParam((uint32_t) value->get<int>(), "qpid.file_size", wCachePgSizeSblks) * JRNL_RMGR_PAGE_SIZE;
-
     if (queue_.getName().size() == 0)
     {
         QLS_LOG(error, "Cannot create store for empty (null) queue name - ignoring and attempting to continue.");
         return;
     }
 
-    jQueue = new JournalImpl(broker->getTimer(), queue_.getName(), getJrnlDir(queue_), /*std::string("JournalData"),*/
+    jQueue = new JournalImpl(broker->getTimer(), queue_.getName(), getJrnlDir(queue_.getName()),
                              defJournalGetEventsTimeout, defJournalFlushTimeout, agent,
                              boost::bind(&MessageStoreImpl::journalDeleted, this, _1));
     {
@@ -430,32 +411,8 @@ void MessageStoreImpl::create(qpid::brok
         journalList[queue_.getName()]=jQueue;
     }
 
-//    value = args.get("qpid.auto_expand");
-//    if (value.get() != 0 && !value->empty() && value->convertsTo<bool>())
-//        localAutoExpandFlag = (bool) value->get<bool>();
-//
-//    value = args.get("qpid.auto_expand_max_jfiles");
-//    if (value.get() != 0 && !value->empty() && value->convertsTo<int>())
-//        localAutoExpandMaxFileCount = (uint16_t) value->get<int>();
-/*
-    qpid::framing::FieldTable::ValuePtr value;
-    qpid::qls_jrnl::efpPartitionNumber_t localEfpPartition = efpPartition;
-    value = args_.get("qpid.efp_partition");
-    if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) {
-        localEfpPartition = chkEfpPartition((uint32_t)value->get<int>(), "qpid.efp_partition");
-    }
-
-    qpid::qls_jrnl::efpFileSizeKib_t localEfpFileSizeKib = efpFileSizeKib;
-    value = args_.get("qpid.efp_file_size");
-    if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) {
-        localEfpFileSizeKib = chkEfpFileSizeKiB((uint32_t)value->get<int>(),"qpid.efp_file_size" );
-    }
-*/
-
     queue_.setExternalQueueStore(dynamic_cast<qpid::broker::ExternalQueueStore*>(jQueue));
     try {
-        // init will create the deque's for the init...
-//        jQueue->initialize(localFileCount, localAutoExpandFlag, localAutoExpandMaxFileCount, localFileSizeSblks, wCacheNumPages, wCachePgSizeSblks);
         jQueue->initialize(getEmptyFilePool(args_), wCacheNumPages, wCachePgSizeSblks);
     } catch (const qpid::qls_jrnl::jexception& e) {
         THROW_STORE_EXCEPTION(std::string("Queue ") + queue_.getName() + ": create() failed: " + e.what());
@@ -471,7 +428,7 @@ void MessageStoreImpl::create(qpid::brok
 
 qpid::qls_jrnl::EmptyFilePool*
 MessageStoreImpl::getEmptyFilePool(const qpid::qls_jrnl::efpPartitionNumber_t efpPartitionNumber_,
-                                   const qpid::qls_jrnl::efpFileSizeKib_t efpFileSizeKib_) {
+                                   const qpid::qls_jrnl::efpDataSize_kib_t efpFileSizeKib_) {
     qpid::qls_jrnl::EmptyFilePool* efpp = efpMgr->getEmptyFilePool(efpPartitionNumber_, efpFileSizeKib_);
     if (efpp == 0) {
         std::ostringstream oss;
@@ -490,7 +447,7 @@ MessageStoreImpl::getEmptyFilePool(const
         localEfpPartition = chkEfpPartition((uint32_t)value->get<int>(), "qpid.efp_partition");
     }
 
-    qpid::qls_jrnl::efpFileSizeKib_t localEfpFileSizeKib = defaultEfpFileSizeKib;
+    qpid::qls_jrnl::efpDataSize_kib_t localEfpFileSizeKib = defaultEfpFileSize_kib;
     value = args_.get("qpid.efp_file_size");
     if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) {
         localEfpFileSizeKib = chkEfpFileSizeKiB((uint32_t)value->get<int>(),"qpid.efp_file_size" );
@@ -501,21 +458,19 @@ MessageStoreImpl::getEmptyFilePool(const
 void MessageStoreImpl::destroy(qpid::broker::PersistableQueue& queue_)
 {
     QLS_LOG(info,   "*** MessageStoreImpl::destroy() queue=\"" << queue_.getName() << "\"");
-/*
     checkInit();
-    destroy(queueDb, queue);
-    deleteBindingsForQueue(queue);
-    qpid::broker::ExternalQueueStore* eqs = queue.getExternalQueueStore();
+    destroy(queueDb, queue_); // call order: destroy(queueDb, ...), deleteBindingsForQueue(), delete_jrnl_files(), then journalList.erase()
+    deleteBindingsForQueue(queue_);
+    qpid::broker::ExternalQueueStore* eqs = queue_.getExternalQueueStore(); // null means there is no journal to clean up (see the if below)
     if (eqs) {
         JournalImpl* jQueue = static_cast<JournalImpl*>(eqs);
         jQueue->delete_jrnl_files();
-        queue.setExternalQueueStore(0); // will delete the journal if exists
+        queue_.setExternalQueueStore(0); // will delete the journal if exists
         {
             qpid::sys::Mutex::ScopedLock sl(journalListLock);
-            journalList.erase(queue.getName());
+            journalList.erase(queue_.getName()); // entry is removed by queue name while journalListLock is held
         }
     }
-*/
 }
 
 void MessageStoreImpl::create(const qpid::broker::PersistableExchange& exchange_,
@@ -727,14 +682,13 @@ void MessageStoreImpl::recover(qpid::bro
     registry_.recoveryComplete();
 }
 
-void MessageStoreImpl::recoverQueues(TxnCtxt& /*txn*/,
-                                     qpid::broker::RecoveryManager& /*registry*/,
-                                     queue_index& /*queue_index*/,
-                                     txn_list& /*prepared*/,
-                                     message_index& /*messages*/)
+void MessageStoreImpl::recoverQueues(TxnCtxt& txn,
+                                     qpid::broker::RecoveryManager& registry,
+                                     queue_index& queue_index,
+                                     txn_list& prepared,
+                                     message_index& messages)
 {
     QLS_LOG(info,   "*** MessageStoreImpl::recoverQueues()");
-/*
     Cursor queues;
     queues.open(queueDb, txn.get());
 
@@ -757,8 +711,8 @@ void MessageStoreImpl::recoverQueues(Txn
             QLS_LOG(error, "Cannot recover empty (null) queue name - ignoring and attempting to continue.");
             break;
         }
-        jQueue = new JournalImpl(broker->getTimer(), queueName, getJrnlHashDir(queueName), std::string("JournalData"),
-                                 defJournalGetEventsTimeout, defJournalFlushTimeout, agent,
+        jQueue = new JournalImpl(broker->getTimer(), queueName, getJrnlDir(queueName), defJournalGetEventsTimeout,
+                                 defJournalFlushTimeout, agent,
                                  boost::bind(&MessageStoreImpl::journalDeleted, this, _1));
         {
             qpid::sys::Mutex::ScopedLock sl(journalListLock);
@@ -771,22 +725,23 @@ void MessageStoreImpl::recoverQueues(Txn
             long rcnt = 0L;     // recovered msg count
             long idcnt = 0L;    // in-doubt msg count
             uint64_t thisHighestRid = 0ULL;
-            jQueue->recover(numJrnlFiles, autoJrnlExpand, autoJrnlExpandMaxFiles, jrnlFsizeSblks, wCacheNumPages,
-                            wCachePgSizeSblks, &prepared, thisHighestRid, key.id); // start recovery
+            jQueue->recover(boost::dynamic_pointer_cast<qpid::qls_jrnl::EmptyFilePoolManager>(efpMgr), wCacheNumPages, wCachePgSizeSblks, &prepared, thisHighestRid, key.id);
 
-//            // Check for changes to queue store settings qpid.file_count and qpid.file_size resulting
-//            // from recovery of a store that has had its size changed externally by the resize utility.
-//            // If so, update the queue store settings so that QMF queries will reflect the new values.
-//            const qpid::framing::FieldTable& storeargs = queue->getSettings().storeSettings;
-//            qpid::framing::FieldTable::ValuePtr value;
-//            value = storeargs.get("qpid.file_count");
-//            if (value.get() != 0 && !value->empty() && value->convertsTo<int>() && (uint16_t)value->get<int>() != jQueue->num_jfiles()) {
-//                queue->addArgument("qpid.file_count", jQueue->num_jfiles());
-//            }
-//            value = storeargs.get("qpid.file_size");
-//            if (value.get() != 0 && !value->empty() && value->convertsTo<int>() && (uint32_t)value->get<int>() != jQueue->jfsize_sblks()/JRNL_RMGR_PAGE_SIZE) {
-//                queue->addArgument("qpid.file_size", jQueue->jfsize_sblks()/JRNL_RMGR_PAGE_SIZE);
-//            }
+            // Check for changes to queue store settings qpid.file_count and qpid.file_size resulting
+            // from recovery of a store that has had its size changed externally by the resize utility.
+            // If so, update the queue store settings so that QMF queries will reflect the new values.
+/*
+            const qpid::framing::FieldTable& storeargs = queue->getSettings().storeSettings;
+            qpid::framing::FieldTable::ValuePtr value;
+            value = storeargs.get("qpid.file_count");
+            if (value.get() != 0 && !value->empty() && value->convertsTo<int>() && (uint16_t)value->get<int>() != jQueue->num_jfiles()) {
+                queue->addArgument("qpid.file_count", jQueue->num_jfiles());
+            }
+            value = storeargs.get("qpid.file_size");
+            if (value.get() != 0 && !value->empty() && value->convertsTo<int>() && (uint32_t)value->get<int>() != jQueue->jfsize_sblks()/JRNL_RMGR_PAGE_SIZE) {
+                queue->addArgument("qpid.file_size", jQueue->jfsize_sblks()/JRNL_RMGR_PAGE_SIZE);
+            }
+*/
 
             if (highestRid == 0ULL)
                 highestRid = thisHighestRid;
@@ -810,7 +765,6 @@ void MessageStoreImpl::recoverQueues(Txn
     QLS_LOG(info, "Most recent persistence id found: 0x" << std::hex << highestRid << std::dec);
 
     queueIdSequence.reset(maxQueueId + 1);
-*/
 }
 
 
@@ -901,16 +855,15 @@ void MessageStoreImpl::recoverGeneral(Tx
 }
 
 void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/,
-                                       qpid::broker::RecoveryManager& /*recovery*/,
-                                       qpid::broker::RecoverableQueue::shared_ptr& queue_,
-                                       txn_list& /*prepared*/,
-                                       message_index& /*messages*/,
-                                       long& /*rcnt*/,
-                                       long& /*idcnt*/)
+                                       qpid::broker::RecoveryManager& recovery,
+                                       qpid::broker::RecoverableQueue::shared_ptr& queue,
+                                       txn_list& prepared,
+                                       message_index& messages,
+                                       long& rcnt,
+                                       long& idcnt)
 {
-    QLS_LOG(info,   "*** MessageStoreImpl::recoverMessages() queue=\"" << queue_->getName() << "\"");
-/*
-    size_t preambleLength = sizeof(uint32_t)header size;
+    QLS_LOG(info,   "*** MessageStoreImpl::recoverMessages() queue=\"" << queue->getName() << "\"");
+    size_t preambleLength = sizeof(uint32_t)/*header size*/;
 
     JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
     DataTokenImpl dtok;
@@ -1035,7 +988,6 @@ void MessageStoreImpl::recoverMessages(T
     } catch (const qpid::qls_jrnl::jexception& e) {
         THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": recoverMessages() failed: " + e.what());
     }
-*/
 }
 
 qpid::broker::RecoverableMessage::shared_ptr MessageStoreImpl::getExternMessage(qpid::broker::RecoveryManager& /*recovery*/,
@@ -1079,7 +1031,6 @@ int MessageStoreImpl::enqueueMessage(Txn
 void MessageStoreImpl::readTplStore()
 {
     QLS_LOG(info,   "*** MessageStoreImpl::readTplStore()");
-/*
     tplRecoverMap.clear();
     qpid::qls_jrnl::txn_map& tmap = tplStorePtr->get_txn_map();
     DataTokenImpl dtok;
@@ -1148,7 +1099,6 @@ void MessageStoreImpl::readTplStore()
     } catch (const qpid::qls_jrnl::jexception& e) {
         THROW_STORE_EXCEPTION(std::string("TPL recoverTplStore() failed: ") + e.what());
     }
-*/
 }
 
 void MessageStoreImpl::recoverTplStore()
@@ -1171,10 +1121,9 @@ void MessageStoreImpl::recoverTplStore()
 */
 }
 
-void MessageStoreImpl::recoverLockedMappings(txn_list& /*txns*/)
+void MessageStoreImpl::recoverLockedMappings(txn_list& txns)
 {
     QLS_LOG(info,   "*** MessageStoreImpl::recoverLockedMappings()");
-/*
     if (!tplStorePtr->is_ready())
         recoverTplStore();
 
@@ -1186,7 +1135,6 @@ void MessageStoreImpl::recoverLockedMapp
         deq_ptr.reset(new LockedMappings);
         txns.push_back(new PreparedTransaction(i->first, enq_ptr, deq_ptr));
     }
-*/
 }
 
 void MessageStoreImpl::collectPreparedXids(std::set<std::string>& /*xids*/)
@@ -1260,55 +1208,46 @@ void MessageStoreImpl::loadContent(const
 void MessageStoreImpl::flush(const qpid::broker::PersistableQueue& queue_)
 {
     QLS_LOG(info,   "*** MessageStoreImpl::flush() queue=\"" << queue_.getName() << "\"");
-/*
-    if (queue.getExternalQueueStore() == 0) return;
+    if (queue_.getExternalQueueStore() == 0) return; // no external store attached to the queue: nothing to flush
     checkInit();
-    std::string qn = queue.getName();
+    std::string qn = queue_.getName(); // copied up front; used only in the error message below
     try {
-        JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
+        JournalImpl* jc = static_cast<JournalImpl*>(queue_.getExternalQueueStore()); // already known to be non-null from the early return above
         if (jc) {
             // TODO: check if this result should be used...
-            mrg::journal::iores res = jc->flush();
+            /*mrg::journal::iores res =*/ jc->flush(); // the value returned by flush() is currently discarded
         }
     } catch (const qpid::qls_jrnl::jexception& e) {
         THROW_STORE_EXCEPTION(std::string("Queue ") + qn + ": flush() failed: " + e.what() );
     }
-*/
 }
 
-void MessageStoreImpl::enqueue(qpid::broker::TransactionContext* /*ctxt*/,
+void MessageStoreImpl::enqueue(qpid::broker::TransactionContext* ctxt_, // may be null; a local TxnCtxt (implicit) is used in that case
                                const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg_,
-                               const qpid::broker::PersistableQueue& /*queue*/)
+                               const qpid::broker::PersistableQueue& queue_) // Passes msg_ to store() for queue_, giving it a persistence id first if it has none
 {
-//    QLS_LOG(info,   "*** MessageStoreImpl::enqueue() queue=\"" << queue.getName() << "\"");
-/*
+    //QLS_LOG(info,   "*** MessageStoreImpl::enqueue() queue=\"" << queue_.getName() << "\"");
     checkInit();
-    uint64_t queueId (queue.getPersistenceId());
-    uint64_t messageId (msg->getPersistenceId());
+    uint64_t queueId (queue_.getPersistenceId()); // zero is rejected just below as "Queue not created"
     if (queueId == 0) {
-        THROW_STORE_EXCEPTION("Queue not created: " + queue.getName());
+        THROW_STORE_EXCEPTION("Queue not created: " + queue_.getName());
     }
 
     TxnCtxt implicit;
     TxnCtxt* txn = 0;
-    if (ctxt) {
-        txn = check(ctxt);
+    if (ctxt_) {
+        txn = check(ctxt_);
     } else {
         txn = &implicit;
     }
 
-    bool newId = false;
-    if (messageId == 0) {
-        messageId = messageIdSequence.next();
-        msg->setPersistenceId(messageId);
-        newId = true;
+    if (msg_->getPersistenceId() == 0) { // message has not been given a persistence id yet
+        msg_->setPersistenceId(messageIdSequence.next()); // take the next value from messageIdSequence
     }
-    store(&queue, txn, msg, newId);
+    store(&queue_, txn, msg_); // txn is either the result of check(ctxt_) or &implicit, never null
 
     // add queue* to the txn map..
-    if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
-*/
-    msg_->enqueueComplete();// DEBUG: only while null fns in use
+    if (ctxt_) txn->addXidRecord(queue_.getExternalQueueStore()); // skipped when no context was supplied
 }
 
 uint64_t MessageStoreImpl::msgEncode(std::vector<char>& buff_,
@@ -1329,91 +1268,85 @@ uint64_t MessageStoreImpl::msgEncode(std
 }
 
 void MessageStoreImpl::store(const qpid::broker::PersistableQueue* queue_,
-                             TxnCtxt* /*txn*/,
-                             const boost::intrusive_ptr<qpid::broker::PersistableMessage>& /*message*/,
-                             bool /*newId*/)
+                             TxnCtxt* txn_, // dereferenced without a null check whenever queue_ is non-null
+                             const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message_) // Encodes message_ with msgEncode() and enqueues the buffer on queue_'s JournalImpl
 {
-    QLS_LOG(info,   "*** MessageStoreImpl::store() queue=\"" << queue_->getName() << "\"");
-/*
+    //QLS_LOG(info,   "*** MessageStoreImpl::store() queue=\"" << queue_->getName() << "\"");
     std::vector<char> buff;
-    uint64_t size = msgEncode(buff, message);
+    uint64_t size = msgEncode(buff, message_); // size is forwarded twice to the journal enqueue call below
 
     try {
-        if (queue) {
+        if (queue_) { // a null queue is reported by the else branch
             boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
             dtokp->addRef();
-            dtokp->setSourceMessage(message);
+            dtokp->setSourceMessage(message_);
             dtokp->set_external_rid(true);
-            dtokp->set_rid(message->getPersistenceId()); // set the messageID into the Journal header (record-id)
+            dtokp->set_rid(message_->getPersistenceId()); // set the messageID into the Journal header (record-id)
 
-            JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
-            if (txn->getXid().empty()) {
-                jc->enqueue_data_record(&buff[0], size, size, dtokp.get(), !message->isPersistent());
+            JournalImpl* jc = static_cast<JournalImpl*>(queue_->getExternalQueueStore());
+            if (txn_->getXid().empty()) { // empty xid selects the non-transactional enqueue call
+                jc->enqueue_data_record(&buff[0], size, size, dtokp.get(), !message_->isPersistent());
             } else {
-                jc->enqueue_txn_data_record(&buff[0], size, size, dtokp.get(), txn->getXid(), !message->isPersistent());
+                jc->enqueue_txn_data_record(&buff[0], size, size, dtokp.get(), txn_->getXid(), !message_->isPersistent());
             }
         } else {
             THROW_STORE_EXCEPTION(std::string("MessageStoreImpl::store() failed: queue NULL."));
        }
     } catch (const qpid::qls_jrnl::jexception& e) {
-        THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": MessageStoreImpl::store() failed: " +
+        THROW_STORE_EXCEPTION(std::string("Queue ") + queue_->getName() + ": MessageStoreImpl::store() failed: " + // NOTE(review): unlike async_dequeue(), the extra addRef() taken above is not released on this path - confirm
                               e.what());
     }
-*/
 }
 
-void MessageStoreImpl::dequeue(qpid::broker::TransactionContext* /*ctxt*/,
+void MessageStoreImpl::dequeue(qpid::broker::TransactionContext* ctxt_, // may be null; a local TxnCtxt (implicit) is used in that case
                                const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg_,
-                               const qpid::broker::PersistableQueue& /*queue*/)
+                               const qpid::broker::PersistableQueue& queue_) // Checks both ids, then calls async_dequeue() followed by msg_->dequeueComplete()
 {
-//    QLS_LOG(info,   "*** MessageStoreImpl::dequeue() queue=\"" << queue.getName() << "\"");
-/*
+    //QLS_LOG(info,   "*** MessageStoreImpl::dequeue() queue=\"" << queue_.getName() << "\"");
     checkInit();
-    uint64_t queueId (queue.getPersistenceId());
-    uint64_t messageId (msg->getPersistenceId());
+    uint64_t queueId (queue_.getPersistenceId()); // zero is rejected below (queue has not been created)
+    uint64_t messageId (msg_->getPersistenceId()); // zero is rejected below (message has no persistence id)
     if (queueId == 0) {
-        THROW_STORE_EXCEPTION("Queue \"" + queue.getName() + "\" has null queue Id (has not been created)");
+        THROW_STORE_EXCEPTION("Queue \"" + queue_.getName() + "\" has null queue Id (has not been created)");
     }
     if (messageId == 0) {
-        THROW_STORE_EXCEPTION("Queue \"" + queue.getName() + "\": Dequeuing message with null persistence Id.");
+        THROW_STORE_EXCEPTION("Queue \"" + queue_.getName() + "\": Dequeuing message with null persistence Id.");
     }
 
     TxnCtxt implicit;
     TxnCtxt* txn = 0;
-    if (ctxt) {
-        txn = check(ctxt);
+    if (ctxt_) {
+        txn = check(ctxt_);
     } else {
         txn = &implicit;
     }
 
     // add queue* to the txn map..
-    if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
-    async_dequeue(ctxt, msg, queue);
-*/
+    if (ctxt_) txn->addXidRecord(queue_.getExternalQueueStore()); // skipped when no context was supplied
+    async_dequeue(ctxt_, msg_, queue_); // NOTE(review): dequeueComplete() below runs as soon as this call returns - confirm it need not wait for the journal write
     msg_->dequeueComplete();
 }
 
-void MessageStoreImpl::async_dequeue(qpid::broker::TransactionContext* /*ctxt*/,
-                                     const boost::intrusive_ptr<qpid::broker::PersistableMessage>& /*msg*/,
+void MessageStoreImpl::async_dequeue(qpid::broker::TransactionContext* ctxt_,
+                                     const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg_,
                                      const qpid::broker::PersistableQueue& queue_)
 {
-    QLS_LOG(info,   "*** MessageStoreImpl::async_dequeue() queue=\"" << queue_.getName() << "\"");
-/*
+    //QLS_LOG(info,   "*** MessageStoreImpl::async_dequeue() queue=\"" << queue_.getName() << "\"");
     boost::intrusive_ptr<DataTokenImpl> ddtokp(new DataTokenImpl);
-    ddtokp->setSourceMessage(msg);
+    ddtokp->setSourceMessage(msg_);
     ddtokp->set_external_rid(true);
     ddtokp->set_rid(messageIdSequence.next());
-    ddtokp->set_dequeue_rid(msg->getPersistenceId());
+    ddtokp->set_dequeue_rid(msg_->getPersistenceId());
     ddtokp->set_wstate(DataTokenImpl::ENQ);
     std::string tid;
-    if (ctxt) {
-        TxnCtxt* txn = check(ctxt);
+    if (ctxt_) {
+        TxnCtxt* txn = check(ctxt_);
         tid = txn->getXid();
     }
     // Manually increase the ref count, as raw pointers are used beyond this point
     ddtokp->addRef();
     try {
-        JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
+        JournalImpl* jc = static_cast<JournalImpl*>(queue_.getExternalQueueStore());
         if (tid.empty()) {
             jc->dequeue_data_record(ddtokp.get());
         } else {
@@ -1421,9 +1354,8 @@ void MessageStoreImpl::async_dequeue(qpi
         }
     } catch (const qpid::qls_jrnl::jexception& e) {
         ddtokp->release();
-        THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": async_dequeue() failed: " + e.what());
+        THROW_STORE_EXCEPTION(std::string("Queue ") + queue_.getName() + ": async_dequeue() failed: " + e.what());
     }
-*/
 }
 
 uint32_t MessageStoreImpl::outstandingQueueAIO(const qpid::broker::PersistableQueue& /*queue_*/)
@@ -1661,11 +1593,10 @@ std::string MessageStoreImpl::getTplBase
     return dir.str();
 }
 
-std::string MessageStoreImpl::getJrnlDir(const qpid::broker::PersistableQueue& queue_) //for exmaple /var/rhm/ + queueDir/
+std::string MessageStoreImpl::getJrnlDir(const std::string& queueName_) // Returns getJrnlBaseDir() with queueName_ appended
 {
-    /*return getJrnlHashDir(queue_.getName().c_str());*/
     std::ostringstream oss;
-    oss << getJrnlBaseDir() << queue_.getName();
+    oss << getJrnlBaseDir() << queueName_; // plain concatenation: no separator is inserted here between base dir and name
     return oss.str();
 }
 
@@ -1679,8 +1610,8 @@ void MessageStoreImpl::journalDeleted(Jo
 MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name_) :
                                              qpid::Options(name_),
                                              truncateFlag(defTruncateFlag),
-                                             wCachePageSizeKib(defWCachePageSize),
-                                             tplWCachePageSizeKib(defTplWCachePageSize),
+                                             wCachePageSizeKib(defWCachePageSizeKib),
+                                             tplWCachePageSizeKib(defTplWCachePageSizeKib),
                                              efpPartition(defEfpPartition),
                                              efpFileSizeKib(defEfpFileSizeKib)
 {

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h Mon Oct  7 18:39:24 2013
@@ -101,10 +101,10 @@ class MessageStoreImpl : public qpid::br
 
     // Default store settings
     static const bool defTruncateFlag = false;
-    static const uint32_t defWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_SBLK_SIZE / 1024;
-    static const uint32_t defTplWCachePageSize = defWCachePageSize / 8;
+    static const uint32_t defWCachePageSizeKib = JRNL_WMGR_DEF_PAGE_SIZE_KIB;
+    static const uint32_t defTplWCachePageSizeKib = defWCachePageSizeKib / 8;
     static const uint16_t defEfpPartition = 1;
-    static const uint64_t defEfpFileSizeKib = 512 * JRNL_SBLK_SIZE / 1024;
+    static const uint64_t defEfpFileSizeKib = 512 * JRNL_SBLK_SIZE_KIB;
     static const std::string storeTopLevelDir;
 
     static qpid::sys::Duration defJournalGetEventsTimeout;
@@ -133,7 +133,7 @@ class MessageStoreImpl : public qpid::br
     IdSequence messageIdSequence;
     std::string storeDir;
     qpid::qls_jrnl::efpPartitionNumber_t defaultEfpPartitionNumber;
-    qpid::qls_jrnl::efpFileSizeKib_t defaultEfpFileSizeKib;
+    qpid::qls_jrnl::efpDataSize_kib_t defaultEfpFileSize_kib;
     bool      truncateFlag;
     uint32_t wCachePgSizeSblks;
     uint16_t wCacheNumPages;
@@ -156,7 +156,7 @@ class MessageStoreImpl : public qpid::br
     static uint16_t getJrnlWrNumPages(const uint32_t wrPageSizeKiB);
     static qpid::qls_jrnl::efpPartitionNumber_t chkEfpPartition(const qpid::qls_jrnl::efpPartitionNumber_t partition,
                                                                 const std::string& paramName);
-    static qpid::qls_jrnl::efpFileSizeKib_t chkEfpFileSizeKiB(const qpid::qls_jrnl::efpFileSizeKib_t efpFileSizeKiB,
+    static qpid::qls_jrnl::efpDataSize_kib_t chkEfpFileSizeKiB(const qpid::qls_jrnl::efpDataSize_kib_t efpFileSizeKiB,
                                                               const std::string& paramName);
 
     void init();
@@ -202,8 +202,7 @@ class MessageStoreImpl : public qpid::br
     uint64_t msgEncode(std::vector<char>& buff, const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message);
     void store(const qpid::broker::PersistableQueue* queue,
                TxnCtxt* txn,
-               const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message,
-               bool newId);
+               const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message);
     void async_dequeue(qpid::broker::TransactionContext* ctxt,
                        const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
                        const qpid::broker::PersistableQueue& queue);
@@ -231,8 +230,8 @@ class MessageStoreImpl : public qpid::br
 
     // journal functions
     void createJrnlQueue(const qpid::broker::PersistableQueue& queue);
-    std::string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple /var/rhm/ + queueDir/
-    qpid::qls_jrnl::EmptyFilePool* getEmptyFilePool(const qpid::qls_jrnl::efpPartitionNumber_t p, const qpid::qls_jrnl::efpFileSizeKib_t s);
+    std::string getJrnlDir(const std::string& queueName);
+    qpid::qls_jrnl::EmptyFilePool* getEmptyFilePool(const qpid::qls_jrnl::efpPartitionNumber_t p, const qpid::qls_jrnl::efpDataSize_kib_t s);
     qpid::qls_jrnl::EmptyFilePool* getEmptyFilePool(const qpid::framing::FieldTable& args);
     std::string getStoreTopLevelDir();
     std::string getJrnlBaseDir();
@@ -268,10 +267,10 @@ class MessageStoreImpl : public qpid::br
 
     bool init(const std::string& dir,
               qpid::qls_jrnl::efpPartitionNumber_t efpPartition = defEfpPartition,
-              qpid::qls_jrnl::efpFileSizeKib_t efpFileSizeKib = defEfpFileSizeKib,
+              qpid::qls_jrnl::efpDataSize_kib_t efpFileSizeKib = defEfpFileSizeKib,
               const bool truncateFlag = false,
-              uint32_t wCachePageSize = defWCachePageSize,
-              uint32_t tplWCachePageSize = defTplWCachePageSize);
+              uint32_t wCachePageSize = defWCachePageSizeKib,
+              uint32_t tplWCachePageSize = defTplWCachePageSizeKib);
 
     void truncateInit();
 
@@ -279,6 +278,8 @@ class MessageStoreImpl : public qpid::br
 
     void finalize();
 
+    // --- Implementation of qpid::broker::MessageStore ---
+
     void create(qpid::broker::PersistableQueue& queue,
                 const qpid::framing::FieldTable& args);
 
@@ -344,6 +345,8 @@ class MessageStoreImpl : public qpid::br
 
     void abort(qpid::broker::TransactionContext& ctxt);
 
+    // --- Implementation of qpid::management::Manageable ---
+
     qpid::management::ManagementObject::shared_ptr GetManagementObject (void) const
         { return mgmtObject; }
 

Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h?rev=1530024&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h Mon Oct  7 18:39:24 2013
@@ -0,0 +1,186 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LINEARSTORE_ATOMICCOUNTER_H_
+#define QPID_LINEARSTORE_ATOMICCOUNTER_H_
+
+#include <functional> // std::less, used to pick a stable lock order in operator-
+
+#include "qpid/linearstore/jrnl/slock.h"
+
+namespace qpid {
+namespace qls_jrnl {
+
+/**
+ * Counter of type T in which every read and update is performed while
+ * holding an internal smutex (through slock objects local to each call).
+ *
+ * Each member call locks for its own duration only, so a sequence of calls
+ * such as get() followed by add() is not atomic as a whole.
+ *
+ * NOTE(review): jexception, jerrno and uint32_t are used below although only
+ * slock.h is included directly; presumably they arrive transitively - confirm.
+ */
+template <class T>
+class AtomicCounter
+{
+private:
+    T count;                   // current value; accessed only under countMutex
+    mutable smutex countMutex; // mutable so that const members can lock it
+
+public:
+    /** Creates a counter holding initValue (T(0) when omitted). */
+    AtomicCounter(const T& initValue = T(0)) : count(initValue) {}
+
+    virtual ~AtomicCounter() {}
+
+    /** Returns a snapshot of the current value. */
+    T get() const {
+        slock l(countMutex);
+        return count;
+    }
+
+    /** Adds one and returns the new value. */
+    T increment() {
+        slock l(countMutex);
+        return ++count;
+    }
+
+    /** Adds a and returns the new value; no upper bound is enforced. */
+    T add(const T& a) {
+        slock l(countMutex);
+        count += a;
+        return count;
+    }
+
+    /**
+     * Adds a and returns the new value.
+     * Throws jexception(jerr) and leaves the counter unchanged when
+     * count + a would exceed limit.
+     */
+    T addLimit(const T& a, const T& limit, const uint32_t jerr) {
+        slock l(countMutex);
+        if (count + a > limit) throw jexception(jerr, "AtomicCounter", "addLimit");
+        count += a;
+        return count;
+    }
+
+    /** Subtracts one and returns the new value; no lower bound is enforced. */
+    T decrement() {
+        slock l(countMutex);
+        return --count;
+    }
+
+    /**
+     * Subtracts one and returns the new value.
+     * Throws jexception(jerr) and leaves the counter unchanged when the
+     * result would fall below limit (that is, when count < limit + 1).
+     */
+    T decrementLimit(const T& limit = T(0), const uint32_t jerr = jerrno::JERR__UNDERFLOW) {
+        slock l(countMutex);
+        if (count < limit + 1) {
+            throw jexception(jerr, "AtomicCounter", "decrementLimit");
+        }
+        return --count;
+    }
+
+    /** Subtracts s and returns the new value; no lower bound is enforced. */
+    T subtract(const T& s) {
+        slock l(countMutex);
+        count -= s;
+        return count;
+    }
+
+    /**
+     * Subtracts s and returns the new value.
+     * Throws jexception(jerr) and leaves the counter unchanged when the
+     * result would fall below limit (that is, when count < limit + s).
+     */
+    T subtractLimit(const T& s, const T& limit = T(0), const uint32_t jerr = jerrno::JERR__UNDERFLOW) {
+        slock l(countMutex);
+        if (count < limit + s) throw jexception(jerr, "AtomicCounter", "subtractLimit");
+        count -= s;
+        return count;
+    }
+
+    // Comparisons against a plain value; each tests a snapshot taken under the lock.
+    bool operator==(const T& o) const {
+        slock l(countMutex);
+        return count == o;
+    }
+
+    bool operator<(const T& o) const {
+        slock l(countMutex);
+        return count < o;
+    }
+
+    bool operator<=(const T& o) const {
+        slock l(countMutex);
+        return count <= o;
+    }
+
+    /** Returns a minus the current value of b. */
+    friend T operator-(const T& a, const AtomicCounter& b) {
+        slock l(b.countMutex);
+        return a - b.count;
+    }
+
+    /** Returns the current value of a minus b. */
+    friend T operator-(const AtomicCounter& a, const T& b) {
+        slock l(a.countMutex);
+        return a.count - b;
+    }
+
+    /**
+     * Returns the difference of two counters, read while both are locked.
+     *
+     * The two mutexes are always taken in address order, so concurrent
+     * evaluations of (x - y) and (y - x) cannot deadlock against each other,
+     * and a counter subtracted from itself is locked once only (locking the
+     * same mutex twice would hang unless smutex is recursive).
+     */
+    friend T operator-(const AtomicCounter&a, const AtomicCounter& b) {
+        if (&a == &b) {
+            slock l(a.countMutex);
+            return a.count - b.count;
+        }
+        const bool aFirst = std::less<const AtomicCounter*>()(&a, &b);
+        slock l1(aFirst ? a.countMutex : b.countMutex);
+        slock l2(aFirst ? b.countMutex : a.countMutex);
+        return a.count - b.count;
+    }
+
+/*
+    friend std::ostream& operator<<(std::ostream& out, const AtomicCounter& a) {
+        T temp; // Use temp so lock is not held while streaming to out.
+        {
+            slock l(a.countMutex);
+            temp = a.count;
+        }
+        out << temp;
+        return out;
+    }
+*/
+};
+
+}} // namespace qpid::qls_jrnl
+
+#endif // QPID_LINEARSTORE_ATOMICCOUNTER_H_

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp Mon Oct  7 18:39:24 2013
@@ -40,7 +40,7 @@ namespace qls_jrnl {
 EmptyFilePool::EmptyFilePool(const std::string& efpDirectory_,
                              const EmptyFilePoolPartition* partitionPtr_) :
                 efpDirectory(efpDirectory_),
-                efpFileSizeKib(fileSizeKbFromDirName(efpDirectory_, partitionPtr_->partitionNumber())),
+                efpDataSize_kib(fileSizeKbFromDirName(efpDirectory_, partitionPtr_->partitionNumber())), // partitionPtr_ is dereferenced here, so it must be non-null; NOTE(review): member now named as a data size but still set from fileSizeKbFromDirName() - confirm the directory name encodes the data size
                 partitionPtr(partitionPtr_)
 {}
 
@@ -50,7 +50,7 @@ void
 EmptyFilePool::initialize() {
     //std::cout << "Reading " << efpDirectory << std::endl; // DEBUG
     std::vector<std::string> dirList;
-    jdir::read_dir(efpDirectory, dirList, false, true, false);
+    jdir::read_dir(efpDirectory, dirList, false, true, false, false);
     for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
         size_t dotPos = i->rfind(".");
         if (dotPos != std::string::npos) {
@@ -65,9 +65,24 @@ EmptyFilePool::initialize() {
     //std::cout << "Found " << emptyFileList.size() << " files" << std::endl; // DEBUG
 }
 
-efpFileSizeKib_t
-EmptyFilePool::fileSizeKib() const {
-    return efpFileSizeKib;
+efpDataSize_kib_t
+EmptyFilePool::dataSize_kib() const { // plain accessor for the efpDataSize_kib member
+    return efpDataSize_kib;
+}
+
+efpFileSize_kib_t
+EmptyFilePool::fileSize_kib() const { // data size plus QLS_JRNL_FHDR_RES_SIZE_SBLKS sblks expressed in KiB (presumably the reserved file-header area - confirm)
+    return efpDataSize_kib + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_KIB);
+}
+
+efpDataSize_sblks_t
+EmptyFilePool::dataSize_sblks() const { // efpDataSize_kib converted to sblks by dividing by JRNL_SBLK_SIZE_KIB
+    return efpDataSize_kib / JRNL_SBLK_SIZE_KIB; // NOTE(review): if these are integers any remainder is dropped - confirm sizes are always whole sblks
+}
+
+efpFileSize_sblks_t
+EmptyFilePool::fileSize_sblks() const { // same KiB-to-sblks division as dataSize_sblks(), plus the QLS_JRNL_FHDR_RES_SIZE_SBLKS term
+    return (efpDataSize_kib / JRNL_SBLK_SIZE_KIB) + QLS_JRNL_FHDR_RES_SIZE_SBLKS;
 }
 
 efpFileCount_t
@@ -76,10 +91,10 @@ EmptyFilePool::numEmptyFiles() const {
     return efpFileCount_t(emptyFileList.size());
 }
 
-efpFileSizeKib_t
-EmptyFilePool::cumFileSizeKib() const {
+efpDataSize_kib_t
+EmptyFilePool::cumFileSize_kib() const {
     slock l(emptyFileListMutex);
-    return efpFileSizeKib_t(emptyFileList.size()) * efpFileSizeKib;
+    return efpDataSize_kib_t(emptyFileList.size()) * efpDataSize_kib;
 }
 
 efpPartitionNumber_t
@@ -94,7 +109,7 @@ EmptyFilePool::getPartition() const {
 
 const efpIdentity_t
 EmptyFilePool::getIdentity() const {
-    return efpIdentity_t(partitionPtr->partitionNumber(), efpFileSizeKib);
+    return efpIdentity_t(partitionPtr->partitionNumber(), efpDataSize_kib);
 }
 
 std::string
@@ -112,9 +127,9 @@ EmptyFilePool::takeEmptyFile(const std::
 
 bool
 EmptyFilePool::returnEmptyFile(const JournalFile* srcFile) {
-    std::string emptyFileName(efpDirectory + srcFile->fileName());
+    std::string emptyFileName(efpDirectory + srcFile->getFileName());
     // TODO: reset file here
-    if (::rename(srcFile->fqFileName().c_str(), emptyFileName.c_str())) {
+    if (::rename(srcFile->getFqFileName().c_str(), emptyFileName.c_str())) {
         std::ostringstream oss;
         oss << "file=\"" << srcFile << "\" dest=\"" <<  emptyFileName << "\"" << FORMAT_SYSERR(errno);
         throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", "returnEmptyFile");
@@ -152,14 +167,13 @@ EmptyFilePool::popEmptyFile() {
 
 void
 EmptyFilePool::createEmptyFile() {
-    file_hdr_t fh;
-    ::file_hdr_create(&fh, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDRSIZESBLKS, partitionPtr->partitionNumber(),
-                      efpFileSizeKib);
+    ::file_hdr_t fh;
+    ::file_hdr_create(&fh, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDR_RES_SIZE_SBLKS, partitionPtr->partitionNumber(), efpDataSize_kib);
     std::string efpfn = getEfpFileName();
     std::ofstream ofs(efpfn.c_str(), std::ofstream::out | std::ofstream::binary);
     if (ofs.good()) {
-        ofs.write((char*)&fh, sizeof(file_hdr_t));
-        uint64_t rem = ((efpFileSizeKib + (QLS_JRNL_FHDRSIZESBLKS * JRNL_SBLK_SIZE_KIB)) * 1024) - sizeof(file_hdr_t);
+        ofs.write((char*)&fh, sizeof(::file_hdr_t));
+        uint64_t rem = ((efpDataSize_kib + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_KIB)) * 1024) - sizeof(::file_hdr_t);
         while (rem--)
             ofs.put('\0');
         ofs.close();
@@ -180,8 +194,8 @@ EmptyFilePool::validateEmptyFile(const s
         oss << "stat: file=\"" << emptyFileName_ << "\"" << FORMAT_SYSERR(errno);
         throw jexception(jerrno::JERR_JDIR_STAT, oss.str(), "EmptyFilePool", "validateEmptyFile");
     }
-    efpFileSizeKib_t expectedSize = (JRNL_SBLK_SIZE_KIB + efpFileSizeKib) * 1024;
-    if ((efpFileSizeKib_t)s.st_size != expectedSize) {
+    efpDataSize_kib_t expectedSize = (JRNL_SBLK_SIZE_KIB + efpDataSize_kib) * 1024;
+    if ((efpDataSize_kib_t)s.st_size != expectedSize) {
         //std::cout << "ERROR: File " << emptyFileName << ": Incorrect size: Expected=" << expectedSize << "; actual=" << s.st_size << std::endl; // DEBUG
         return false;
     }
@@ -194,8 +208,8 @@ EmptyFilePool::validateEmptyFile(const s
 
     const uint8_t fhFileNameBuffLen = 50;
     char fhFileNameBuff[fhFileNameBuffLen];
-    file_hdr_t fh;
-    ifs.read((char*)&fh, sizeof(file_hdr_t));
+    ::file_hdr_t fh;
+    ifs.read((char*)&fh, sizeof(::file_hdr_t));
     uint16_t fhFileNameLen = fh._queue_name_len > fhFileNameBuffLen ? fhFileNameBuffLen : fh._queue_name_len;
     ifs.read(fhFileNameBuff, fhFileNameLen);
     std::string fhFileName(fhFileNameBuff, fhFileNameLen);
@@ -204,7 +218,7 @@ EmptyFilePool::validateEmptyFile(const s
     if (fh._rhdr._magic != QLS_FILE_MAGIC ||
         fh._rhdr._version != QLS_JRNL_VERSION ||
         fh._efp_partition != partitionPtr->partitionNumber() ||
-        fh._file_size_kib != efpFileSizeKib ||
+        fh._file_size_kib != efpDataSize_kib ||
         !::is_file_hdr_reset(&fh))
     {
         //std::cout << "ERROR: File " << emptyFileName << ": Invalid file header" << std::endl;
@@ -227,7 +241,7 @@ EmptyFilePool::getEfpFileName() {
 
 // protected
 // static
-efpFileSizeKib_t
+efpDataSize_kib_t
 EmptyFilePool::fileSizeKbFromDirName(const std::string& dirName_,
                                      const efpPartitionNumber_t partitionNumber_) {
     // Check for dirName format 'NNNk', where NNN is a number, convert NNN into an integer. NNN cannot be 0.
@@ -243,7 +257,7 @@ EmptyFilePool::fileSizeKbFromDirName(con
             valid = n[charNum] == 'k';
         }
     }
-    efpFileSizeKib_t s = ::atol(n.c_str());
+    efpDataSize_kib_t s = ::atol(n.c_str());
     if (!valid || s == 0 || s % JRNL_SBLK_SIZE_KIB != 0) {
         std::ostringstream oss;
         oss << "Partition: " << partitionNumber_ << "; EFP directory: \'" << n << "\'";

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h Mon Oct  7 18:39:24 2013
@@ -47,7 +47,7 @@ protected:
     typedef emptyFileList_t::iterator emptyFileListItr_t;
 
     const std::string efpDirectory;
-    const efpFileSizeKib_t efpFileSizeKib;
+    const efpDataSize_kib_t efpDataSize_kib;
     const EmptyFilePoolPartition* partitionPtr;
 
 private:
@@ -60,9 +60,12 @@ public:
     virtual ~EmptyFilePool();
 
     void initialize();
-    efpFileSizeKib_t fileSizeKib() const;
+    efpDataSize_kib_t dataSize_kib() const;
+    efpFileSize_kib_t fileSize_kib() const;
+    efpDataSize_sblks_t dataSize_sblks() const;
+    efpFileSize_sblks_t fileSize_sblks() const;
     efpFileCount_t numEmptyFiles() const;
-    efpFileSizeKib_t cumFileSizeKib() const;
+    efpDataSize_kib_t cumFileSize_kib() const;
     efpPartitionNumber_t getPartitionNumber() const;
     const EmptyFilePoolPartition* getPartition() const;
     const efpIdentity_t getIdentity() const;
@@ -76,8 +79,8 @@ protected:
     void createEmptyFile();
     bool validateEmptyFile(const std::string& emptyFileName_) const;
     std::string getEfpFileName();
-    static efpFileSizeKib_t fileSizeKbFromDirName(const std::string& dirName_,
-                                                  const efpPartitionNumber_t partitionNumber_);
+    static efpDataSize_kib_t fileSizeKbFromDirName(const std::string& dirName_,
+                                                   const efpPartitionNumber_t partitionNumber_);
 };
 
 }} // namespace qpid::qls_jrnl

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp Mon Oct  7 18:39:24 2013
@@ -49,7 +49,7 @@ void
 EmptyFilePoolManager::findEfpPartitions() {
     //std::cout << "*** Reading " << qlsStorePath << std::endl; // DEBUG
     std::vector<std::string> dirList;
-    jdir::read_dir(qlsStorePath, dirList, true, false, true);
+    jdir::read_dir(qlsStorePath, dirList, true, false, true, false);
     for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
         if ((*i)[0] == 'p' && i->length() == 4) { // Filter: look only at names pNNN
             efpPartitionNumber_t pn = ::atoi(i->c_str() + 1);
@@ -90,15 +90,15 @@ EmptyFilePoolManager::getEfpPartition(co
 
 void
 EmptyFilePoolManager::getEfpPartitionNumbers(std::vector<efpPartitionNumber_t>& partitionNumberList,
-                                             const efpFileSizeKib_t efpFileSizeKb) const {
+                                             const efpDataSize_kib_t efpFileSizeKb) const {
     slock l(partitionMapMutex);
     for (partitionMapConstItr_t i=partitionMap.begin(); i!=partitionMap.end(); ++i) {
         if (efpFileSizeKb == 0) {
             partitionNumberList.push_back(i->first);
         } else {
-            std::vector<efpFileSizeKib_t> efpFileSizeList;
+            std::vector<efpDataSize_kib_t> efpFileSizeList;
             i->second->getEmptyFilePoolSizesKb(efpFileSizeList);
-            for (std::vector<efpFileSizeKib_t>::iterator j=efpFileSizeList.begin(); j!=efpFileSizeList.end(); ++j) {
+            for (std::vector<efpDataSize_kib_t>::iterator j=efpFileSizeList.begin(); j!=efpFileSizeList.end(); ++j) {
                 if (*j == efpFileSizeKb) {
                     partitionNumberList.push_back(i->first);
                     break;
@@ -110,15 +110,15 @@ EmptyFilePoolManager::getEfpPartitionNum
 
 void
 EmptyFilePoolManager::getEfpPartitions(std::vector<EmptyFilePoolPartition*>& partitionList,
-                                       const efpFileSizeKib_t efpFileSizeKb) {
+                                       const efpDataSize_kib_t efpFileSizeKb) {
     slock l(partitionMapMutex);
     for (partitionMapConstItr_t i=partitionMap.begin(); i!=partitionMap.end(); ++i) {
         if (efpFileSizeKb == 0) {
             partitionList.push_back(i->second);
         } else {
-            std::vector<efpFileSizeKib_t> efpFileSizeList;
+            std::vector<efpDataSize_kib_t> efpFileSizeList;
             i->second->getEmptyFilePoolSizesKb(efpFileSizeList);
-            for (std::vector<efpFileSizeKib_t>::iterator j=efpFileSizeList.begin(); j!=efpFileSizeList.end(); ++j) {
+            for (std::vector<efpDataSize_kib_t>::iterator j=efpFileSizeList.begin(); j!=efpFileSizeList.end(); ++j) {
                 if (*j == efpFileSizeKb) {
                     partitionList.push_back(i->second);
                     break;
@@ -129,7 +129,7 @@ EmptyFilePoolManager::getEfpPartitions(s
 }
 
 void
-EmptyFilePoolManager::getEfpFileSizes(std::vector<efpFileSizeKib_t>& efpFileSizeList,
+EmptyFilePoolManager::getEfpFileSizes(std::vector<efpDataSize_kib_t>& efpFileSizeList,
                                       const efpPartitionNumber_t efpPartitionNumber) const {
     if (efpPartitionNumber == 0) {
         for (partitionMapConstItr_t i=partitionMap.begin(); i!=partitionMap.end(); ++i) {
@@ -160,7 +160,7 @@ EmptyFilePoolManager::getEmptyFilePools(
 
 EmptyFilePool*
 EmptyFilePoolManager::getEmptyFilePool(const efpPartitionNumber_t partitionNumber,
-                                       const efpFileSizeKib_t efpFileSizeKib) {
+                                       const efpDataSize_kib_t efpFileSizeKib) {
     EmptyFilePoolPartition* efppp = getEfpPartition(partitionNumber);
     if (efppp != 0)
         return efppp->getEmptyFilePool(efpFileSizeKib);

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h Mon Oct  7 18:39:24 2013
@@ -48,13 +48,13 @@ public:
 
     uint16_t getNumEfpPartitions() const;
     EmptyFilePoolPartition* getEfpPartition(const efpPartitionNumber_t partitionNumber);
-    void getEfpPartitionNumbers(std::vector<efpPartitionNumber_t>& partitionNumberList, const efpFileSizeKib_t efpFileSizeKb = 0) const;
-    void getEfpPartitions(std::vector<EmptyFilePoolPartition*>& partitionList, const efpFileSizeKib_t efpFileSizeKb = 0);
+    void getEfpPartitionNumbers(std::vector<efpPartitionNumber_t>& partitionNumberList, const efpDataSize_kib_t efpFileSizeKb = 0) const;
+    void getEfpPartitions(std::vector<EmptyFilePoolPartition*>& partitionList, const efpDataSize_kib_t efpFileSizeKb = 0);
 
-    void getEfpFileSizes(std::vector<efpFileSizeKib_t>& efpFileSizeList, const efpPartitionNumber_t efpPartitionNumber = 0) const;
+    void getEfpFileSizes(std::vector<efpDataSize_kib_t>& efpFileSizeList, const efpPartitionNumber_t efpPartitionNumber = 0) const;
     void getEmptyFilePools(std::vector<EmptyFilePool*>& emptyFilePoolList, const efpPartitionNumber_t efpPartitionNumber = 0);
 
-    EmptyFilePool* getEmptyFilePool(const efpPartitionNumber_t partitionNumber, const efpFileSizeKib_t efpFileSizeKb);
+    EmptyFilePool* getEmptyFilePool(const efpPartitionNumber_t partitionNumber, const efpDataSize_kib_t efpFileSizeKb);
     EmptyFilePool* getEmptyFilePool(const efpIdentity_t efpIdentity);
 };
 

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp Mon Oct  7 18:39:24 2013
@@ -63,7 +63,7 @@ void
 EmptyFilePoolPartition::findEmptyFilePools() {
     //std::cout << "Reading " << partitionDir << std::endl; // DEBUG
     std::vector<std::string> dirList;
-    jdir::read_dir(partitionDir, dirList, true, false, false);
+    jdir::read_dir(partitionDir, dirList, true, false, false, false);
     bool foundEfpDir = false;
     for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
         if (i->compare(efpTopLevelDir) == 0) {
@@ -75,15 +75,14 @@ EmptyFilePoolPartition::findEmptyFilePoo
         std::string efpDir(partitionDir + "/" + efpTopLevelDir);
         //std::cout << "Reading " << efpDir << std::endl; // DEBUG
         dirList.clear();
-        jdir::read_dir(efpDir, dirList, true, false, false);
+        jdir::read_dir(efpDir, dirList, true, false, false, true);
         for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
-            std::string efpSizeDir(efpDir + "/" + (*i));
             EmptyFilePool* efpp = 0;
             try {
-                efpp = new EmptyFilePool(efpSizeDir, this);
+                efpp = new EmptyFilePool(*i, this);
                 {
                     slock l(efpMapMutex);
-                    efpMap[efpp->fileSizeKib()] = efpp;
+                    efpMap[efpp->dataSize_kib()] = efpp;
                 }
             }
             catch (const std::exception& e) {
@@ -110,7 +109,7 @@ EmptyFilePoolPartition::partitionDirecto
 }
 
 EmptyFilePool*
-EmptyFilePoolPartition::getEmptyFilePool(const efpFileSizeKib_t efpFileSizeKb) {
+EmptyFilePoolPartition::getEmptyFilePool(const efpDataSize_kib_t efpFileSizeKb) {
     efpMapItr_t i = efpMap.find(efpFileSizeKb);
     if (i == efpMap.end())
         return 0;
@@ -118,7 +117,7 @@ EmptyFilePoolPartition::getEmptyFilePool
 }
 
 void
-EmptyFilePoolPartition::getEmptyFilePoolSizesKb(std::vector<efpFileSizeKib_t>& efpFileSizesKbList) const {
+EmptyFilePoolPartition::getEmptyFilePoolSizesKb(std::vector<efpDataSize_kib_t>& efpFileSizesKbList) const {
     for (efpMapConstItr_t i=efpMap.begin(); i!=efpMap.end(); ++i) {
         efpFileSizesKbList.push_back(i->first);
     }

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h Mon Oct  7 18:39:24 2013
@@ -44,7 +44,7 @@ class EmptyFilePoolPartition
 public:
     static const std::string efpTopLevelDir;
 protected:
-    typedef std::map<efpFileSizeKib_t, EmptyFilePool*> efpMap_t;
+    typedef std::map<efpDataSize_kib_t, EmptyFilePool*> efpMap_t;
     typedef efpMap_t::iterator efpMapItr_t;
     typedef efpMap_t::const_iterator efpMapConstItr_t;
 
@@ -62,8 +62,8 @@ public:
     efpPartitionNumber_t partitionNumber() const;
     std::string partitionDirectory() const;
 
-    EmptyFilePool* getEmptyFilePool(const efpFileSizeKib_t efpFileSizeKb);
-    void getEmptyFilePoolSizesKb(std::vector<efpFileSizeKib_t>& efpFileSizesKbList) const;
+    EmptyFilePool* getEmptyFilePool(const efpDataSize_kib_t efpFileSizeKb);
+    void getEmptyFilePoolSizesKb(std::vector<efpDataSize_kib_t>& efpFileSizesKbList) const;
     void getEmptyFilePools(std::vector<EmptyFilePool*>& efpList);
 };
 

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h Mon Oct  7 18:39:24 2013
@@ -23,14 +23,18 @@
 #define QPID_QLS_JRNL_EMPTYFILEPOOLTYPES_H_
 
 #include <stdint.h>
+#include <utility> // std::pair
 
 namespace qpid {
 namespace qls_jrnl {
 
-    typedef uint64_t efpFileSizeKib_t;
+    typedef uint64_t efpDataSize_kib_t;   // Size of data part of file (excluding file header) in kib
+    typedef uint64_t efpFileSize_kib_t;   // Size of file (header + data) in kib
+    typedef uint32_t efpDataSize_sblks_t; // Size of data part of file (excluding file header) in sblks
+    typedef uint32_t efpFileSize_sblks_t; // Size of file (header + data) in sblks
     typedef uint32_t efpFileCount_t;
     typedef uint16_t efpPartitionNumber_t;
-    typedef std::pair<efpPartitionNumber_t, efpFileSizeKib_t> efpIdentity_t;
+    typedef std::pair<efpPartitionNumber_t, efpDataSize_kib_t> efpIdentity_t;
 
 }} // namespace qpid::qls_jrnl
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org