You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/10/21 23:26:11 UTC

svn commit: r1534383 [1/4] - in /qpid/branches/linearstore/qpid/cpp/src: ./ qpid/broker/ qpid/legacystore/jrnl/ qpid/linearstore/ qpid/linearstore/jrnl/ qpid/linearstore/jrnl/utils/

Author: kpvdr
Date: Mon Oct 21 21:26:10 2013
New Revision: 1534383

URL: http://svn.apache.org/r1534383
Log:
QPID-4984: WIP - Compiles, but functionally incomplete. Transactions not yet functional.

Added:
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalLogImpl.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalLogImpl.h
      - copied, changed from r1527754, qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/QpidLog.h
Removed:
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/EmptyFilePoolManagerImpl.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/EmptyFilePoolManagerImpl.h
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/QpidLog.h
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rcvdat.h
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.h
Modified:
    qpid/branches/linearstore/qpid/cpp/src/linearstore.cmake
    qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/jrnl/pmgr.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/jrnl/rmgr.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.h
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.h
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enums.h
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp
    qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h

Modified: qpid/branches/linearstore/qpid/cpp/src/linearstore.cmake
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/linearstore.cmake?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/linearstore.cmake (original)
+++ qpid/branches/linearstore/qpid/cpp/src/linearstore.cmake Mon Oct 21 21:26:10 2013
@@ -78,11 +78,11 @@ if (BUILD_LINEARSTORE)
     set (linear_jrnl_SOURCES
         qpid/linearstore/jrnl/data_tok.cpp
         qpid/linearstore/jrnl/deq_rec.cpp
-        qpid/linearstore/jrnl/enq_map.cpp
-        qpid/linearstore/jrnl/enq_rec.cpp
         qpid/linearstore/jrnl/EmptyFilePool.cpp
         qpid/linearstore/jrnl/EmptyFilePoolManager.cpp
         qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp
+        qpid/linearstore/jrnl/enq_map.cpp
+        qpid/linearstore/jrnl/enq_rec.cpp
         qpid/linearstore/jrnl/jcntl.cpp
         qpid/linearstore/jrnl/jdir.cpp
         qpid/linearstore/jrnl/jerrno.cpp
@@ -105,12 +105,12 @@ if (BUILD_LINEARSTORE)
         qpid/linearstore/BindingDbt.cpp
         qpid/linearstore/BufferValue.cpp
         qpid/linearstore/DataTokenImpl.cpp
-        qpid/linearstore/EmptyFilePoolManagerImpl.cpp
         qpid/linearstore/IdDbt.cpp
         qpid/linearstore/IdSequence.cpp
         qpid/linearstore/JournalImpl.cpp
         qpid/linearstore/MessageStoreImpl.cpp
         qpid/linearstore/PreparedTransaction.cpp
+        qpid/linearstore/JournalLogImpl.cpp
         qpid/linearstore/TxnCtxt.cpp
     )
 

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Broker.cpp Mon Oct 21 21:26:10 2013
@@ -998,7 +998,7 @@ Manageable::status_t Broker::queryQueue(
         return Manageable::STATUS_UNKNOWN_OBJECT;
     }
     q->query( results );
-    return Manageable::STATUS_OK;;
+    return Manageable::STATUS_OK;
 }
 
 Manageable::status_t Broker::getTimestampConfig(bool& receive,

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Queue.cpp Mon Oct 21 21:26:10 2013
@@ -835,7 +835,7 @@ bool Queue::checkAutoDelete(const Mutex:
 
 bool Queue::isUnused(const Mutex::ScopedLock&) const
 {
-    return !owner && !users.isUsed();;
+    return !owner && !users.isUsed();
 }
 
 bool Queue::isEmpty(const Mutex::ScopedLock&) const

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/jrnl/pmgr.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/jrnl/pmgr.cpp?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/jrnl/pmgr.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/jrnl/pmgr.cpp Mon Oct 21 21:26:10 2013
@@ -49,6 +49,7 @@ namespace journal
 pmgr::page_cb::page_cb(u_int16_t index):
         _index(index),
         _state(UNUSED),
+        _frid(0),
         _wdblks(0),
         _rdblks(0),
         _pdtokl(0),

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/jrnl/rmgr.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/jrnl/rmgr.cpp?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/jrnl/rmgr.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/jrnl/rmgr.cpp Mon Oct 21 21:26:10 2013
@@ -227,6 +227,7 @@ rmgr::read(void** const datapp, std::siz
                 return RHM_IORES_EMPTY;
         }
     }
+    return RHM_IORES_SUCCESS;
 }
 
 int32_t
@@ -529,6 +530,7 @@ rmgr::skip(data_tok* dtokp)
             return RHM_IORES_SUCCESS;
         }
     }
+    return RHM_IORES_SUCCESS;
 }
 
 iores

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp Mon Oct 21 21:26:10 2013
@@ -21,20 +21,20 @@
 
 #include "qpid/linearstore/JournalImpl.h"
 
+#include "qpid/linearstore/JournalLogImpl.h"
 #include "qpid/linearstore/jrnl/jerrno.h"
 #include "qpid/linearstore/jrnl/jexception.h"
 #include "qpid/linearstore/jrnl/EmptyFilePool.h"
+#include "qpid/linearstore/StoreException.h"
 #include "qpid/log/Statement.h"
 #include "qpid/management/ManagementAgent.h"
-//#include "qmf/org/apache/qpid/linearstore/ArgsJournalExpand.h"
+#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Timer.h"
+
 #include "qmf/org/apache/qpid/linearstore/EventCreated.h"
 #include "qmf/org/apache/qpid/linearstore/EventEnqThresholdExceeded.h"
 #include "qmf/org/apache/qpid/linearstore/EventFull.h"
 #include "qmf/org/apache/qpid/linearstore/EventRecovered.h"
-#include "qpid/sys/Monitor.h"
-#include "qpid/sys/Timer.h"
-#include "qpid/linearstore/QpidLog.h"
-#include "qpid/linearstore/StoreException.h"
 
 using namespace qpid::qls_jrnl;
 using namespace qpid::linearstore;
@@ -54,13 +54,14 @@ void GetEventsFireEvent::fire() { qpid::
 JournalImpl::JournalImpl(qpid::sys::Timer& timer_,
                          const std::string& journalId,
                          const std::string& journalDirectory,
-//                         const std::string& journalBaseFilename,
+                         JournalLogImpl& journalLogRef,
                          const qpid::sys::Duration getEventsTimeout,
                          const qpid::sys::Duration flushTimeout,
                          qpid::management::ManagementAgent* a,
                          DeleteCallback onDelete):
-                         jcntl(journalId, journalDirectory/*, journalBaseFilename*/),
+                         jcntl(journalId, journalDirectory, journalLogRef),
                          timer(timer_),
+                         _journalLogRef(journalLogRef),
                          getEventsTimerSetFlag(false),
 //                         lastReadRid(0),
                          writeActivityFlag(false),
@@ -163,7 +164,7 @@ JournalImpl::initialize(qpid::qls_jrnl::
         _mgmtObject->set_writePages(wcache_num_pages);
     }
     if (_agent != 0)
-        _agent->raiseEvent(qmf::org::apache::qpid::linearstore::EventCreated(_jid, _jfsize_sblks * JRNL_SBLK_SIZE, _lpmgr.num_jfiles()),
+        _agent->raiseEvent(qmf::org::apache::qpid::linearstore::EventCreated(_jid, _jfsize_sblks * JRNL_SBLK_SIZE, _lpmgr.num_jfiles()),
                            qpid::management::ManagementAgent::SEV_NOTE);
 */
 }
@@ -558,11 +559,11 @@ JournalImpl::wr_aio_cb(std::vector<data_
 		    switch (dtokp->wstate())
 		    {
  			    case data_tok::ENQ:
- 			        std::cout << "<<<>>> JournalImpl::wr_aio_cb() ENQ dtokp rid=" << dtokp->rid() << std::endl << std::flush; // DEBUG
+//std::cout << "<<<>>> JournalImpl::wr_aio_cb() ENQ dtokp rid=0x" << std::hex << dtokp->rid() << std::dec << std::endl << std::flush; // DEBUG
              	    dtokp->getSourceMessage()->enqueueComplete();
  				    break;
 			    case data_tok::DEQ:
-			        std::cout << "<<<>>> JournalImpl::wr_aio_cb() DEQ dtokp rid=" << dtokp->rid() << std::endl << std::flush; // DEBUG
+//std::cout << "<<<>>> JournalImpl::wr_aio_cb() DEQ dtokp rid=0x" << std::hex << dtokp->rid() << std::dec << std::endl << std::flush; // DEBUG
 /* Don't need to signal until we have a way to ack completion of dequeue in AMQP
                     dtokp->getSourceMessage()->dequeueComplete();
                     if ( dtokp->getSourceMessage()->isDequeueComplete()  ) // clear id after last dequeue

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.h?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.h Mon Oct 21 21:26:10 2013
@@ -48,6 +48,7 @@ class EmptyFilePool;
 namespace linearstore{
 
 class JournalImpl;
+class JournalLogImpl;
 
 class InactivityFireEvent : public qpid::sys::TimerTask
 {
@@ -78,8 +79,9 @@ class JournalImpl : public qpid::broker:
   public:
     typedef boost::function<void (JournalImpl&)> DeleteCallback;
 
-  private:
+  protected:
     qpid::sys::Timer& timer;
+    JournalLogImpl& _journalLogRef;
     bool getEventsTimerSetFlag;
     boost::intrusive_ptr<qpid::sys::TimerTask> getEventsFireEventsPtr;
     qpid::sys::Mutex _getf_lock;
@@ -98,6 +100,7 @@ class JournalImpl : public qpid::broker:
     JournalImpl(qpid::sys::Timer& timer,
                 const std::string& journalId,
                 const std::string& journalDirectory,
+                JournalLogImpl& journalLogRef,
                 const qpid::sys::Duration getEventsTimeout,
                 const qpid::sys::Duration flushTimeout,
                 qpid::management::ManagementAgent* agent,
@@ -187,7 +190,7 @@ class JournalImpl : public qpid::broker:
 
     void resetDeleteCallback() { deleteCallback = DeleteCallback(); }
 
-  private:
+  protected:
 //    void free_read_buffers();
     void createStore();
 
@@ -215,10 +218,11 @@ class TplJournalImpl : public JournalImp
     TplJournalImpl(qpid::sys::Timer& timer,
                    const std::string& journalId,
                    const std::string& journalDirectory,
+                   JournalLogImpl& journalLogRef,
                    const qpid::sys::Duration getEventsTimeout,
                    const qpid::sys::Duration flushTimeout,
                    qpid::management::ManagementAgent* agent) :
-        JournalImpl(timer, journalId, journalDirectory, getEventsTimeout, flushTimeout, agent)
+        JournalImpl(timer, journalId, journalDirectory, journalLogRef, getEventsTimeout, flushTimeout, agent)
     {}
 
     virtual ~TplJournalImpl() {}

Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalLogImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalLogImpl.cpp?rev=1534383&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalLogImpl.cpp (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalLogImpl.cpp Mon Oct 21 21:26:10 2013
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/linearstore/JournalLogImpl.h"
+
+namespace qpid {
+namespace linearstore {
+
+JournalLogImpl::JournalLogImpl(const qpid::qls_jrnl::JournalLog::log_level_t logLevelThreshold) : qpid::qls_jrnl::JournalLog(logLevelThreshold) {}
+JournalLogImpl::~JournalLogImpl() {}
+
+void
+JournalLogImpl::log(const qpid::qls_jrnl::JournalLog::log_level_t level,
+                    const std::string& log_stmt) const {
+    switch (level) {
+      case LOG_CRITICAL: QPID_LOG(critical, "Linear Store: " << log_stmt); break;
+      case LOG_ERROR: QPID_LOG(error, "Linear Store: " << log_stmt); break;
+      case LOG_WARN: QPID_LOG(warning, "Linear Store: " << log_stmt); break;
+      case LOG_NOTICE: QPID_LOG(notice, "Linear Store: " << log_stmt); break;
+      case LOG_INFO: QPID_LOG(info, "Linear Store: " << log_stmt); break;
+      case LOG_DEBUG: QPID_LOG(debug, "Linear Store: " << log_stmt); break;
+      default: QPID_LOG(trace, "Linear Store: " << log_stmt);
+    }
+}
+
+void
+JournalLogImpl::log(const qpid::qls_jrnl::JournalLog::log_level_t level,
+                    const std::string& jid,
+                    const std::string& log_stmt) const {
+    switch (level) {
+      case LOG_CRITICAL: QPID_LOG(critical, "Linear Store: Journal \'" << jid << "\":" << log_stmt); break;
+      case LOG_ERROR: QPID_LOG(error, "Linear Store: Journal \'" << jid << "\":" << log_stmt); break;
+      case LOG_WARN: QPID_LOG(warning, "Linear Store: Journal \'" << jid << "\":" << log_stmt); break;
+      case LOG_NOTICE: QPID_LOG(notice, "Linear Store: Journal \'" << jid << "\":" << log_stmt); break;
+      case LOG_INFO: QPID_LOG(info, "Linear Store: Journal \'" << jid << "\":" << log_stmt); break;
+      case LOG_DEBUG: QPID_LOG(debug, "Linear Store: Journal \'" << jid << "\":" << log_stmt); break;
+      default: QPID_LOG(trace, "Linear Store: Journal \'" << jid << "\":" << log_stmt);
+    }
+}
+
+}} // namespace qpid::linearstore

Copied: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalLogImpl.h (from r1527754, qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/QpidLog.h)
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalLogImpl.h?p2=qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalLogImpl.h&p1=qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/QpidLog.h&r1=1527754&r2=1534383&rev=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/QpidLog.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalLogImpl.h Mon Oct 21 21:26:10 2013
@@ -22,9 +22,27 @@
 #ifndef QPID_LEGACYSTORE_LOG_H
 #define QPID_LEGACYSTORE_LOG_H
 
+#include "qpid/linearstore/jrnl/JournalLog.h"
 #include "qpid/log/Statement.h"
 
 #define QLS_LOG(level, msg) QPID_LOG(level, "Linear Store: " << msg)
 #define QLS_LOG2(level, queue, msg) QPID_LOG(level, "Linear Store: Journal \'" << queue << "\":" << msg)
 
+namespace qpid {
+namespace linearstore {
+
+class JournalLogImpl : public qpid::qls_jrnl::JournalLog
+{
+public:
+    JournalLogImpl(const qpid::qls_jrnl::JournalLog::log_level_t logLevelThreshold);
+    virtual ~JournalLogImpl();
+    virtual void log(const qpid::qls_jrnl::JournalLog::log_level_t logLevel,
+                     const std::string& logStatement) const;
+    virtual void log(const qpid::qls_jrnl::JournalLog::log_level_t logLevel,
+                     const std::string& journalId,
+                     const std::string& logStatement) const;
+};
+
+}} // namespace qpid::linearstore
+
 #endif // QPID_LEGACYSTORE_LOG_H

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp Mon Oct 21 21:26:10 2013
@@ -28,7 +28,6 @@
 #include "qpid/linearstore/IdDbt.h"
 #include "qpid/linearstore/jrnl/EmptyFilePoolManager.h"
 #include "qpid/linearstore/jrnl/txn_map.h"
-#include "qpid/linearstore/QpidLog.h"
 #include "qpid/framing/FieldValue.h"
 #include "qmf/org/apache/qpid/linearstore/Package.h"
 #include "qpid/linearstore/StoreException.h"
@@ -73,6 +72,7 @@ MessageStoreImpl::MessageStoreImpl(qpid:
                                    isInit(false),
                                    envPath(envpath_),
                                    broker(broker_),
+                                   jrnlLog(qpid::qls_jrnl::JournalLog::LOG_NOTICE),
                                    mgmtObject(),
                                    agent(0)
 {}
@@ -83,7 +83,7 @@ uint32_t MessageStoreImpl::chkJrnlWrPage
 
     if (p == 0) {
         // For zero value, use default
-        p = JRNL_WMGR_DEF_PAGE_SIZE_KIB;
+        p = QLS_WMGR_DEF_PAGE_SIZE_KIB;
         QLS_LOG(warning, "parameter " << paramName_ << " (" << param_ << ") must be a power of 2 between 1 and 128; changing this parameter to default value (" << p << ")");
     } else if ( p > 128 || (p & (p-1)) ) {
         // For any positive value that is not a power of 2, use closest value
@@ -100,8 +100,8 @@ uint32_t MessageStoreImpl::chkJrnlWrPage
 
 uint16_t MessageStoreImpl::getJrnlWrNumPages(const uint32_t wrPageSizeKib_)
 {
-    uint32_t wrPageSizeSblks = wrPageSizeKib_ / JRNL_SBLK_SIZE_KIB; // convert from KiB to number sblks
-    uint32_t defTotWCacheSizeSblks = JRNL_WMGR_DEF_PAGE_SIZE_SBLKS * JRNL_WMGR_DEF_PAGES;
+    uint32_t wrPageSizeSblks = wrPageSizeKib_ / QLS_SBLK_SIZE_KIB; // convert from KiB to number sblks
+    uint32_t defTotWCacheSizeSblks = QLS_WMGR_DEF_PAGE_SIZE_SBLKS * QLS_WMGR_DEF_PAGES;
     switch (wrPageSizeKib_)
     {
       case 1:
@@ -127,13 +127,13 @@ qpid::qls_jrnl::efpPartitionNumber_t Mes
 
 qpid::qls_jrnl::efpDataSize_kib_t MessageStoreImpl::chkEfpFileSizeKiB(const qpid::qls_jrnl::efpDataSize_kib_t efpFileSizeKib_,
                                                                      const std::string& paramName_) {
-    uint8_t rem =  efpFileSizeKib_ % uint64_t(JRNL_SBLK_SIZE_KIB);
+    uint8_t rem =  efpFileSizeKib_ % uint64_t(QLS_SBLK_SIZE_KIB);
     if (rem != 0) {
         uint64_t newVal = efpFileSizeKib_ - rem;
-        if (rem >= (JRNL_SBLK_SIZE_KIB / 2))
-            newVal += JRNL_SBLK_SIZE_KIB;
+        if (rem >= (QLS_SBLK_SIZE_KIB / 2))
+            newVal += QLS_SBLK_SIZE_KIB;
         QLS_LOG(warning, "Parameter " << paramName_ << " (" << efpFileSizeKib_ << ") must be a multiple of " <<
-                JRNL_SBLK_SIZE_KIB << "; changing this parameter to the closest allowable value (" <<
+                QLS_SBLK_SIZE_KIB << "; changing this parameter to the closest allowable value (" <<
                 newVal << ")");
         return newVal;
     }
@@ -154,7 +154,7 @@ void MessageStoreImpl::initManagement ()
             mgmtObject->set_location(storeDir);
             mgmtObject->set_tplIsInitialized(false);
             mgmtObject->set_tplDirectory(getTplBaseDir());
-            mgmtObject->set_tplWritePageSize(tplWCachePgSizeSblks * JRNL_SBLK_SIZE_BYTES);
+            mgmtObject->set_tplWritePageSize(tplWCachePgSizeSblks * QLS_SBLK_SIZE_BYTES);
             mgmtObject->set_tplWritePages(tplWCacheNumPages);
 
             agent->addObject(mgmtObject, 0, true);
@@ -193,9 +193,9 @@ bool MessageStoreImpl::init(const std::s
     // Set geometry members (converting to correct units where req'd)
     defaultEfpPartitionNumber = efpPartition_;
     defaultEfpFileSize_kib = efpFileSize_kib_;
-    wCachePgSizeSblks = wCachePageSizeKib_ / JRNL_SBLK_SIZE_KIB; // convert from KiB to number sblks
+    wCachePgSizeSblks = wCachePageSizeKib_ / QLS_SBLK_SIZE_KIB; // convert from KiB to number sblks
     wCacheNumPages = getJrnlWrNumPages(wCachePageSizeKib_);
-    tplWCachePgSizeSblks = tplWCachePageSizeKib_ / JRNL_SBLK_SIZE_KIB; // convert from KiB to number sblks
+    tplWCachePgSizeSblks = tplWCachePageSizeKib_ / QLS_SBLK_SIZE_KIB; // convert from KiB to number sblks
     tplWCacheNumPages = getJrnlWrNumPages(tplWCachePageSizeKib_);
     if (storeDir_.size()>0) storeDir = storeDir_;
 
@@ -267,7 +267,7 @@ void MessageStoreImpl::init()
             // NOTE: during normal initialization, agent == 0 because the store is initialized before the management infrastructure.
             // However during a truncated initialization in a cluster, agent != 0. We always pass 0 as the agent for the
             // TplStore to keep things consistent in a cluster. See https://bugzilla.redhat.com/show_bug.cgi?id=681026
-            tplStorePtr.reset(new TplJournalImpl(broker->getTimer(), "TplStore", getTplBaseDir(), defJournalGetEventsTimeout, defJournalFlushTimeout, 0));
+            tplStorePtr.reset(new TplJournalImpl(broker->getTimer(), "TplStore", getTplBaseDir(), jrnlLog, defJournalGetEventsTimeout, defJournalFlushTimeout, 0));
             isInit = true;
         } catch (const DbException& e) {
             if (e.get_errno() == DB_VERSION_MISMATCH)
@@ -291,7 +291,7 @@ void MessageStoreImpl::init()
         }
     } while (!isInit);
 
-    efpMgr.reset(new EmptyFilePoolManagerImpl(getStoreTopLevelDir()));
+    efpMgr.reset(new qpid::qls_jrnl::EmptyFilePoolManager(getStoreTopLevelDir(), jrnlLog));
     efpMgr->findEfpPartitions();
 }
 
@@ -403,7 +403,7 @@ void MessageStoreImpl::create(qpid::brok
         return;
     }
 
-    jQueue = new JournalImpl(broker->getTimer(), queue_.getName(), getJrnlDir(queue_.getName()),
+    jQueue = new JournalImpl(broker->getTimer(), queue_.getName(), getJrnlDir(queue_.getName()), jrnlLog,
                              defJournalGetEventsTimeout, defJournalFlushTimeout, agent,
                              boost::bind(&MessageStoreImpl::journalDeleted, this, _1));
     {
@@ -711,8 +711,8 @@ void MessageStoreImpl::recoverQueues(Txn
             QLS_LOG(error, "Cannot recover empty (null) queue name - ignoring and attempting to continue.");
             break;
         }
-        jQueue = new JournalImpl(broker->getTimer(), queueName, getJrnlDir(queueName), defJournalGetEventsTimeout,
-                                 defJournalFlushTimeout, agent,
+        jQueue = new JournalImpl(broker->getTimer(), queueName, getJrnlDir(queueName),jrnlLog,
+                                 defJournalGetEventsTimeout, defJournalFlushTimeout, agent,
                                  boost::bind(&MessageStoreImpl::journalDeleted, this, _1));
         {
             qpid::sys::Mutex::ScopedLock sl(journalListLock);
@@ -1207,7 +1207,7 @@ void MessageStoreImpl::loadContent(const
 
 void MessageStoreImpl::flush(const qpid::broker::PersistableQueue& queue_)
 {
-    QLS_LOG(info,   "*** MessageStoreImpl::flush() queue=\"" << queue_.getName() << "\"");
+//    QLS_LOG(info,   "*** MessageStoreImpl::flush() queue=\"" << queue_.getName() << "\"");
     if (queue_.getExternalQueueStore() == 0) return;
     checkInit();
     std::string qn = queue_.getName();

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h Mon Oct 21 21:26:10 2013
@@ -22,14 +22,15 @@
 #ifndef QPID_LEGACYSTORE_MESSAGESTOREIMPL_H
 #define QPID_LEGACYSTORE_MESSAGESTOREIMPL_H
 
+#include <iomanip>
 #include <string>
 
 #include "db-inc.h"
 #include "qpid/linearstore/Cursor.h"
-#include "qpid/linearstore/EmptyFilePoolManagerImpl.h"
 #include "qpid/linearstore/IdDbt.h"
 #include "qpid/linearstore/IdSequence.h"
 #include "qpid/linearstore/JournalImpl.h"
+#include "qpid/linearstore/JournalLogImpl.h"
 #include "qpid/linearstore/jrnl/jcfg.h"
 #include "qpid/linearstore/jrnl/EmptyFilePoolTypes.h"
 #include "qpid/linearstore/PreparedTransaction.h"
@@ -101,10 +102,10 @@ class MessageStoreImpl : public qpid::br
 
     // Default store settings
     static const bool defTruncateFlag = false;
-    static const uint32_t defWCachePageSizeKib = JRNL_WMGR_DEF_PAGE_SIZE_KIB;
+    static const uint32_t defWCachePageSizeKib = QLS_WMGR_DEF_PAGE_SIZE_KIB;
     static const uint32_t defTplWCachePageSizeKib = defWCachePageSizeKib / 8;
     static const uint16_t defEfpPartition = 1;
-    static const uint64_t defEfpFileSizeKib = 512 * JRNL_SBLK_SIZE_KIB;
+    static const uint64_t defEfpFileSizeKib = 512 * QLS_SBLK_SIZE_KIB;
     static const std::string storeTopLevelDir;
 
     static qpid::sys::Duration defJournalGetEventsTimeout;
@@ -143,7 +144,8 @@ class MessageStoreImpl : public qpid::br
     bool isInit;
     const char* envPath;
     qpid::broker::Broker* broker;
-    boost::shared_ptr<EmptyFilePoolManagerImpl> efpMgr;
+    JournalLogImpl jrnlLog;
+    boost::shared_ptr<qpid::qls_jrnl::EmptyFilePoolManager> efpMgr;
 
     qmf::org::apache::qpid::linearstore::Store::shared_ptr mgmtObject;
     qpid::management::ManagementAgent* agent;

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp Mon Oct 21 21:26:10 2013
@@ -23,7 +23,7 @@
 #include "qpid/Plugin.h"
 #include "qpid/Options.h"
 #include "qpid/DataDir.h"
-#include "qpid/linearstore/QpidLog.h"
+#include "qpid/linearstore/JournalLogImpl.h"
 #include "qpid/linearstore/MessageStoreImpl.h"
 
 using qpid::linearstore::MessageStoreImpl;

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h Mon Oct 21 21:26:10 2013
@@ -31,105 +31,93 @@ template <class T>
 class AtomicCounter
 {
 private:
-    T count;
+    T count_;
     mutable smutex countMutex;
 
 public:
-    AtomicCounter(const T& initValue = T(0)) : count(initValue) {}
+    AtomicCounter(const T& initValue = T(0)) : count_(initValue) {}
 
     virtual ~AtomicCounter() {}
 
     T get() const {
         slock l(countMutex);
-        return count;
+        return count_;
     }
 
     T increment() {
         slock l(countMutex);
-        return ++count;
+        return ++count_;
     }
 
     T add(const T& a) {
         slock l(countMutex);
-        count += a;
-        return count;
+        count_ += a;
+        return count_;
     }
 
     T addLimit(const T& a, const T& limit, const uint32_t jerr) {
         slock l(countMutex);
-        if (count + a > limit) throw jexception(jerr, "AtomicCounter", "addLimit");
-        count += a;
-        return count;
+        if (count_ + a > limit) throw jexception(jerr, "AtomicCounter", "addLimit");
+        count_ += a;
+        return count_;
     }
 
     T decrement() {
         slock l(countMutex);
-        return --count;
+        return --count_;
     }
 
     T decrementLimit(const T& limit = T(0), const uint32_t jerr = jerrno::JERR__UNDERFLOW) {
         slock l(countMutex);
-        if (count < limit + 1) {
+        if (count_ < limit + 1) {
             throw jexception(jerr, "AtomicCounter", "decrementLimit");
         }
-        return --count;
+        return --count_;
     }
 
     T subtract(const T& s) {
         slock l(countMutex);
-        count -= s;
-        return count;
+        count_ -= s;
+        return count_;
     }
 
     T subtractLimit(const T& s, const T& limit = T(0), const uint32_t jerr = jerrno::JERR__UNDERFLOW) {
         slock l(countMutex);
-        if (count < limit + s) throw jexception(jerr, "AtomicCounter", "subtractLimit");
-        count -= s;
-        return count;
+        if (count_ < limit + s) throw jexception(jerr, "AtomicCounter", "subtractLimit");
+        count_ -= s;
+        return count_;
     }
 
     bool operator==(const T& o) const {
         slock l(countMutex);
-        return count == o;
+        return count_ == o;
     }
 
     bool operator<(const T& o) const {
         slock l(countMutex);
-        return count < o;
+        return count_ < o;
     }
 
     bool operator<=(const T& o) const {
         slock l(countMutex);
-        return count <= o;
+        return count_ <= o;
     }
 
     friend T operator-(const T& a, const AtomicCounter& b) {
         slock l(b.countMutex);
-        return a - b.count;
+        return a - b.count_;
     }
 
     friend T operator-(const AtomicCounter& a, const T& b) {
         slock l(a.countMutex);
-        return a.count - b;
+        return a.count_ - b;
     }
 
     friend T operator-(const AtomicCounter&a, const AtomicCounter& b) {
         slock l1(a.countMutex);
         slock l2(b.countMutex);
-        return a.count - b.count;
+        return a.count_ - b.count_;
     }
-
-/*
-    friend std::ostream& operator<<(std::ostream& out, const AtomicCounter& a) {
-        T temp; // Use temp so lock is not held while streaming to out.
-        {
-            slock l(a.countMutex);
-            temp = a.count;
-        }
-        out << temp;
-        return out;
-    }
-*/
 };
 
 }} // namespace qpid::qls_jrnl

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp Mon Oct 21 21:26:10 2013
@@ -23,229 +23,274 @@
 
 #include <cctype>
 #include <fstream>
+#include "qpid/linearstore/jrnl/EmptyFilePoolPartition.h"
 #include "qpid/linearstore/jrnl/jcfg.h"
 #include "qpid/linearstore/jrnl/jdir.h"
 #include "qpid/linearstore/jrnl/JournalFile.h"
+#include "qpid/linearstore/jrnl/JournalLog.h"
 #include "qpid/linearstore/jrnl/slock.h"
 #include "qpid/linearstore/jrnl/utils/file_hdr.h"
 #include <sys/stat.h>
+#include <unistd.h>
 #include <uuid/uuid.h>
 #include <vector>
 
-#include <iostream> // DEBUG
-
 namespace qpid {
 namespace qls_jrnl {
 
-EmptyFilePool::EmptyFilePool(const std::string& efpDirectory_,
-                             const EmptyFilePoolPartition* partitionPtr_) :
-                efpDirectory(efpDirectory_),
-                efpDataSize_kib(fileSizeKbFromDirName(efpDirectory_, partitionPtr_->partitionNumber())),
-                partitionPtr(partitionPtr_)
+EmptyFilePool::EmptyFilePool(const std::string& efpDirectory,
+                             const EmptyFilePoolPartition* partitionPtr,
+                             JournalLog& journalLogRef) :
+                efpDirectory_(efpDirectory),
+                efpDataSize_kib_(fileSizeKbFromDirName(efpDirectory, partitionPtr->getPartitionNumber())),
+                partitionPtr_(partitionPtr),
+                journalLogRef_(journalLogRef)
 {}
 
 EmptyFilePool::~EmptyFilePool() {}
 
-void
-EmptyFilePool::initialize() {
-    //std::cout << "Reading " << efpDirectory << std::endl; // DEBUG
+void EmptyFilePool::initialize() {
     std::vector<std::string> dirList;
-    jdir::read_dir(efpDirectory, dirList, false, true, false, false);
+    jdir::read_dir(efpDirectory_, dirList, false, true, false, false);
     for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
         size_t dotPos = i->rfind(".");
         if (dotPos != std::string::npos) {
             if (i->substr(dotPos).compare(".jrnl") == 0 && i->length() == 41) {
-                std::string emptyFile(efpDirectory + "/" + (*i));
+                std::string emptyFile(efpDirectory_ + "/" + (*i));
                 if (validateEmptyFile(emptyFile)) {
                     pushEmptyFile(emptyFile);
                 }
             }
         }
     }
-    //std::cout << "Found " << emptyFileList.size() << " files" << std::endl; // DEBUG
 }
 
-efpDataSize_kib_t
-EmptyFilePool::dataSize_kib() const {
-    return efpDataSize_kib;
+efpDataSize_kib_t EmptyFilePool::dataSize_kib() const {
+    return efpDataSize_kib_;
 }
 
-efpFileSize_kib_t
-EmptyFilePool::fileSize_kib() const {
-    return efpDataSize_kib + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_KIB);
+efpFileSize_kib_t EmptyFilePool::fileSize_kib() const {
+    return efpDataSize_kib_ + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB);
 }
 
-efpDataSize_sblks_t
-EmptyFilePool::dataSize_sblks() const {
-    return efpDataSize_kib / JRNL_SBLK_SIZE_KIB;
+efpDataSize_sblks_t EmptyFilePool::dataSize_sblks() const {
+    return efpDataSize_kib_ / QLS_SBLK_SIZE_KIB;
 }
 
-efpFileSize_sblks_t
-EmptyFilePool::fileSize_sblks() const {
-    return (efpDataSize_kib / JRNL_SBLK_SIZE_KIB) + QLS_JRNL_FHDR_RES_SIZE_SBLKS;
+efpFileSize_sblks_t EmptyFilePool::fileSize_sblks() const {
+    return (efpDataSize_kib_ / QLS_SBLK_SIZE_KIB) + QLS_JRNL_FHDR_RES_SIZE_SBLKS;
 }
 
-efpFileCount_t
-EmptyFilePool::numEmptyFiles() const {
-    slock l(emptyFileListMutex);
-    return efpFileCount_t(emptyFileList.size());
+efpFileCount_t EmptyFilePool::numEmptyFiles() const {
+    slock l(emptyFileListMutex_);
+    return efpFileCount_t(emptyFileList_.size());
 }
 
-efpDataSize_kib_t
-EmptyFilePool::cumFileSize_kib() const {
-    slock l(emptyFileListMutex);
-    return efpDataSize_kib_t(emptyFileList.size()) * efpDataSize_kib;
+efpDataSize_kib_t EmptyFilePool::cumFileSize_kib() const {
+    slock l(emptyFileListMutex_);
+    return efpDataSize_kib_t(emptyFileList_.size()) * efpDataSize_kib_;
 }
 
-efpPartitionNumber_t
-EmptyFilePool::getPartitionNumber() const {
-    return partitionPtr->partitionNumber();
+efpPartitionNumber_t EmptyFilePool::getPartitionNumber() const {
+    return partitionPtr_->getPartitionNumber();
 }
 
-const EmptyFilePoolPartition*
-EmptyFilePool::getPartition() const {
-    return partitionPtr;
+const EmptyFilePoolPartition* EmptyFilePool::getPartition() const {
+    return partitionPtr_;
 }
 
-const efpIdentity_t
-EmptyFilePool::getIdentity() const {
-    return efpIdentity_t(partitionPtr->partitionNumber(), efpDataSize_kib);
+const efpIdentity_t EmptyFilePool::getIdentity() const {
+    return efpIdentity_t(partitionPtr_->getPartitionNumber(), efpDataSize_kib_);
 }
 
-std::string
-EmptyFilePool::takeEmptyFile(const std::string& destDirectory) {
+std::string EmptyFilePool::takeEmptyFile(const std::string& destDirectory) {
     std::string emptyFileName = popEmptyFile();
     std::string newFileName = destDirectory + emptyFileName.substr(emptyFileName.rfind('/')); // NOTE: substr() includes leading '/'
-    if (::rename(emptyFileName.c_str(), newFileName.c_str())) {
-        pushEmptyFile(emptyFileName);
-        std::ostringstream oss;
-        oss << "file=\"" << emptyFileName << "\" dest=\"" <<  newFileName << "\"" << FORMAT_SYSERR(errno);
-        throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", "takeEmptyFile");
+    if (moveEmptyFile(emptyFileName.c_str(), newFileName.c_str())) {
+        // Try again with new UUID for file name
+        newFileName = destDirectory + "/" + getEfpFileName();
+        if (moveEmptyFile(emptyFileName.c_str(), newFileName.c_str())) {
+            pushEmptyFile(emptyFileName);
+            std::ostringstream oss;
+            oss << "file=\"" << emptyFileName << "\" dest=\"" <<  newFileName << "\"" << FORMAT_SYSERR(errno);
+            throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", "takeEmptyFile");
+        }
     }
     return newFileName;
 }
 
-bool
-EmptyFilePool::returnEmptyFile(const JournalFile* srcFile) {
-    std::string emptyFileName(efpDirectory + srcFile->getFileName());
-    // TODO: reset file here
-    if (::rename(srcFile->getFqFileName().c_str(), emptyFileName.c_str())) {
-        std::ostringstream oss;
-        oss << "file=\"" << srcFile << "\" dest=\"" <<  emptyFileName << "\"" << FORMAT_SYSERR(errno);
-        throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", "returnEmptyFile");
+void EmptyFilePool::returnEmptyFile(const std::string& fqSrcFile) {
+    std::string emptyFileName(efpDirectory_ + fqSrcFile.substr(fqSrcFile.rfind('/'))); // NOTE: substr() includes leading '/'
+    if (moveEmptyFile(fqSrcFile.c_str(), emptyFileName.c_str())) {
+        // Try again with new UUID for file name
+        emptyFileName = efpDirectory_ + "/" + getEfpFileName();
+        if (moveEmptyFile(fqSrcFile.c_str(), emptyFileName.c_str())) {
+            // Failed twice in a row - delete file
+            ::unlink(fqSrcFile.c_str());
+            return;
+        }
     }
+    resetEmptyFileHeader(emptyFileName);
     pushEmptyFile(emptyFileName);
-    return true;
 }
 
-// protected
+// --- protected functions ---
+
+void EmptyFilePool::createEmptyFile() {
+    ::file_hdr_t fh;
+    ::file_hdr_create(&fh, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDR_RES_SIZE_SBLKS, partitionPtr_->getPartitionNumber(), efpDataSize_kib_);
+    std::string efpfn = getEfpFileName();
+    std::ofstream ofs(efpfn.c_str(), std::ofstream::out | std::ofstream::binary);
+    if (ofs.good()) {
+        ofs.write((char*)&fh, sizeof(::file_hdr_t));
+        uint64_t rem = ((efpDataSize_kib_ + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB)) * 1024) - sizeof(::file_hdr_t);
+        while (rem--)
+            ofs.put('\0');
+        ofs.close();
+        pushEmptyFile(efpfn);
+//std::cout << "WARNING: EFP " << efpDirectory << " is empty - created new journal file " << efpfn.substr(efpfn.rfind('/') + 1) << " on the fly" << std::endl;
+    } else {
+//std::cerr << "ERROR: Unable to open file \"" << efpfn << "\"" << std::endl; // DEBUG
+    }
+}
 
-void
-EmptyFilePool::pushEmptyFile(const std::string fqFileName_) {
-    slock l(emptyFileListMutex);
-    emptyFileList.push_back(fqFileName_);
+std::string EmptyFilePool::getEfpFileName() {
+    uuid_t uuid;
+    ::uuid_generate(uuid); // NOTE: uuid_generate() is not thread safe
+    char uuid_str[37]; // 36 char uuid + trailing \0
+    ::uuid_unparse(uuid, uuid_str);
+    std::ostringstream oss;
+    oss << efpDirectory_ << "/" << uuid_str << QLS_JRNL_FILE_EXTENSION;
+    return oss.str();
 }
 
-std::string
-EmptyFilePool::popEmptyFile() {
+std::string EmptyFilePool::popEmptyFile() {
     std::string emptyFileName;
     bool isEmpty = false;
     {
-        slock l(emptyFileListMutex);
-        isEmpty = emptyFileList.empty();
+        slock l(emptyFileListMutex_);
+        isEmpty = emptyFileList_.empty();
     }
     if (isEmpty) {
         createEmptyFile();
     }
     {
-        slock l(emptyFileListMutex);
-        emptyFileName = emptyFileList.front();
-        emptyFileList.pop_front();
+        slock l(emptyFileListMutex_);
+        emptyFileName = emptyFileList_.front();
+        emptyFileList_.pop_front();
     }
     return emptyFileName;
 }
 
-void
-EmptyFilePool::createEmptyFile() {
-    ::file_hdr_t fh;
-    ::file_hdr_create(&fh, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDR_RES_SIZE_SBLKS, partitionPtr->partitionNumber(), efpDataSize_kib);
-    std::string efpfn = getEfpFileName();
-    std::ofstream ofs(efpfn.c_str(), std::ofstream::out | std::ofstream::binary);
-    if (ofs.good()) {
-        ofs.write((char*)&fh, sizeof(::file_hdr_t));
-        uint64_t rem = ((efpDataSize_kib + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_KIB)) * 1024) - sizeof(::file_hdr_t);
-        while (rem--)
-            ofs.put('\0');
-        ofs.close();
-        pushEmptyFile(efpfn);
-        std::cout << "WARNING: EFP " << efpDirectory << " is empty - created new journal file " <<
-                     efpfn.substr(efpfn.rfind('/') + 1) << " on the fly" << std::endl;
+void EmptyFilePool::pushEmptyFile(const std::string fqFileName) {
+    slock l(emptyFileListMutex_);
+    emptyFileList_.push_back(fqFileName);
+}
+
+void EmptyFilePool::resetEmptyFileHeader(const std::string& fqFileName) {
+    std::fstream fs(fqFileName.c_str(), std::fstream::in | std::fstream::out | std::fstream::binary);
+    if (fs.good()) {
+        const std::streamsize buffsize = QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES;
+        char buff[buffsize];
+        fs.read((char*)buff, buffsize);
+        std::streampos bytesRead = fs.tellg();
+        if (bytesRead == buffsize) {
+            ::file_hdr_reset((::file_hdr_t*)buff);
+            ::memset(buff + sizeof(::file_hdr_t), 0, MAX_FILE_HDR_LEN - sizeof(::file_hdr_t)); // set rest of buffer to 0
+            fs.seekp(0, std::fstream::beg);
+            fs.write(buff, buffsize);
+            std::streampos bytesWritten = fs.tellp();
+            if (bytesWritten != buffsize) {
+//std::cerr << "ERROR: Unable to write file header of file \"" << fqFileName_ << "\": tried to write " << buffsize << " bytes; wrote " << bytesWritten << " bytes." << std::endl;
+            }
+        } else {
+//std::cerr << "ERROR: Unable to read file header of file \"" << fqFileName_ << "\": tried to read " << sizeof(::file_hdr_t) << " bytes; read " << bytesRead << " bytes." << std::endl;
+        }
+        fs.close();
     } else {
-        std::cerr << "ERROR: Unable to open file \"" << efpfn << "\"" << std::endl; // DEBUG
+//std::cerr << "ERROR: Unable to open file \"" << fqFileName_ << "\" for reading" << std::endl; // DEBUG
     }
 }
 
-bool
-EmptyFilePool::validateEmptyFile(const std::string& emptyFileName_) const {
+bool EmptyFilePool::validateEmptyFile(const std::string& emptyFileName) const {
+    std::ostringstream oss;
     struct stat s;
-    if (::stat(emptyFileName_.c_str(), &s))
+    if (::stat(emptyFileName.c_str(), &s))
     {
-        std::ostringstream oss;
-        oss << "stat: file=\"" << emptyFileName_ << "\"" << FORMAT_SYSERR(errno);
+        oss << "stat: file=\"" << emptyFileName << "\"" << FORMAT_SYSERR(errno);
         throw jexception(jerrno::JERR_JDIR_STAT, oss.str(), "EmptyFilePool", "validateEmptyFile");
     }
-    efpDataSize_kib_t expectedSize = (JRNL_SBLK_SIZE_KIB + efpDataSize_kib) * 1024;
+
+    // Size matches pool
+    efpDataSize_kib_t expectedSize = (QLS_SBLK_SIZE_KIB + efpDataSize_kib_) * 1024;
     if ((efpDataSize_kib_t)s.st_size != expectedSize) {
-        //std::cout << "ERROR: File " << emptyFileName << ": Incorrect size: Expected=" << expectedSize << "; actual=" << s.st_size << std::endl; // DEBUG
+        oss << "ERROR: File " << emptyFileName << ": Incorrect size: Expected=" << expectedSize << "; actual=" << s.st_size;
+        journalLogRef_.log(JournalLog::LOG_ERROR, oss.str());
         return false;
     }
 
-    std::ifstream ifs(emptyFileName_.c_str(), std::ifstream::in | std::ifstream::binary);
-    if (!ifs) {
-        //std::cout << "ERROR: File " << emptyFileName << ": Unable to open for reading" << std::endl;
+    // Open file and read header
+    std::fstream fs(emptyFileName.c_str(), std::fstream::in | std::fstream::out | std::fstream::binary);
+    if (!fs) {
+        oss << "ERROR: File " << emptyFileName << ": Unable to open for reading";
+        journalLogRef_.log(JournalLog::LOG_ERROR, oss.str());
+        return false;
+    }
+    const std::streamsize buffsize = QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES;
+    char buff[buffsize];
+    fs.read((char*)buff, buffsize);
+    std::streampos bytesRead = fs.tellg();
+    if (bytesRead != buffsize) {
+        oss << "ERROR: Unable to read file header of file \"" << emptyFileName << "\": tried to read " << buffsize << " bytes; read " << bytesRead << " bytes";
+        journalLogRef_.log(JournalLog::LOG_ERROR, oss.str());
+        fs.close();
         return false;
     }
 
-    const uint8_t fhFileNameBuffLen = 50;
-    char fhFileNameBuff[fhFileNameBuffLen];
-    ::file_hdr_t fh;
-    ifs.read((char*)&fh, sizeof(::file_hdr_t));
-    uint16_t fhFileNameLen = fh._queue_name_len > fhFileNameBuffLen ? fhFileNameBuffLen : fh._queue_name_len;
-    ifs.read(fhFileNameBuff, fhFileNameLen);
-    std::string fhFileName(fhFileNameBuff, fhFileNameLen);
-    ifs.close();
-
-    if (fh._rhdr._magic != QLS_FILE_MAGIC ||
-        fh._rhdr._version != QLS_JRNL_VERSION ||
-        fh._efp_partition != partitionPtr->partitionNumber() ||
-        fh._file_size_kib != efpDataSize_kib ||
-        !::is_file_hdr_reset(&fh))
+    // Check file header
+    const bool jrnlMagicError = ((::file_hdr_t*)buff)->_rhdr._magic != QLS_FILE_MAGIC;
+    const bool jrnlVersionError = ((::file_hdr_t*)buff)->_rhdr._version != QLS_JRNL_VERSION;
+    const bool jrnlPartitionError = ((::file_hdr_t*)buff)->_efp_partition != partitionPtr_->getPartitionNumber();
+    const bool jrnlFileSizeError = ((::file_hdr_t*)buff)->_file_size_kib != efpDataSize_kib_;
+    if (jrnlMagicError || jrnlVersionError || jrnlPartitionError || jrnlFileSizeError)
     {
-        //std::cout << "ERROR: File " << emptyFileName << ": Invalid file header" << std::endl;
+        oss << "ERROR: File " << emptyFileName << ": Invalid file header - mismatched header fields: " <<
+                        (jrnlMagicError ? "magic " : "") <<
+                        (jrnlVersionError ? "version " : "") <<
+                        (jrnlPartitionError ? "partition" : "") <<
+                        (jrnlFileSizeError ? "file-size" : "");
+        journalLogRef_.log(JournalLog::LOG_ERROR, oss.str());
+        fs.close();
         return false;
     }
 
-    return true;
-}
+    // Check file header is reset
+    if (!::is_file_hdr_reset((::file_hdr_t*)buff)) {
+        ::file_hdr_reset((::file_hdr_t*)buff);
+        ::memset(buff + sizeof(::file_hdr_t), 0, MAX_FILE_HDR_LEN - sizeof(::file_hdr_t)); // set rest of buffer to 0
+        fs.seekp(0, std::fstream::beg);
+        fs.write(buff, buffsize);
+        std::streampos bytesWritten = fs.tellp();
+        if (bytesWritten != buffsize) {
+            oss << "ERROR: Unable to write file header of file \"" << emptyFileName << "\": tried to write " << buffsize << " bytes; wrote " << bytesWritten << " bytes";
+            journalLogRef_.log(JournalLog::LOG_ERROR, oss.str());
+            fs.close();
+            return false;
+        }
+        oss << "WARNING: File " << emptyFileName << ": File header not reset";
+        journalLogRef_.log(JournalLog::LOG_WARN, oss.str());
+    }
 
-std::string
-EmptyFilePool::getEfpFileName() {
-    uuid_t uuid;
-    ::uuid_generate(uuid); // NOTE: NOT THREAD SAFE
-    char uuid_str[37]; // 36 char uuid + trailing \0
-    ::uuid_unparse(uuid, uuid_str);
-    std::ostringstream oss;
-    oss << efpDirectory << "/" << uuid_str << QLS_JRNL_FILE_EXTENSION;
-    return oss.str();
+    // Close file
+    fs.close();
+    return true;
 }
 
-// protected
 // static
-efpDataSize_kib_t
-EmptyFilePool::fileSizeKbFromDirName(const std::string& dirName_,
-                                     const efpPartitionNumber_t partitionNumber_) {
+efpDataSize_kib_t EmptyFilePool::fileSizeKbFromDirName(const std::string& dirName,
+                                                       const efpPartitionNumber_t partitionNumber) {
     // Check for dirName format 'NNNk', where NNN is a number, convert NNN into an integer. NNN cannot be 0.
-    std::string n(dirName_.substr(dirName_.rfind('/')+1));
+    std::string n(dirName.substr(dirName.rfind('/')+1));
     bool valid = true;
     for (uint16_t charNum = 0; charNum < n.length(); ++charNum) {
         if (charNum < n.length()-1) {
@@ -258,12 +303,24 @@ EmptyFilePool::fileSizeKbFromDirName(con
         }
     }
     efpDataSize_kib_t s = ::atol(n.c_str());
-    if (!valid || s == 0 || s % JRNL_SBLK_SIZE_KIB != 0) {
+    if (!valid || s == 0 || s % QLS_SBLK_SIZE_KIB != 0) {
         std::ostringstream oss;
-        oss << "Partition: " << partitionNumber_ << "; EFP directory: \'" << n << "\'";
+        oss << "Partition: " << partitionNumber << "; EFP directory: \'" << n << "\'";
         throw jexception(jerrno::JERR_EFP_BADEFPDIRNAME, oss.str(), "EmptyFilePool", "fileSizeKbFromDirName");
     }
     return s;
 }
 
+// static
+int EmptyFilePool::moveEmptyFile(const std::string& from,
+                                 const std::string& to) {
+    if (::rename(from.c_str(), to.c_str())) {
+        if (errno == EEXIST) return errno; // File name exists
+        std::ostringstream oss;
+        oss << "file=\"" << from << "\" dest=\"" <<  to << "\"" << FORMAT_SYSERR(errno);
+        throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", "returnEmptyFile");
+    }
+    return 0;
+}
+
 }} // namespace qpid::qls_jrnl

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h Mon Oct 21 21:26:10 2013
@@ -30,15 +30,16 @@ namespace qls_jrnl {
 }} // namespace qpid::qls_jrnl
 
 #include <deque>
-#include "qpid/linearstore/jrnl/EmptyFilePoolPartition.h"
 #include "qpid/linearstore/jrnl/EmptyFilePoolTypes.h"
 #include "qpid/linearstore/jrnl/smutex.h"
 #include <string>
 
 namespace qpid {
 namespace qls_jrnl {
+class EmptyFilePoolPartition;
 class jdir;
 class JournalFile;
+class JournalLog;
 
 class EmptyFilePool
 {
@@ -46,17 +47,19 @@ protected:
     typedef std::deque<std::string> emptyFileList_t;
     typedef emptyFileList_t::iterator emptyFileListItr_t;
 
-    const std::string efpDirectory;
-    const efpDataSize_kib_t efpDataSize_kib;
-    const EmptyFilePoolPartition* partitionPtr;
+    const std::string efpDirectory_;
+    const efpDataSize_kib_t efpDataSize_kib_;
+    const EmptyFilePoolPartition* partitionPtr_;
+    JournalLog& journalLogRef_;
 
 private:
-    emptyFileList_t emptyFileList;
-    smutex emptyFileListMutex;
+    emptyFileList_t emptyFileList_;
+    smutex emptyFileListMutex_;
 
 public:
-    EmptyFilePool(const std::string& efpDirectory_,
-                  const EmptyFilePoolPartition* partitionPtr_);
+    EmptyFilePool(const std::string& efpDirectory,
+                  const EmptyFilePoolPartition* partitionPtr,
+                  JournalLog& journalLogRef);
     virtual ~EmptyFilePool();
 
     void initialize();
@@ -70,17 +73,21 @@ public:
     const EmptyFilePoolPartition* getPartition() const;
     const efpIdentity_t getIdentity() const;
 
-    std::string takeEmptyFile(const std::string& destDirectory_);
-    bool returnEmptyFile(const JournalFile* srcFile_);
+    std::string takeEmptyFile(const std::string& destDirectory);
+    void returnEmptyFile(const std::string& srcFile);
 
 protected:
-    void pushEmptyFile(const std::string fqFileName_);
-    std::string popEmptyFile();
     void createEmptyFile();
-    bool validateEmptyFile(const std::string& emptyFileName_) const;
     std::string getEfpFileName();
-    static efpDataSize_kib_t fileSizeKbFromDirName(const std::string& dirName_,
-                                                   const efpPartitionNumber_t partitionNumber_);
+    std::string popEmptyFile();
+    void pushEmptyFile(const std::string fqFileName);
+    void resetEmptyFileHeader(const std::string& fqFileName);
+    bool validateEmptyFile(const std::string& emptyFileName) const;
+
+    static efpDataSize_kib_t fileSizeKbFromDirName(const std::string& dirName,
+                                                   const efpPartitionNumber_t partitionNumber);
+    static int moveEmptyFile(const std::string& fromFqPath,
+                             const std::string& toFqPath);
 };
 
 }} // namespace qpid::qls_jrnl

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp Mon Oct 21 21:26:10 2013
@@ -27,39 +27,37 @@
 #include "qpid/linearstore/jrnl/slock.h"
 #include <vector>
 
-// DEBUG
-//#include <iostream>
-
 namespace qpid {
 namespace qls_jrnl {
 
-EmptyFilePoolManager::EmptyFilePoolManager(const std::string& qlsStorePath_) :
-                qlsStorePath(qlsStorePath_)
+EmptyFilePoolManager::EmptyFilePoolManager(const std::string& qlsStorePath,
+                                           JournalLog& journalLogRef) :
+                qlsStorePath_(qlsStorePath),
+                journalLogRef_(journalLogRef)
 {}
 
 EmptyFilePoolManager::~EmptyFilePoolManager() {
-    slock l(partitionMapMutex);
-    for (partitionMapItr_t i = partitionMap.begin(); i != partitionMap.end(); ++i) {
+    slock l(partitionMapMutex_);
+    for (partitionMapItr_t i = partitionMap_.begin(); i != partitionMap_.end(); ++i) {
         delete i->second;
     }
-    partitionMap.clear();
+    partitionMap_.clear();
 }
 
-void
-EmptyFilePoolManager::findEfpPartitions() {
+void EmptyFilePoolManager::findEfpPartitions() {
     //std::cout << "*** Reading " << qlsStorePath << std::endl; // DEBUG
     std::vector<std::string> dirList;
-    jdir::read_dir(qlsStorePath, dirList, true, false, true, false);
+    jdir::read_dir(qlsStorePath_, dirList, true, false, true, false);
     for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
         if ((*i)[0] == 'p' && i->length() == 4) { // Filter: look only at names pNNN
             efpPartitionNumber_t pn = ::atoi(i->c_str() + 1);
-            std::string fullDirPath(qlsStorePath + "/" + (*i));
+            std::string fullDirPath(qlsStorePath_ + "/" + (*i));
             EmptyFilePoolPartition* efppp = 0;
             try {
-                efppp = new EmptyFilePoolPartition(pn, fullDirPath);
+                efppp = new EmptyFilePoolPartition(pn, fullDirPath, journalLogRef_);
                 {
-                    slock l(partitionMapMutex);
-                    partitionMap[pn] = efppp;
+                    slock l(partitionMapMutex_);
+                    partitionMap_[pn] = efppp;
                 }
             } catch (const std::exception& e) {
                 if (efppp != 0) {
@@ -72,34 +70,63 @@ EmptyFilePoolManager::findEfpPartitions(
                 efppp->findEmptyFilePools();
         }
     }
+    // TODO: Log results
+/*
+    QLS_LOG(info, "EFP Manager initialization complete");
+    std::vector<qpid::qls_jrnl::EmptyFilePoolPartition*> partitionList;
+    std::vector<qpid::qls_jrnl::EmptyFilePool*> filePoolList;
+    getEfpPartitions(partitionList);
+    if (partitionList.size() == 0) {
+        QLS_LOG(error, "NO EFP PARTITIONS FOUND! No queue creation is possible.")
+    } else {
+        QLS_LOG(info, "> EFP Partitions found: " << partitionList.size());
+        for (std::vector<qpid::qls_jrnl::EmptyFilePoolPartition*>::const_iterator i=partitionList.begin(); i!= partitionList.end(); ++i) {
+            filePoolList.clear();
+            (*i)->getEmptyFilePools(filePoolList);
+            QLS_LOG(info, "  * Partition " << (*i)->partitionNumber() << " containing " << filePoolList.size() << " pool" <<
+                          (filePoolList.size()>1 ? "s" : "") << " at \'" << (*i)->partitionDirectory() << "\'");
+            for (std::vector<qpid::qls_jrnl::EmptyFilePool*>::const_iterator j=filePoolList.begin(); j!=filePoolList.end(); ++j) {
+                QLS_LOG(info, "    - EFP \'" << (*j)->dataSize_kib() << "k\' containing " << (*j)->numEmptyFiles() <<
+                              " files of size " << (*j)->dataSize_kib() << " KiB totaling " << (*j)->cumFileSize_kib() << " KiB");
+            }
+        }
+    }
+*/
 }
 
-uint16_t
-EmptyFilePoolManager::getNumEfpPartitions() const {
-    return partitionMap.size();
+void EmptyFilePoolManager::getEfpFileSizes(std::vector<efpDataSize_kib_t>& efpFileSizeList,
+                                           const efpPartitionNumber_t efpPartitionNumber) const {
+    if (efpPartitionNumber == 0) {
+        for (partitionMapConstItr_t i=partitionMap_.begin(); i!=partitionMap_.end(); ++i) {
+            i->second->getEmptyFilePoolSizes_kib(efpFileSizeList);
+        }
+    } else {
+        partitionMapConstItr_t i = partitionMap_.find(efpPartitionNumber);
+        if (i != partitionMap_.end()) {
+            i->second->getEmptyFilePoolSizes_kib(efpFileSizeList);
+        }
+    }
 }
 
-EmptyFilePoolPartition*
-EmptyFilePoolManager::getEfpPartition(const efpPartitionNumber_t partitionNumber) {
-    partitionMapItr_t i = partitionMap.find(partitionNumber);
-    if (i == partitionMap.end())
+EmptyFilePoolPartition* EmptyFilePoolManager::getEfpPartition(const efpPartitionNumber_t partitionNumber) {
+    partitionMapItr_t i = partitionMap_.find(partitionNumber);
+    if (i == partitionMap_.end())
         return 0;
     else
         return i->second;
 }
 
-void
-EmptyFilePoolManager::getEfpPartitionNumbers(std::vector<efpPartitionNumber_t>& partitionNumberList,
-                                             const efpDataSize_kib_t efpFileSizeKb) const {
-    slock l(partitionMapMutex);
-    for (partitionMapConstItr_t i=partitionMap.begin(); i!=partitionMap.end(); ++i) {
-        if (efpFileSizeKb == 0) {
+void EmptyFilePoolManager::getEfpPartitionNumbers(std::vector<efpPartitionNumber_t>& partitionNumberList,
+                                                  const efpDataSize_kib_t efpDataSize_kib) const {
+    slock l(partitionMapMutex_);
+    for (partitionMapConstItr_t i=partitionMap_.begin(); i!=partitionMap_.end(); ++i) {
+        if (efpDataSize_kib == 0) {
             partitionNumberList.push_back(i->first);
         } else {
             std::vector<efpDataSize_kib_t> efpFileSizeList;
-            i->second->getEmptyFilePoolSizesKb(efpFileSizeList);
+            i->second->getEmptyFilePoolSizes_kib(efpFileSizeList);
             for (std::vector<efpDataSize_kib_t>::iterator j=efpFileSizeList.begin(); j!=efpFileSizeList.end(); ++j) {
-                if (*j == efpFileSizeKb) {
+                if (*j == efpDataSize_kib) {
                     partitionNumberList.push_back(i->first);
                     break;
                 }
@@ -108,18 +135,17 @@ EmptyFilePoolManager::getEfpPartitionNum
     }
 }
 
-void
-EmptyFilePoolManager::getEfpPartitions(std::vector<EmptyFilePoolPartition*>& partitionList,
-                                       const efpDataSize_kib_t efpFileSizeKb) {
-    slock l(partitionMapMutex);
-    for (partitionMapConstItr_t i=partitionMap.begin(); i!=partitionMap.end(); ++i) {
-        if (efpFileSizeKb == 0) {
+void EmptyFilePoolManager::getEfpPartitions(std::vector<EmptyFilePoolPartition*>& partitionList,
+                                            const efpDataSize_kib_t efpDataSize_kib) {
+    slock l(partitionMapMutex_);
+    for (partitionMapConstItr_t i=partitionMap_.begin(); i!=partitionMap_.end(); ++i) {
+        if (efpDataSize_kib == 0) {
             partitionList.push_back(i->second);
         } else {
             std::vector<efpDataSize_kib_t> efpFileSizeList;
-            i->second->getEmptyFilePoolSizesKb(efpFileSizeList);
+            i->second->getEmptyFilePoolSizes_kib(efpFileSizeList);
             for (std::vector<efpDataSize_kib_t>::iterator j=efpFileSizeList.begin(); j!=efpFileSizeList.end(); ++j) {
-                if (*j == efpFileSizeKb) {
+                if (*j == efpDataSize_kib) {
                     partitionList.push_back(i->second);
                     break;
                 }
@@ -128,48 +154,34 @@ EmptyFilePoolManager::getEfpPartitions(s
     }
 }
 
-void
-EmptyFilePoolManager::getEfpFileSizes(std::vector<efpDataSize_kib_t>& efpFileSizeList,
-                                      const efpPartitionNumber_t efpPartitionNumber) const {
-    if (efpPartitionNumber == 0) {
-        for (partitionMapConstItr_t i=partitionMap.begin(); i!=partitionMap.end(); ++i) {
-            i->second->getEmptyFilePoolSizesKb(efpFileSizeList);
-        }
-    } else {
-        partitionMapConstItr_t i = partitionMap.find(efpPartitionNumber);
-        if (i != partitionMap.end()) {
-            i->second->getEmptyFilePoolSizesKb(efpFileSizeList);
-        }
-    }
+EmptyFilePool* EmptyFilePoolManager::getEmptyFilePool(const efpIdentity_t efpIdentity) {
+    return getEmptyFilePool(efpIdentity.first, efpIdentity.second);
+}
+
+EmptyFilePool* EmptyFilePoolManager::getEmptyFilePool(const efpPartitionNumber_t partitionNumber,
+                                                      const efpDataSize_kib_t efpDataSize_kib) {
+    EmptyFilePoolPartition* efppp = getEfpPartition(partitionNumber);
+    if (efppp != 0)
+        return efppp->getEmptyFilePool(efpDataSize_kib);
+    return 0;
 }
 
-void
-EmptyFilePoolManager::getEmptyFilePools(std::vector<EmptyFilePool*>& emptyFilePoolList,
-                                        const efpPartitionNumber_t efpPartitionNumber) {
+void EmptyFilePoolManager::getEmptyFilePools(std::vector<EmptyFilePool*>& emptyFilePoolList,
+                                             const efpPartitionNumber_t efpPartitionNumber) {
     if (efpPartitionNumber == 0) {
-        for (partitionMapConstItr_t i=partitionMap.begin(); i!=partitionMap.end(); ++i) {
+        for (partitionMapConstItr_t i=partitionMap_.begin(); i!=partitionMap_.end(); ++i) {
             i->second->getEmptyFilePools(emptyFilePoolList);
         }
     } else {
-        partitionMapConstItr_t i = partitionMap.find(efpPartitionNumber);
-        if (i != partitionMap.end()) {
+        partitionMapConstItr_t i = partitionMap_.find(efpPartitionNumber);
+        if (i != partitionMap_.end()) {
             i->second->getEmptyFilePools(emptyFilePoolList);
         }
     }
 }
 
-EmptyFilePool*
-EmptyFilePoolManager::getEmptyFilePool(const efpPartitionNumber_t partitionNumber,
-                                       const efpDataSize_kib_t efpFileSizeKib) {
-    EmptyFilePoolPartition* efppp = getEfpPartition(partitionNumber);
-    if (efppp != 0)
-        return efppp->getEmptyFilePool(efpFileSizeKib);
-    return 0;
-}
-
-EmptyFilePool*
-EmptyFilePoolManager::getEmptyFilePool(const efpIdentity_t efpIdentity) {
-    return getEmptyFilePool(efpIdentity.first, efpIdentity.second);
+uint16_t EmptyFilePoolManager::getNumEfpPartitions() const {
+    return partitionMap_.size();
 }
 
 }} // namespace qpid::qls_jrnl

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h Mon Oct 21 21:26:10 2013
@@ -25,7 +25,6 @@
 #include <map>
 #include "qpid/linearstore/jrnl/EmptyFilePoolPartition.h"
 #include "qpid/linearstore/jrnl/smutex.h"
-#include <string>
 
 namespace qpid {
 namespace qls_jrnl {
@@ -37,25 +36,30 @@ protected:
     typedef partitionMap_t::iterator partitionMapItr_t;
     typedef partitionMap_t::const_iterator partitionMapConstItr_t;
 
-    std::string qlsStorePath;
-    partitionMap_t partitionMap;
-    smutex partitionMapMutex;
+    std::string qlsStorePath_;
+    JournalLog& journalLogRef_;
+    partitionMap_t partitionMap_;
+    smutex partitionMapMutex_;
 
 public:
-    EmptyFilePoolManager(const std::string& qlsStorePath_);
+    EmptyFilePoolManager(const std::string& qlsStorePath_,
+                         JournalLog& journalLogRef_);
     virtual ~EmptyFilePoolManager();
-    void findEfpPartitions();
 
-    uint16_t getNumEfpPartitions() const;
+    void findEfpPartitions();
+    void getEfpFileSizes(std::vector<efpDataSize_kib_t>& efpFileSizeList,
+                         const efpPartitionNumber_t efpPartitionNumber = 0) const;
     EmptyFilePoolPartition* getEfpPartition(const efpPartitionNumber_t partitionNumber);
-    void getEfpPartitionNumbers(std::vector<efpPartitionNumber_t>& partitionNumberList, const efpDataSize_kib_t efpFileSizeKb = 0) const;
-    void getEfpPartitions(std::vector<EmptyFilePoolPartition*>& partitionList, const efpDataSize_kib_t efpFileSizeKb = 0);
-
-    void getEfpFileSizes(std::vector<efpDataSize_kib_t>& efpFileSizeList, const efpPartitionNumber_t efpPartitionNumber = 0) const;
-    void getEmptyFilePools(std::vector<EmptyFilePool*>& emptyFilePoolList, const efpPartitionNumber_t efpPartitionNumber = 0);
-
-    EmptyFilePool* getEmptyFilePool(const efpPartitionNumber_t partitionNumber, const efpDataSize_kib_t efpFileSizeKb);
+    void getEfpPartitionNumbers(std::vector<efpPartitionNumber_t>& partitionNumberList,
+                                const efpDataSize_kib_t efpDataSize_kib = 0) const;
+    void getEfpPartitions(std::vector<EmptyFilePoolPartition*>& partitionList,
+                          const efpDataSize_kib_t efpDataSize_kib = 0);
     EmptyFilePool* getEmptyFilePool(const efpIdentity_t efpIdentity);
+    EmptyFilePool* getEmptyFilePool(const efpPartitionNumber_t partitionNumber,
+                                    const efpDataSize_kib_t efpDataSize_kib);
+    void getEmptyFilePools(std::vector<EmptyFilePool*>& emptyFilePoolList,
+                           const efpPartitionNumber_t efpPartitionNumber = 0);
+    uint16_t getNumEfpPartitions() const;
 };
 
 }} // namespace qpid::qls_jrnl

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp Mon Oct 21 21:26:10 2013
@@ -27,62 +27,54 @@
 #include "qpid/linearstore/jrnl/jexception.h"
 #include "qpid/linearstore/jrnl/slock.h"
 
-//#include <iostream> // DEBUG
-
 namespace qpid {
 namespace qls_jrnl {
 
-const std::string EmptyFilePoolPartition::efpTopLevelDir("efp"); // Sets the top-level efp dir within a partition
+// static
+const std::string EmptyFilePoolPartition::s_efpTopLevelDir_("efp"); // Sets the top-level efp dir within a partition
 
-EmptyFilePoolPartition::EmptyFilePoolPartition(const efpPartitionNumber_t partitionNum_, const std::string& partitionDir_) :
-                partitionNum(partitionNum_),
-                partitionDir(partitionDir_)
+EmptyFilePoolPartition::EmptyFilePoolPartition(const efpPartitionNumber_t partitionNum,
+                                               const std::string& partitionDir,
+                                               JournalLog& journalLogRef) :
+                partitionNum_(partitionNum),
+                partitionDir_(partitionDir),
+                journalLogRef_(journalLogRef)
 {
     validatePartitionDir();
 }
 
 EmptyFilePoolPartition::~EmptyFilePoolPartition() {
-    slock l(efpMapMutex);
-    for (efpMapItr_t i = efpMap.begin(); i != efpMap.end(); ++i) {
+    slock l(efpMapMutex_);
+    for (efpMapItr_t i = efpMap_.begin(); i != efpMap_.end(); ++i) {
         delete i->second;
     }
-    efpMap.clear();
-}
-
-void
-EmptyFilePoolPartition::validatePartitionDir() {
-    if (!jdir::is_dir(partitionDir)) {
-        std::ostringstream ss;
-        ss << "Invalid partition directory: \'" << partitionDir << "\' is not a directory";
-        throw jexception(jerrno::JERR_EFP_BADPARTITIONDIR, ss.str(), "EmptyFilePoolPartition", "validatePartitionDir");
-    }
-    // TODO: other validity checks here
+    efpMap_.clear();
 }
 
 void
 EmptyFilePoolPartition::findEmptyFilePools() {
     //std::cout << "Reading " << partitionDir << std::endl; // DEBUG
     std::vector<std::string> dirList;
-    jdir::read_dir(partitionDir, dirList, true, false, false, false);
+    jdir::read_dir(partitionDir_, dirList, true, false, false, false);
     bool foundEfpDir = false;
     for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
-        if (i->compare(efpTopLevelDir) == 0) {
+        if (i->compare(s_efpTopLevelDir_) == 0) {
             foundEfpDir = true;
             break;
         }
     }
     if (foundEfpDir) {
-        std::string efpDir(partitionDir + "/" + efpTopLevelDir);
+        std::string efpDir(partitionDir_ + "/" + s_efpTopLevelDir_);
         //std::cout << "Reading " << efpDir << std::endl; // DEBUG
         dirList.clear();
         jdir::read_dir(efpDir, dirList, true, false, false, true);
         for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
             EmptyFilePool* efpp = 0;
             try {
-                efpp = new EmptyFilePool(*i, this);
+                efpp = new EmptyFilePool(*i, this, journalLogRef_);
                 {
-                    slock l(efpMapMutex);
-                    efpMap[efpp->dataSize_kib()] = efpp;
+                    slock l(efpMapMutex_);
+                    efpMap_[efpp->dataSize_kib()] = efpp;
                 }
             }
             catch (const std::exception& e) {
@@ -98,36 +90,42 @@ EmptyFilePoolPartition::findEmptyFilePoo
     }
 }
 
-efpPartitionNumber_t
-EmptyFilePoolPartition::partitionNumber() const {
-    return partitionNum;
+EmptyFilePool* EmptyFilePoolPartition::getEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib) {
+    efpMapItr_t i = efpMap_.find(efpDataSize_kib);
+    if (i == efpMap_.end())
+        return 0;
+    return i->second;
 }
 
-std::string
-EmptyFilePoolPartition::partitionDirectory() const {
-    return partitionDir;
+void EmptyFilePoolPartition::getEmptyFilePools(std::vector<EmptyFilePool*>& efpList) {
+    for (efpMapItr_t i=efpMap_.begin(); i!=efpMap_.end(); ++i) {
+        efpList.push_back(i->second);
+    }
 }
 
-EmptyFilePool*
-EmptyFilePoolPartition::getEmptyFilePool(const efpDataSize_kib_t efpFileSizeKb) {
-    efpMapItr_t i = efpMap.find(efpFileSizeKb);
-    if (i == efpMap.end())
-        return 0;
-    return i->second;
+void EmptyFilePoolPartition::getEmptyFilePoolSizes_kib(std::vector<efpDataSize_kib_t>& efpDataSizesList_kib) const {
+    for (efpMapConstItr_t i=efpMap_.begin(); i!=efpMap_.end(); ++i) {
+        efpDataSizesList_kib.push_back(i->first);
+    }
 }
 
-void
-EmptyFilePoolPartition::getEmptyFilePoolSizesKb(std::vector<efpDataSize_kib_t>& efpFileSizesKbList) const {
-    for (efpMapConstItr_t i=efpMap.begin(); i!=efpMap.end(); ++i) {
-        efpFileSizesKbList.push_back(i->first);
-    }
+std::string EmptyFilePoolPartition::getPartitionDirectory() const {
+    return partitionDir_;
 }
 
-void
-EmptyFilePoolPartition::getEmptyFilePools(std::vector<EmptyFilePool*>& efpList) {
-    for (efpMapItr_t i=efpMap.begin(); i!=efpMap.end(); ++i) {
-        efpList.push_back(i->second);
+efpPartitionNumber_t EmptyFilePoolPartition::getPartitionNumber() const {
+    return partitionNum_;
+}
+
+// --- protected functions ---
+
+void EmptyFilePoolPartition::validatePartitionDir() {
+    if (!jdir::is_dir(partitionDir_)) {
+        std::ostringstream ss;
+        ss << "Invalid partition directory: \'" << partitionDir_ << "\' is not a directory";
+        throw jexception(jerrno::JERR_EFP_BADPARTITIONDIR, ss.str(), "EmptyFilePoolPartition", "validatePartitionDir");
     }
+    // TODO: other validity checks here
 }
 
 }} // namespace qpid::qls_jrnl

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h Mon Oct 21 21:26:10 2013
@@ -38,33 +38,38 @@ namespace qls_jrnl {
 
 namespace qpid {
 namespace qls_jrnl {
+class JournalLog;
 
 class EmptyFilePoolPartition
 {
 public:
-    static const std::string efpTopLevelDir;
+    static const std::string s_efpTopLevelDir_;
 protected:
     typedef std::map<efpDataSize_kib_t, EmptyFilePool*> efpMap_t;
     typedef efpMap_t::iterator efpMapItr_t;
     typedef efpMap_t::const_iterator efpMapConstItr_t;
 
-    const efpPartitionNumber_t partitionNum;
-    const std::string partitionDir;
-    efpMap_t efpMap;
-    smutex efpMapMutex;
-
-    void validatePartitionDir();
+    const efpPartitionNumber_t partitionNum_;
+    const std::string partitionDir_;
+    JournalLog& journalLogRef_;
+    efpMap_t efpMap_;
+    smutex efpMapMutex_;
 
 public:
-    EmptyFilePoolPartition(const efpPartitionNumber_t partitionNum_, const std::string& partitionDir_);
+    EmptyFilePoolPartition(const efpPartitionNumber_t partitionNum,
+                           const std::string& partitionDir,
+                           JournalLog& journalLogRef);
     virtual ~EmptyFilePoolPartition();
-    void findEmptyFilePools();
-    efpPartitionNumber_t partitionNumber() const;
-    std::string partitionDirectory() const;
 
-    EmptyFilePool* getEmptyFilePool(const efpDataSize_kib_t efpFileSizeKb);
-    void getEmptyFilePoolSizesKb(std::vector<efpDataSize_kib_t>& efpFileSizesKbList) const;
+    void findEmptyFilePools();
+    EmptyFilePool* getEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib);
     void getEmptyFilePools(std::vector<EmptyFilePool*>& efpList);
+    void getEmptyFilePoolSizes_kib(std::vector<efpDataSize_kib_t>& efpDataSizesList) const;
+    std::string getPartitionDirectory() const;
+    efpPartitionNumber_t getPartitionNumber() const;
+
+protected:
+    void validatePartitionDir();
 };
 
 }} // namespace qpid::qls_jrnl

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h Mon Oct 21 21:26:10 2013
@@ -28,13 +28,13 @@
 namespace qpid {
 namespace qls_jrnl {
 
-    typedef uint64_t efpDataSize_kib_t;   // Size of data part of file (excluding file header) in kib
-    typedef uint64_t efpFileSize_kib_t;   // Size of file (header + data) in kib
-    typedef uint32_t efpDataSize_sblks_t; // Size of data part of file (excluding file header) in sblks
-    typedef uint32_t efpFileSize_sblks_t; // Size of file (header + data) in sblks
-    typedef uint32_t efpFileCount_t;
-    typedef uint16_t efpPartitionNumber_t;
-    typedef std::pair<efpPartitionNumber_t, efpDataSize_kib_t> efpIdentity_t;
+    typedef uint64_t efpDataSize_kib_t;     ///< Size of data part of file (excluding file header) in kib
+    typedef uint64_t efpFileSize_kib_t;     ///< Size of file (header + data) in kib
+    typedef uint32_t efpDataSize_sblks_t;   ///< Size of data part of file (excluding file header) in sblks
+    typedef uint32_t efpFileSize_sblks_t;   ///< Size of file (header + data) in sblks
+    typedef uint32_t efpFileCount_t;        ///< Number of files in a partition or pool
+    typedef uint16_t efpPartitionNumber_t;  ///< Number assigned to a partition
+    typedef std::pair<efpPartitionNumber_t, efpDataSize_kib_t> efpIdentity_t; ///< Unique identity of a pool, consisting of the partition number and data size
 
 }} // namespace qpid::qls_jrnl
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org