You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/10/21 23:26:11 UTC
svn commit: r1534383 [1/4] - in /qpid/branches/linearstore/qpid/cpp/src: ./
qpid/broker/ qpid/legacystore/jrnl/ qpid/linearstore/
qpid/linearstore/jrnl/ qpid/linearstore/jrnl/utils/
Author: kpvdr
Date: Mon Oct 21 21:26:10 2013
New Revision: 1534383
URL: http://svn.apache.org/r1534383
Log:
QPID-4984: WIP - Compiles, but functionally incomplete. Transactions not yet functional.
Added:
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalLogImpl.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalLogImpl.h
- copied, changed from r1527754, qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/QpidLog.h
Removed:
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/EmptyFilePoolManagerImpl.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/EmptyFilePoolManagerImpl.h
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/QpidLog.h
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rcvdat.h
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.h
Modified:
qpid/branches/linearstore/qpid/cpp/src/linearstore.cmake
qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Broker.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/jrnl/pmgr.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/jrnl/rmgr.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.h
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.h
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enums.h
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h
Modified: qpid/branches/linearstore/qpid/cpp/src/linearstore.cmake
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/linearstore.cmake?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/linearstore.cmake (original)
+++ qpid/branches/linearstore/qpid/cpp/src/linearstore.cmake Mon Oct 21 21:26:10 2013
@@ -78,11 +78,11 @@ if (BUILD_LINEARSTORE)
set (linear_jrnl_SOURCES
qpid/linearstore/jrnl/data_tok.cpp
qpid/linearstore/jrnl/deq_rec.cpp
- qpid/linearstore/jrnl/enq_map.cpp
- qpid/linearstore/jrnl/enq_rec.cpp
qpid/linearstore/jrnl/EmptyFilePool.cpp
qpid/linearstore/jrnl/EmptyFilePoolManager.cpp
qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp
+ qpid/linearstore/jrnl/enq_map.cpp
+ qpid/linearstore/jrnl/enq_rec.cpp
qpid/linearstore/jrnl/jcntl.cpp
qpid/linearstore/jrnl/jdir.cpp
qpid/linearstore/jrnl/jerrno.cpp
@@ -105,12 +105,12 @@ if (BUILD_LINEARSTORE)
qpid/linearstore/BindingDbt.cpp
qpid/linearstore/BufferValue.cpp
qpid/linearstore/DataTokenImpl.cpp
- qpid/linearstore/EmptyFilePoolManagerImpl.cpp
qpid/linearstore/IdDbt.cpp
qpid/linearstore/IdSequence.cpp
qpid/linearstore/JournalImpl.cpp
qpid/linearstore/MessageStoreImpl.cpp
qpid/linearstore/PreparedTransaction.cpp
+ qpid/linearstore/JournalLogImpl.cpp
qpid/linearstore/TxnCtxt.cpp
)
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Broker.cpp Mon Oct 21 21:26:10 2013
@@ -998,7 +998,7 @@ Manageable::status_t Broker::queryQueue(
return Manageable::STATUS_UNKNOWN_OBJECT;
}
q->query( results );
- return Manageable::STATUS_OK;;
+ return Manageable::STATUS_OK;
}
Manageable::status_t Broker::getTimestampConfig(bool& receive,
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Queue.cpp Mon Oct 21 21:26:10 2013
@@ -835,7 +835,7 @@ bool Queue::checkAutoDelete(const Mutex:
bool Queue::isUnused(const Mutex::ScopedLock&) const
{
- return !owner && !users.isUsed();;
+ return !owner && !users.isUsed();
}
bool Queue::isEmpty(const Mutex::ScopedLock&) const
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/jrnl/pmgr.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/jrnl/pmgr.cpp?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/jrnl/pmgr.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/jrnl/pmgr.cpp Mon Oct 21 21:26:10 2013
@@ -49,6 +49,7 @@ namespace journal
pmgr::page_cb::page_cb(u_int16_t index):
_index(index),
_state(UNUSED),
+ _frid(0),
_wdblks(0),
_rdblks(0),
_pdtokl(0),
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/jrnl/rmgr.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/jrnl/rmgr.cpp?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/jrnl/rmgr.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/jrnl/rmgr.cpp Mon Oct 21 21:26:10 2013
@@ -227,6 +227,7 @@ rmgr::read(void** const datapp, std::siz
return RHM_IORES_EMPTY;
}
}
+ return RHM_IORES_SUCCESS;
}
int32_t
@@ -529,6 +530,7 @@ rmgr::skip(data_tok* dtokp)
return RHM_IORES_SUCCESS;
}
}
+ return RHM_IORES_SUCCESS;
}
iores
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp Mon Oct 21 21:26:10 2013
@@ -21,20 +21,20 @@
#include "qpid/linearstore/JournalImpl.h"
+#include "qpid/linearstore/JournalLogImpl.h"
#include "qpid/linearstore/jrnl/jerrno.h"
#include "qpid/linearstore/jrnl/jexception.h"
#include "qpid/linearstore/jrnl/EmptyFilePool.h"
+#include "qpid/linearstore/StoreException.h"
#include "qpid/log/Statement.h"
#include "qpid/management/ManagementAgent.h"
-//#include "qmf/org/apache/qpid/linearstore/ArgsJournalExpand.h"
+#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Timer.h"
+
#include "qmf/org/apache/qpid/linearstore/EventCreated.h"
#include "qmf/org/apache/qpid/linearstore/EventEnqThresholdExceeded.h"
#include "qmf/org/apache/qpid/linearstore/EventFull.h"
#include "qmf/org/apache/qpid/linearstore/EventRecovered.h"
-#include "qpid/sys/Monitor.h"
-#include "qpid/sys/Timer.h"
-#include "qpid/linearstore/QpidLog.h"
-#include "qpid/linearstore/StoreException.h"
using namespace qpid::qls_jrnl;
using namespace qpid::linearstore;
@@ -54,13 +54,14 @@ void GetEventsFireEvent::fire() { qpid::
JournalImpl::JournalImpl(qpid::sys::Timer& timer_,
const std::string& journalId,
const std::string& journalDirectory,
-// const std::string& journalBaseFilename,
+ JournalLogImpl& journalLogRef,
const qpid::sys::Duration getEventsTimeout,
const qpid::sys::Duration flushTimeout,
qpid::management::ManagementAgent* a,
DeleteCallback onDelete):
- jcntl(journalId, journalDirectory/*, journalBaseFilename*/),
+ jcntl(journalId, journalDirectory, journalLogRef),
timer(timer_),
+ _journalLogRef(journalLogRef),
getEventsTimerSetFlag(false),
// lastReadRid(0),
writeActivityFlag(false),
@@ -163,7 +164,7 @@ JournalImpl::initialize(qpid::qls_jrnl::
_mgmtObject->set_writePages(wcache_num_pages);
}
if (_agent != 0)
- _agent->raiseEvent(qmf::org::apache::qpid::linearstore::EventCreated(_jid, _jfsize_sblks * JRNL_SBLK_SIZE, _lpmgr.num_jfiles()),
+ _agent->raiseEvent::(qmf::org::apache::qpid::linearstore::EventCreated(_jid, _jfsize_sblks * JRNL_SBLK_SIZE, _lpmgr.num_jfiles()),
qpid::management::ManagementAgent::SEV_NOTE);
*/
}
@@ -558,11 +559,11 @@ JournalImpl::wr_aio_cb(std::vector<data_
switch (dtokp->wstate())
{
case data_tok::ENQ:
- std::cout << "<<<>>> JournalImpl::wr_aio_cb() ENQ dtokp rid=" << dtokp->rid() << std::endl << std::flush; // DEBUG
+//std::cout << "<<<>>> JournalImpl::wr_aio_cb() ENQ dtokp rid=0x" << std::hex << dtokp->rid() << std::dec << std::endl << std::flush; // DEBUG
dtokp->getSourceMessage()->enqueueComplete();
break;
case data_tok::DEQ:
- std::cout << "<<<>>> JournalImpl::wr_aio_cb() DEQ dtokp rid=" << dtokp->rid() << std::endl << std::flush; // DEBUG
+//std::cout << "<<<>>> JournalImpl::wr_aio_cb() DEQ dtokp rid=0x" << std::hex << dtokp->rid() << std::dec << std::endl << std::flush; // DEBUG
/* Don't need to signal until we have a way to ack completion of dequeue in AMQP
dtokp->getSourceMessage()->dequeueComplete();
if ( dtokp->getSourceMessage()->isDequeueComplete() ) // clear id after last dequeue
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.h?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.h Mon Oct 21 21:26:10 2013
@@ -48,6 +48,7 @@ class EmptyFilePool;
namespace linearstore{
class JournalImpl;
+class JournalLogImpl;
class InactivityFireEvent : public qpid::sys::TimerTask
{
@@ -78,8 +79,9 @@ class JournalImpl : public qpid::broker:
public:
typedef boost::function<void (JournalImpl&)> DeleteCallback;
- private:
+ protected:
qpid::sys::Timer& timer;
+ JournalLogImpl& _journalLogRef;
bool getEventsTimerSetFlag;
boost::intrusive_ptr<qpid::sys::TimerTask> getEventsFireEventsPtr;
qpid::sys::Mutex _getf_lock;
@@ -98,6 +100,7 @@ class JournalImpl : public qpid::broker:
JournalImpl(qpid::sys::Timer& timer,
const std::string& journalId,
const std::string& journalDirectory,
+ JournalLogImpl& journalLogRef,
const qpid::sys::Duration getEventsTimeout,
const qpid::sys::Duration flushTimeout,
qpid::management::ManagementAgent* agent,
@@ -187,7 +190,7 @@ class JournalImpl : public qpid::broker:
void resetDeleteCallback() { deleteCallback = DeleteCallback(); }
- private:
+ protected:
// void free_read_buffers();
void createStore();
@@ -215,10 +218,11 @@ class TplJournalImpl : public JournalImp
TplJournalImpl(qpid::sys::Timer& timer,
const std::string& journalId,
const std::string& journalDirectory,
+ JournalLogImpl& journalLogRef,
const qpid::sys::Duration getEventsTimeout,
const qpid::sys::Duration flushTimeout,
qpid::management::ManagementAgent* agent) :
- JournalImpl(timer, journalId, journalDirectory, getEventsTimeout, flushTimeout, agent)
+ JournalImpl(timer, journalId, journalDirectory, journalLogRef, getEventsTimeout, flushTimeout, agent)
{}
virtual ~TplJournalImpl() {}
Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalLogImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalLogImpl.cpp?rev=1534383&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalLogImpl.cpp (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalLogImpl.cpp Mon Oct 21 21:26:10 2013
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/linearstore/JournalLogImpl.h"
+
+namespace qpid {
+namespace linearstore {
+
+JournalLogImpl::JournalLogImpl(const qpid::qls_jrnl::JournalLog::log_level_t logLevelThreshold) : qpid::qls_jrnl::JournalLog(logLevelThreshold) {}
+JournalLogImpl::~JournalLogImpl() {}
+
+void
+JournalLogImpl::log(const qpid::qls_jrnl::JournalLog::log_level_t level,
+ const std::string& log_stmt) const {
+ switch (level) {
+ case LOG_CRITICAL: QPID_LOG(critical, "Linear Store: " << log_stmt); break;
+ case LOG_ERROR: QPID_LOG(error, "Linear Store: " << log_stmt); break;
+ case LOG_WARN: QPID_LOG(warning, "Linear Store: " << log_stmt); break;
+ case LOG_NOTICE: QPID_LOG(notice, "Linear Store: " << log_stmt); break;
+ case LOG_INFO: QPID_LOG(info, "Linear Store: " << log_stmt); break;
+ case LOG_DEBUG: QPID_LOG(debug, "Linear Store: " << log_stmt); break;
+ default: QPID_LOG(trace, "Linear Store: " << log_stmt);
+ }
+}
+
+void
+JournalLogImpl::log(const qpid::qls_jrnl::JournalLog::log_level_t level,
+ const std::string& jid,
+ const std::string& log_stmt) const {
+ switch (level) {
+ case LOG_CRITICAL: QPID_LOG(critical, "Linear Store: Journal \'" << jid << "\":" << log_stmt); break;
+ case LOG_ERROR: QPID_LOG(error, "Linear Store: Journal \'" << jid << "\":" << log_stmt); break;
+ case LOG_WARN: QPID_LOG(warning, "Linear Store: Journal \'" << jid << "\":" << log_stmt); break;
+ case LOG_NOTICE: QPID_LOG(notice, "Linear Store: Journal \'" << jid << "\":" << log_stmt); break;
+ case LOG_INFO: QPID_LOG(info, "Linear Store: Journal \'" << jid << "\":" << log_stmt); break;
+ case LOG_DEBUG: QPID_LOG(debug, "Linear Store: Journal \'" << jid << "\":" << log_stmt); break;
+ default: QPID_LOG(trace, "Linear Store: Journal \'" << jid << "\":" << log_stmt);
+ }
+}
+
+}} // namespace qpid::linearstore
Copied: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalLogImpl.h (from r1527754, qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/QpidLog.h)
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalLogImpl.h?p2=qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalLogImpl.h&p1=qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/QpidLog.h&r1=1527754&r2=1534383&rev=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/QpidLog.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalLogImpl.h Mon Oct 21 21:26:10 2013
@@ -22,9 +22,27 @@
#ifndef QPID_LEGACYSTORE_LOG_H
#define QPID_LEGACYSTORE_LOG_H
+#include "qpid/linearstore/jrnl/JournalLog.h"
#include "qpid/log/Statement.h"
#define QLS_LOG(level, msg) QPID_LOG(level, "Linear Store: " << msg)
#define QLS_LOG2(level, queue, msg) QPID_LOG(level, "Linear Store: Journal \'" << queue << "\":" << msg)
+namespace qpid {
+namespace linearstore {
+
+class JournalLogImpl : public qpid::qls_jrnl::JournalLog
+{
+public:
+ JournalLogImpl(const qpid::qls_jrnl::JournalLog::log_level_t logLevelThreshold);
+ virtual ~JournalLogImpl();
+ virtual void log(const qpid::qls_jrnl::JournalLog::log_level_t logLevel,
+ const std::string& logStatement) const;
+ virtual void log(const qpid::qls_jrnl::JournalLog::log_level_t logLevel,
+ const std::string& journalId,
+ const std::string& logStatement) const;
+};
+
+}} // namespace qpid::linearstore
+
#endif // QPID_LEGACYSTORE_LOG_H
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp Mon Oct 21 21:26:10 2013
@@ -28,7 +28,6 @@
#include "qpid/linearstore/IdDbt.h"
#include "qpid/linearstore/jrnl/EmptyFilePoolManager.h"
#include "qpid/linearstore/jrnl/txn_map.h"
-#include "qpid/linearstore/QpidLog.h"
#include "qpid/framing/FieldValue.h"
#include "qmf/org/apache/qpid/linearstore/Package.h"
#include "qpid/linearstore/StoreException.h"
@@ -73,6 +72,7 @@ MessageStoreImpl::MessageStoreImpl(qpid:
isInit(false),
envPath(envpath_),
broker(broker_),
+ jrnlLog(qpid::qls_jrnl::JournalLog::LOG_NOTICE),
mgmtObject(),
agent(0)
{}
@@ -83,7 +83,7 @@ uint32_t MessageStoreImpl::chkJrnlWrPage
if (p == 0) {
// For zero value, use default
- p = JRNL_WMGR_DEF_PAGE_SIZE_KIB;
+ p = QLS_WMGR_DEF_PAGE_SIZE_KIB;
QLS_LOG(warning, "parameter " << paramName_ << " (" << param_ << ") must be a power of 2 between 1 and 128; changing this parameter to default value (" << p << ")");
} else if ( p > 128 || (p & (p-1)) ) {
// For any positive value that is not a power of 2, use closest value
@@ -100,8 +100,8 @@ uint32_t MessageStoreImpl::chkJrnlWrPage
uint16_t MessageStoreImpl::getJrnlWrNumPages(const uint32_t wrPageSizeKib_)
{
- uint32_t wrPageSizeSblks = wrPageSizeKib_ / JRNL_SBLK_SIZE_KIB; // convert from KiB to number sblks
- uint32_t defTotWCacheSizeSblks = JRNL_WMGR_DEF_PAGE_SIZE_SBLKS * JRNL_WMGR_DEF_PAGES;
+ uint32_t wrPageSizeSblks = wrPageSizeKib_ / QLS_SBLK_SIZE_KIB; // convert from KiB to number sblks
+ uint32_t defTotWCacheSizeSblks = QLS_WMGR_DEF_PAGE_SIZE_SBLKS * QLS_WMGR_DEF_PAGES;
switch (wrPageSizeKib_)
{
case 1:
@@ -127,13 +127,13 @@ qpid::qls_jrnl::efpPartitionNumber_t Mes
qpid::qls_jrnl::efpDataSize_kib_t MessageStoreImpl::chkEfpFileSizeKiB(const qpid::qls_jrnl::efpDataSize_kib_t efpFileSizeKib_,
const std::string& paramName_) {
- uint8_t rem = efpFileSizeKib_ % uint64_t(JRNL_SBLK_SIZE_KIB);
+ uint8_t rem = efpFileSizeKib_ % uint64_t(QLS_SBLK_SIZE_KIB);
if (rem != 0) {
uint64_t newVal = efpFileSizeKib_ - rem;
- if (rem >= (JRNL_SBLK_SIZE_KIB / 2))
- newVal += JRNL_SBLK_SIZE_KIB;
+ if (rem >= (QLS_SBLK_SIZE_KIB / 2))
+ newVal += QLS_SBLK_SIZE_KIB;
QLS_LOG(warning, "Parameter " << paramName_ << " (" << efpFileSizeKib_ << ") must be a multiple of " <<
- JRNL_SBLK_SIZE_KIB << "; changing this parameter to the closest allowable value (" <<
+ QLS_SBLK_SIZE_KIB << "; changing this parameter to the closest allowable value (" <<
newVal << ")");
return newVal;
}
@@ -154,7 +154,7 @@ void MessageStoreImpl::initManagement ()
mgmtObject->set_location(storeDir);
mgmtObject->set_tplIsInitialized(false);
mgmtObject->set_tplDirectory(getTplBaseDir());
- mgmtObject->set_tplWritePageSize(tplWCachePgSizeSblks * JRNL_SBLK_SIZE_BYTES);
+ mgmtObject->set_tplWritePageSize(tplWCachePgSizeSblks * QLS_SBLK_SIZE_BYTES);
mgmtObject->set_tplWritePages(tplWCacheNumPages);
agent->addObject(mgmtObject, 0, true);
@@ -193,9 +193,9 @@ bool MessageStoreImpl::init(const std::s
// Set geometry members (converting to correct units where req'd)
defaultEfpPartitionNumber = efpPartition_;
defaultEfpFileSize_kib = efpFileSize_kib_;
- wCachePgSizeSblks = wCachePageSizeKib_ / JRNL_SBLK_SIZE_KIB; // convert from KiB to number sblks
+ wCachePgSizeSblks = wCachePageSizeKib_ / QLS_SBLK_SIZE_KIB; // convert from KiB to number sblks
wCacheNumPages = getJrnlWrNumPages(wCachePageSizeKib_);
- tplWCachePgSizeSblks = tplWCachePageSizeKib_ / JRNL_SBLK_SIZE_KIB; // convert from KiB to number sblks
+ tplWCachePgSizeSblks = tplWCachePageSizeKib_ / QLS_SBLK_SIZE_KIB; // convert from KiB to number sblks
tplWCacheNumPages = getJrnlWrNumPages(tplWCachePageSizeKib_);
if (storeDir_.size()>0) storeDir = storeDir_;
@@ -267,7 +267,7 @@ void MessageStoreImpl::init()
// NOTE: during normal initialization, agent == 0 because the store is initialized before the management infrastructure.
// However during a truncated initialization in a cluster, agent != 0. We always pass 0 as the agent for the
// TplStore to keep things consistent in a cluster. See https://bugzilla.redhat.com/show_bug.cgi?id=681026
- tplStorePtr.reset(new TplJournalImpl(broker->getTimer(), "TplStore", getTplBaseDir(), defJournalGetEventsTimeout, defJournalFlushTimeout, 0));
+ tplStorePtr.reset(new TplJournalImpl(broker->getTimer(), "TplStore", getTplBaseDir(), jrnlLog, defJournalGetEventsTimeout, defJournalFlushTimeout, 0));
isInit = true;
} catch (const DbException& e) {
if (e.get_errno() == DB_VERSION_MISMATCH)
@@ -291,7 +291,7 @@ void MessageStoreImpl::init()
}
} while (!isInit);
- efpMgr.reset(new EmptyFilePoolManagerImpl(getStoreTopLevelDir()));
+ efpMgr.reset(new qpid::qls_jrnl::EmptyFilePoolManager(getStoreTopLevelDir(), jrnlLog));
efpMgr->findEfpPartitions();
}
@@ -403,7 +403,7 @@ void MessageStoreImpl::create(qpid::brok
return;
}
- jQueue = new JournalImpl(broker->getTimer(), queue_.getName(), getJrnlDir(queue_.getName()),
+ jQueue = new JournalImpl(broker->getTimer(), queue_.getName(), getJrnlDir(queue_.getName()), jrnlLog,
defJournalGetEventsTimeout, defJournalFlushTimeout, agent,
boost::bind(&MessageStoreImpl::journalDeleted, this, _1));
{
@@ -711,8 +711,8 @@ void MessageStoreImpl::recoverQueues(Txn
QLS_LOG(error, "Cannot recover empty (null) queue name - ignoring and attempting to continue.");
break;
}
- jQueue = new JournalImpl(broker->getTimer(), queueName, getJrnlDir(queueName), defJournalGetEventsTimeout,
- defJournalFlushTimeout, agent,
+ jQueue = new JournalImpl(broker->getTimer(), queueName, getJrnlDir(queueName),jrnlLog,
+ defJournalGetEventsTimeout, defJournalFlushTimeout, agent,
boost::bind(&MessageStoreImpl::journalDeleted, this, _1));
{
qpid::sys::Mutex::ScopedLock sl(journalListLock);
@@ -1207,7 +1207,7 @@ void MessageStoreImpl::loadContent(const
void MessageStoreImpl::flush(const qpid::broker::PersistableQueue& queue_)
{
- QLS_LOG(info, "*** MessageStoreImpl::flush() queue=\"" << queue_.getName() << "\"");
+// QLS_LOG(info, "*** MessageStoreImpl::flush() queue=\"" << queue_.getName() << "\"");
if (queue_.getExternalQueueStore() == 0) return;
checkInit();
std::string qn = queue_.getName();
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h Mon Oct 21 21:26:10 2013
@@ -22,14 +22,15 @@
#ifndef QPID_LEGACYSTORE_MESSAGESTOREIMPL_H
#define QPID_LEGACYSTORE_MESSAGESTOREIMPL_H
+#include <iomanip>
#include <string>
#include "db-inc.h"
#include "qpid/linearstore/Cursor.h"
-#include "qpid/linearstore/EmptyFilePoolManagerImpl.h"
#include "qpid/linearstore/IdDbt.h"
#include "qpid/linearstore/IdSequence.h"
#include "qpid/linearstore/JournalImpl.h"
+#include "qpid/linearstore/JournalLogImpl.h"
#include "qpid/linearstore/jrnl/jcfg.h"
#include "qpid/linearstore/jrnl/EmptyFilePoolTypes.h"
#include "qpid/linearstore/PreparedTransaction.h"
@@ -101,10 +102,10 @@ class MessageStoreImpl : public qpid::br
// Default store settings
static const bool defTruncateFlag = false;
- static const uint32_t defWCachePageSizeKib = JRNL_WMGR_DEF_PAGE_SIZE_KIB;
+ static const uint32_t defWCachePageSizeKib = QLS_WMGR_DEF_PAGE_SIZE_KIB;
static const uint32_t defTplWCachePageSizeKib = defWCachePageSizeKib / 8;
static const uint16_t defEfpPartition = 1;
- static const uint64_t defEfpFileSizeKib = 512 * JRNL_SBLK_SIZE_KIB;
+ static const uint64_t defEfpFileSizeKib = 512 * QLS_SBLK_SIZE_KIB;
static const std::string storeTopLevelDir;
static qpid::sys::Duration defJournalGetEventsTimeout;
@@ -143,7 +144,8 @@ class MessageStoreImpl : public qpid::br
bool isInit;
const char* envPath;
qpid::broker::Broker* broker;
- boost::shared_ptr<EmptyFilePoolManagerImpl> efpMgr;
+ JournalLogImpl jrnlLog;
+ boost::shared_ptr<qpid::qls_jrnl::EmptyFilePoolManager> efpMgr;
qmf::org::apache::qpid::linearstore::Store::shared_ptr mgmtObject;
qpid::management::ManagementAgent* agent;
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp Mon Oct 21 21:26:10 2013
@@ -23,7 +23,7 @@
#include "qpid/Plugin.h"
#include "qpid/Options.h"
#include "qpid/DataDir.h"
-#include "qpid/linearstore/QpidLog.h"
+#include "qpid/linearstore/JournalLogImpl.h"
#include "qpid/linearstore/MessageStoreImpl.h"
using qpid::linearstore::MessageStoreImpl;
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h Mon Oct 21 21:26:10 2013
@@ -31,105 +31,93 @@ template <class T>
class AtomicCounter
{
private:
- T count;
+ T count_;
mutable smutex countMutex;
public:
- AtomicCounter(const T& initValue = T(0)) : count(initValue) {}
+ AtomicCounter(const T& initValue = T(0)) : count_(initValue) {}
virtual ~AtomicCounter() {}
T get() const {
slock l(countMutex);
- return count;
+ return count_;
}
T increment() {
slock l(countMutex);
- return ++count;
+ return ++count_;
}
T add(const T& a) {
slock l(countMutex);
- count += a;
- return count;
+ count_ += a;
+ return count_;
}
T addLimit(const T& a, const T& limit, const uint32_t jerr) {
slock l(countMutex);
- if (count + a > limit) throw jexception(jerr, "AtomicCounter", "addLimit");
- count += a;
- return count;
+ if (count_ + a > limit) throw jexception(jerr, "AtomicCounter", "addLimit");
+ count_ += a;
+ return count_;
}
T decrement() {
slock l(countMutex);
- return --count;
+ return --count_;
}
T decrementLimit(const T& limit = T(0), const uint32_t jerr = jerrno::JERR__UNDERFLOW) {
slock l(countMutex);
- if (count < limit + 1) {
+ if (count_ < limit + 1) {
throw jexception(jerr, "AtomicCounter", "decrementLimit");
}
- return --count;
+ return --count_;
}
T subtract(const T& s) {
slock l(countMutex);
- count -= s;
- return count;
+ count_ -= s;
+ return count_;
}
T subtractLimit(const T& s, const T& limit = T(0), const uint32_t jerr = jerrno::JERR__UNDERFLOW) {
slock l(countMutex);
- if (count < limit + s) throw jexception(jerr, "AtomicCounter", "subtractLimit");
- count -= s;
- return count;
+ if (count_ < limit + s) throw jexception(jerr, "AtomicCounter", "subtractLimit");
+ count_ -= s;
+ return count_;
}
bool operator==(const T& o) const {
slock l(countMutex);
- return count == o;
+ return count_ == o;
}
bool operator<(const T& o) const {
slock l(countMutex);
- return count < o;
+ return count_ < o;
}
bool operator<=(const T& o) const {
slock l(countMutex);
- return count <= o;
+ return count_ <= o;
}
friend T operator-(const T& a, const AtomicCounter& b) {
slock l(b.countMutex);
- return a - b.count;
+ return a - b.count_;
}
friend T operator-(const AtomicCounter& a, const T& b) {
slock l(a.countMutex);
- return a.count - b;
+ return a.count_ - b;
}
friend T operator-(const AtomicCounter&a, const AtomicCounter& b) {
slock l1(a.countMutex);
slock l2(b.countMutex);
- return a.count - b.count;
+ return a.count_ - b.count_;
}
-
-/*
- friend std::ostream& operator<<(std::ostream& out, const AtomicCounter& a) {
- T temp; // Use temp so lock is not held while streaming to out.
- {
- slock l(a.countMutex);
- temp = a.count;
- }
- out << temp;
- return out;
- }
-*/
};
}} // namespace qpid::qls_jrnl
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp Mon Oct 21 21:26:10 2013
@@ -23,229 +23,274 @@
#include <cctype>
#include <fstream>
+#include "qpid/linearstore/jrnl/EmptyFilePoolPartition.h"
#include "qpid/linearstore/jrnl/jcfg.h"
#include "qpid/linearstore/jrnl/jdir.h"
#include "qpid/linearstore/jrnl/JournalFile.h"
+#include "qpid/linearstore/jrnl/JournalLog.h"
#include "qpid/linearstore/jrnl/slock.h"
#include "qpid/linearstore/jrnl/utils/file_hdr.h"
#include <sys/stat.h>
+#include <unistd.h>
#include <uuid/uuid.h>
#include <vector>
-#include <iostream> // DEBUG
-
namespace qpid {
namespace qls_jrnl {
-EmptyFilePool::EmptyFilePool(const std::string& efpDirectory_,
- const EmptyFilePoolPartition* partitionPtr_) :
- efpDirectory(efpDirectory_),
- efpDataSize_kib(fileSizeKbFromDirName(efpDirectory_, partitionPtr_->partitionNumber())),
- partitionPtr(partitionPtr_)
+EmptyFilePool::EmptyFilePool(const std::string& efpDirectory,
+ const EmptyFilePoolPartition* partitionPtr,
+ JournalLog& journalLogRef) :
+ efpDirectory_(efpDirectory),
+ efpDataSize_kib_(fileSizeKbFromDirName(efpDirectory, partitionPtr->getPartitionNumber())),
+ partitionPtr_(partitionPtr),
+ journalLogRef_(journalLogRef)
{}
EmptyFilePool::~EmptyFilePool() {}
-void
-EmptyFilePool::initialize() {
- //std::cout << "Reading " << efpDirectory << std::endl; // DEBUG
+void EmptyFilePool::initialize() {
std::vector<std::string> dirList;
- jdir::read_dir(efpDirectory, dirList, false, true, false, false);
+ jdir::read_dir(efpDirectory_, dirList, false, true, false, false);
for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
size_t dotPos = i->rfind(".");
if (dotPos != std::string::npos) {
if (i->substr(dotPos).compare(".jrnl") == 0 && i->length() == 41) {
- std::string emptyFile(efpDirectory + "/" + (*i));
+ std::string emptyFile(efpDirectory_ + "/" + (*i));
if (validateEmptyFile(emptyFile)) {
pushEmptyFile(emptyFile);
}
}
}
}
- //std::cout << "Found " << emptyFileList.size() << " files" << std::endl; // DEBUG
}
-efpDataSize_kib_t
-EmptyFilePool::dataSize_kib() const {
- return efpDataSize_kib;
+efpDataSize_kib_t EmptyFilePool::dataSize_kib() const {
+ return efpDataSize_kib_;
}
-efpFileSize_kib_t
-EmptyFilePool::fileSize_kib() const {
- return efpDataSize_kib + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_KIB);
+efpFileSize_kib_t EmptyFilePool::fileSize_kib() const {
+ return efpDataSize_kib_ + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB);
}
-efpDataSize_sblks_t
-EmptyFilePool::dataSize_sblks() const {
- return efpDataSize_kib / JRNL_SBLK_SIZE_KIB;
+efpDataSize_sblks_t EmptyFilePool::dataSize_sblks() const {
+ return efpDataSize_kib_ / QLS_SBLK_SIZE_KIB;
}
-efpFileSize_sblks_t
-EmptyFilePool::fileSize_sblks() const {
- return (efpDataSize_kib / JRNL_SBLK_SIZE_KIB) + QLS_JRNL_FHDR_RES_SIZE_SBLKS;
+efpFileSize_sblks_t EmptyFilePool::fileSize_sblks() const {
+ return (efpDataSize_kib_ / QLS_SBLK_SIZE_KIB) + QLS_JRNL_FHDR_RES_SIZE_SBLKS;
}
-efpFileCount_t
-EmptyFilePool::numEmptyFiles() const {
- slock l(emptyFileListMutex);
- return efpFileCount_t(emptyFileList.size());
+efpFileCount_t EmptyFilePool::numEmptyFiles() const {
+ slock l(emptyFileListMutex_);
+ return efpFileCount_t(emptyFileList_.size());
}
-efpDataSize_kib_t
-EmptyFilePool::cumFileSize_kib() const {
- slock l(emptyFileListMutex);
- return efpDataSize_kib_t(emptyFileList.size()) * efpDataSize_kib;
+efpDataSize_kib_t EmptyFilePool::cumFileSize_kib() const {
+ slock l(emptyFileListMutex_);
+ return efpDataSize_kib_t(emptyFileList_.size()) * efpDataSize_kib_;
}
-efpPartitionNumber_t
-EmptyFilePool::getPartitionNumber() const {
- return partitionPtr->partitionNumber();
+efpPartitionNumber_t EmptyFilePool::getPartitionNumber() const {
+ return partitionPtr_->getPartitionNumber();
}
-const EmptyFilePoolPartition*
-EmptyFilePool::getPartition() const {
- return partitionPtr;
+const EmptyFilePoolPartition* EmptyFilePool::getPartition() const {
+ return partitionPtr_;
}
-const efpIdentity_t
-EmptyFilePool::getIdentity() const {
- return efpIdentity_t(partitionPtr->partitionNumber(), efpDataSize_kib);
+const efpIdentity_t EmptyFilePool::getIdentity() const {
+ return efpIdentity_t(partitionPtr_->getPartitionNumber(), efpDataSize_kib_);
}
-std::string
-EmptyFilePool::takeEmptyFile(const std::string& destDirectory) {
+std::string EmptyFilePool::takeEmptyFile(const std::string& destDirectory) {
std::string emptyFileName = popEmptyFile();
std::string newFileName = destDirectory + emptyFileName.substr(emptyFileName.rfind('/')); // NOTE: substr() includes leading '/'
- if (::rename(emptyFileName.c_str(), newFileName.c_str())) {
- pushEmptyFile(emptyFileName);
- std::ostringstream oss;
- oss << "file=\"" << emptyFileName << "\" dest=\"" << newFileName << "\"" << FORMAT_SYSERR(errno);
- throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", "takeEmptyFile");
+ if (moveEmptyFile(emptyFileName.c_str(), newFileName.c_str())) {
+ // Try again with new UUID for file name
+ newFileName = destDirectory + "/" + getEfpFileName();
+ if (moveEmptyFile(emptyFileName.c_str(), newFileName.c_str())) {
+ pushEmptyFile(emptyFileName);
+ std::ostringstream oss;
+ oss << "file=\"" << emptyFileName << "\" dest=\"" << newFileName << "\"" << FORMAT_SYSERR(errno);
+ throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", "takeEmptyFile");
+ }
}
return newFileName;
}
-bool
-EmptyFilePool::returnEmptyFile(const JournalFile* srcFile) {
- std::string emptyFileName(efpDirectory + srcFile->getFileName());
- // TODO: reset file here
- if (::rename(srcFile->getFqFileName().c_str(), emptyFileName.c_str())) {
- std::ostringstream oss;
- oss << "file=\"" << srcFile << "\" dest=\"" << emptyFileName << "\"" << FORMAT_SYSERR(errno);
- throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", "returnEmptyFile");
+void EmptyFilePool::returnEmptyFile(const std::string& fqSrcFile) {
+ std::string emptyFileName(efpDirectory_ + fqSrcFile.substr(fqSrcFile.rfind('/'))); // NOTE: substr() includes leading '/'
+ if (moveEmptyFile(fqSrcFile.c_str(), emptyFileName.c_str())) {
+ // Try again with new UUID for file name
+ emptyFileName = efpDirectory_ + "/" + getEfpFileName();
+ if (moveEmptyFile(fqSrcFile.c_str(), emptyFileName.c_str())) {
+ // Failed twice in a row - delete file
+ ::unlink(fqSrcFile.c_str());
+ return;
+ }
}
+ resetEmptyFileHeader(emptyFileName);
pushEmptyFile(emptyFileName);
- return true;
}
-// protected
+// --- protected functions ---
+
+void EmptyFilePool::createEmptyFile() {
+ ::file_hdr_t fh;
+ ::file_hdr_create(&fh, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDR_RES_SIZE_SBLKS, partitionPtr_->getPartitionNumber(), efpDataSize_kib_);
+ std::string efpfn = getEfpFileName();
+ std::ofstream ofs(efpfn.c_str(), std::ofstream::out | std::ofstream::binary);
+ if (ofs.good()) {
+ ofs.write((char*)&fh, sizeof(::file_hdr_t));
+ uint64_t rem = ((efpDataSize_kib_ + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB)) * 1024) - sizeof(::file_hdr_t);
+ while (rem--)
+ ofs.put('\0');
+ ofs.close();
+ pushEmptyFile(efpfn);
+//std::cout << "WARNING: EFP " << efpDirectory << " is empty - created new journal file " << efpfn.substr(efpfn.rfind('/') + 1) << " on the fly" << std::endl;
+ } else {
+//std::cerr << "ERROR: Unable to open file \"" << efpfn << "\"" << std::endl; // DEBUG
+ }
+}
-void
-EmptyFilePool::pushEmptyFile(const std::string fqFileName_) {
- slock l(emptyFileListMutex);
- emptyFileList.push_back(fqFileName_);
+std::string EmptyFilePool::getEfpFileName() {
+ uuid_t uuid;
+ ::uuid_generate(uuid); // NOTE: uuid_generate() is not thread safe
+ char uuid_str[37]; // 36 char uuid + trailing \0
+ ::uuid_unparse(uuid, uuid_str);
+ std::ostringstream oss;
+ oss << efpDirectory_ << "/" << uuid_str << QLS_JRNL_FILE_EXTENSION;
+ return oss.str();
}
-std::string
-EmptyFilePool::popEmptyFile() {
+std::string EmptyFilePool::popEmptyFile() {
std::string emptyFileName;
bool isEmpty = false;
{
- slock l(emptyFileListMutex);
- isEmpty = emptyFileList.empty();
+ slock l(emptyFileListMutex_);
+ isEmpty = emptyFileList_.empty();
}
if (isEmpty) {
createEmptyFile();
}
{
- slock l(emptyFileListMutex);
- emptyFileName = emptyFileList.front();
- emptyFileList.pop_front();
+ slock l(emptyFileListMutex_);
+ emptyFileName = emptyFileList_.front();
+ emptyFileList_.pop_front();
}
return emptyFileName;
}
-void
-EmptyFilePool::createEmptyFile() {
- ::file_hdr_t fh;
- ::file_hdr_create(&fh, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDR_RES_SIZE_SBLKS, partitionPtr->partitionNumber(), efpDataSize_kib);
- std::string efpfn = getEfpFileName();
- std::ofstream ofs(efpfn.c_str(), std::ofstream::out | std::ofstream::binary);
- if (ofs.good()) {
- ofs.write((char*)&fh, sizeof(::file_hdr_t));
- uint64_t rem = ((efpDataSize_kib + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_KIB)) * 1024) - sizeof(::file_hdr_t);
- while (rem--)
- ofs.put('\0');
- ofs.close();
- pushEmptyFile(efpfn);
- std::cout << "WARNING: EFP " << efpDirectory << " is empty - created new journal file " <<
- efpfn.substr(efpfn.rfind('/') + 1) << " on the fly" << std::endl;
+void EmptyFilePool::pushEmptyFile(const std::string fqFileName) {
+ slock l(emptyFileListMutex_);
+ emptyFileList_.push_back(fqFileName);
+}
+
+void EmptyFilePool::resetEmptyFileHeader(const std::string& fqFileName) {
+ std::fstream fs(fqFileName.c_str(), std::fstream::in | std::fstream::out | std::fstream::binary);
+ if (fs.good()) {
+ const std::streamsize buffsize = QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES;
+ char buff[buffsize];
+ fs.read((char*)buff, buffsize);
+ std::streampos bytesRead = fs.tellg();
+ if (bytesRead == buffsize) {
+ ::file_hdr_reset((::file_hdr_t*)buff);
+ ::memset(buff + sizeof(::file_hdr_t), 0, MAX_FILE_HDR_LEN - sizeof(::file_hdr_t)); // set rest of buffer to 0
+ fs.seekp(0, std::fstream::beg);
+ fs.write(buff, buffsize);
+ std::streampos bytesWritten = fs.tellp();
+ if (bytesWritten != buffsize) {
+//std::cerr << "ERROR: Unable to write file header of file \"" << fqFileName_ << "\": tried to write " << buffsize << " bytes; wrote " << bytesWritten << " bytes." << std::endl;
+ }
+ } else {
+//std::cerr << "ERROR: Unable to read file header of file \"" << fqFileName_ << "\": tried to read " << sizeof(::file_hdr_t) << " bytes; read " << bytesRead << " bytes." << std::endl;
+ }
+ fs.close();
} else {
- std::cerr << "ERROR: Unable to open file \"" << efpfn << "\"" << std::endl; // DEBUG
+//std::cerr << "ERROR: Unable to open file \"" << fqFileName_ << "\" for reading" << std::endl; // DEBUG
}
}
-bool
-EmptyFilePool::validateEmptyFile(const std::string& emptyFileName_) const {
+bool EmptyFilePool::validateEmptyFile(const std::string& emptyFileName) const {
+ std::ostringstream oss;
struct stat s;
- if (::stat(emptyFileName_.c_str(), &s))
+ if (::stat(emptyFileName.c_str(), &s))
{
- std::ostringstream oss;
- oss << "stat: file=\"" << emptyFileName_ << "\"" << FORMAT_SYSERR(errno);
+ oss << "stat: file=\"" << emptyFileName << "\"" << FORMAT_SYSERR(errno);
throw jexception(jerrno::JERR_JDIR_STAT, oss.str(), "EmptyFilePool", "validateEmptyFile");
}
- efpDataSize_kib_t expectedSize = (JRNL_SBLK_SIZE_KIB + efpDataSize_kib) * 1024;
+
+ // Size matches pool
+ efpDataSize_kib_t expectedSize = (QLS_SBLK_SIZE_KIB + efpDataSize_kib_) * 1024;
if ((efpDataSize_kib_t)s.st_size != expectedSize) {
- //std::cout << "ERROR: File " << emptyFileName << ": Incorrect size: Expected=" << expectedSize << "; actual=" << s.st_size << std::endl; // DEBUG
+ oss << "ERROR: File " << emptyFileName << ": Incorrect size: Expected=" << expectedSize << "; actual=" << s.st_size;
+ journalLogRef_.log(JournalLog::LOG_ERROR, oss.str());
return false;
}
- std::ifstream ifs(emptyFileName_.c_str(), std::ifstream::in | std::ifstream::binary);
- if (!ifs) {
- //std::cout << "ERROR: File " << emptyFileName << ": Unable to open for reading" << std::endl;
+ // Open file and read header
+ std::fstream fs(emptyFileName.c_str(), std::fstream::in | std::fstream::out | std::fstream::binary);
+ if (!fs) {
+ oss << "ERROR: File " << emptyFileName << ": Unable to open for reading";
+ journalLogRef_.log(JournalLog::LOG_ERROR, oss.str());
+ return false;
+ }
+ const std::streamsize buffsize = QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES;
+ char buff[buffsize];
+ fs.read((char*)buff, buffsize);
+ std::streampos bytesRead = fs.tellg();
+ if (bytesRead != buffsize) {
+ oss << "ERROR: Unable to read file header of file \"" << emptyFileName << "\": tried to read " << buffsize << " bytes; read " << bytesRead << " bytes";
+ journalLogRef_.log(JournalLog::LOG_ERROR, oss.str());
+ fs.close();
return false;
}
- const uint8_t fhFileNameBuffLen = 50;
- char fhFileNameBuff[fhFileNameBuffLen];
- ::file_hdr_t fh;
- ifs.read((char*)&fh, sizeof(::file_hdr_t));
- uint16_t fhFileNameLen = fh._queue_name_len > fhFileNameBuffLen ? fhFileNameBuffLen : fh._queue_name_len;
- ifs.read(fhFileNameBuff, fhFileNameLen);
- std::string fhFileName(fhFileNameBuff, fhFileNameLen);
- ifs.close();
-
- if (fh._rhdr._magic != QLS_FILE_MAGIC ||
- fh._rhdr._version != QLS_JRNL_VERSION ||
- fh._efp_partition != partitionPtr->partitionNumber() ||
- fh._file_size_kib != efpDataSize_kib ||
- !::is_file_hdr_reset(&fh))
+ // Check file header
+ const bool jrnlMagicError = ((::file_hdr_t*)buff)->_rhdr._magic != QLS_FILE_MAGIC;
+ const bool jrnlVersionError = ((::file_hdr_t*)buff)->_rhdr._version != QLS_JRNL_VERSION;
+ const bool jrnlPartitionError = ((::file_hdr_t*)buff)->_efp_partition != partitionPtr_->getPartitionNumber();
+ const bool jrnlFileSizeError = ((::file_hdr_t*)buff)->_file_size_kib != efpDataSize_kib_;
+ if (jrnlMagicError || jrnlVersionError || jrnlPartitionError || jrnlFileSizeError)
{
- //std::cout << "ERROR: File " << emptyFileName << ": Invalid file header" << std::endl;
+ oss << "ERROR: File " << emptyFileName << ": Invalid file header - mismatched header fields: " <<
+ (jrnlMagicError ? "magic " : "") <<
+ (jrnlVersionError ? "version " : "") <<
+ (jrnlPartitionError ? "partition" : "") <<
+ (jrnlFileSizeError ? "file-size" : "");
+ journalLogRef_.log(JournalLog::LOG_ERROR, oss.str());
+ fs.close();
return false;
}
- return true;
-}
+ // Check file header is reset
+ if (!::is_file_hdr_reset((::file_hdr_t*)buff)) {
+ ::file_hdr_reset((::file_hdr_t*)buff);
+ ::memset(buff + sizeof(::file_hdr_t), 0, MAX_FILE_HDR_LEN - sizeof(::file_hdr_t)); // set rest of buffer to 0
+ fs.seekp(0, std::fstream::beg);
+ fs.write(buff, buffsize);
+ std::streampos bytesWritten = fs.tellp();
+ if (bytesWritten != buffsize) {
+ oss << "ERROR: Unable to write file header of file \"" << emptyFileName << "\": tried to write " << buffsize << " bytes; wrote " << bytesWritten << " bytes";
+ journalLogRef_.log(JournalLog::LOG_ERROR, oss.str());
+ fs.close();
+ return false;
+ }
+ oss << "WARNING: File " << emptyFileName << ": File header not reset";
+ journalLogRef_.log(JournalLog::LOG_WARN, oss.str());
+ }
-std::string
-EmptyFilePool::getEfpFileName() {
- uuid_t uuid;
- ::uuid_generate(uuid); // NOTE: NOT THREAD SAFE
- char uuid_str[37]; // 36 char uuid + trailing \0
- ::uuid_unparse(uuid, uuid_str);
- std::ostringstream oss;
- oss << efpDirectory << "/" << uuid_str << QLS_JRNL_FILE_EXTENSION;
- return oss.str();
+ // Close file
+ fs.close();
+ return true;
}
-// protected
// static
-efpDataSize_kib_t
-EmptyFilePool::fileSizeKbFromDirName(const std::string& dirName_,
- const efpPartitionNumber_t partitionNumber_) {
+efpDataSize_kib_t EmptyFilePool::fileSizeKbFromDirName(const std::string& dirName,
+ const efpPartitionNumber_t partitionNumber) {
// Check for dirName format 'NNNk', where NNN is a number, convert NNN into an integer. NNN cannot be 0.
- std::string n(dirName_.substr(dirName_.rfind('/')+1));
+ std::string n(dirName.substr(dirName.rfind('/')+1));
bool valid = true;
for (uint16_t charNum = 0; charNum < n.length(); ++charNum) {
if (charNum < n.length()-1) {
@@ -258,12 +303,24 @@ EmptyFilePool::fileSizeKbFromDirName(con
}
}
efpDataSize_kib_t s = ::atol(n.c_str());
- if (!valid || s == 0 || s % JRNL_SBLK_SIZE_KIB != 0) {
+ if (!valid || s == 0 || s % QLS_SBLK_SIZE_KIB != 0) {
std::ostringstream oss;
- oss << "Partition: " << partitionNumber_ << "; EFP directory: \'" << n << "\'";
+ oss << "Partition: " << partitionNumber << "; EFP directory: \'" << n << "\'";
throw jexception(jerrno::JERR_EFP_BADEFPDIRNAME, oss.str(), "EmptyFilePool", "fileSizeKbFromDirName");
}
return s;
}
+// static
+int EmptyFilePool::moveEmptyFile(const std::string& from,
+ const std::string& to) {
+ if (::rename(from.c_str(), to.c_str())) {
+ if (errno == EEXIST) return errno; // File name exists
+ std::ostringstream oss;
+ oss << "file=\"" << from << "\" dest=\"" << to << "\"" << FORMAT_SYSERR(errno);
+ throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", "returnEmptyFile");
+ }
+ return 0;
+}
+
}} // namespace qpid::qls_jrnl
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h Mon Oct 21 21:26:10 2013
@@ -30,15 +30,16 @@ namespace qls_jrnl {
}} // namespace qpid::qls_jrnl
#include <deque>
-#include "qpid/linearstore/jrnl/EmptyFilePoolPartition.h"
#include "qpid/linearstore/jrnl/EmptyFilePoolTypes.h"
#include "qpid/linearstore/jrnl/smutex.h"
#include <string>
namespace qpid {
namespace qls_jrnl {
+class EmptyFilePoolPartition;
class jdir;
class JournalFile;
+class JournalLog;
class EmptyFilePool
{
@@ -46,17 +47,19 @@ protected:
typedef std::deque<std::string> emptyFileList_t;
typedef emptyFileList_t::iterator emptyFileListItr_t;
- const std::string efpDirectory;
- const efpDataSize_kib_t efpDataSize_kib;
- const EmptyFilePoolPartition* partitionPtr;
+ const std::string efpDirectory_;
+ const efpDataSize_kib_t efpDataSize_kib_;
+ const EmptyFilePoolPartition* partitionPtr_;
+ JournalLog& journalLogRef_;
private:
- emptyFileList_t emptyFileList;
- smutex emptyFileListMutex;
+ emptyFileList_t emptyFileList_;
+ smutex emptyFileListMutex_;
public:
- EmptyFilePool(const std::string& efpDirectory_,
- const EmptyFilePoolPartition* partitionPtr_);
+ EmptyFilePool(const std::string& efpDirectory,
+ const EmptyFilePoolPartition* partitionPtr,
+ JournalLog& journalLogRef);
virtual ~EmptyFilePool();
void initialize();
@@ -70,17 +73,21 @@ public:
const EmptyFilePoolPartition* getPartition() const;
const efpIdentity_t getIdentity() const;
- std::string takeEmptyFile(const std::string& destDirectory_);
- bool returnEmptyFile(const JournalFile* srcFile_);
+ std::string takeEmptyFile(const std::string& destDirectory);
+ void returnEmptyFile(const std::string& srcFile);
protected:
- void pushEmptyFile(const std::string fqFileName_);
- std::string popEmptyFile();
void createEmptyFile();
- bool validateEmptyFile(const std::string& emptyFileName_) const;
std::string getEfpFileName();
- static efpDataSize_kib_t fileSizeKbFromDirName(const std::string& dirName_,
- const efpPartitionNumber_t partitionNumber_);
+ std::string popEmptyFile();
+ void pushEmptyFile(const std::string fqFileName);
+ void resetEmptyFileHeader(const std::string& fqFileName);
+ bool validateEmptyFile(const std::string& emptyFileName) const;
+
+ static efpDataSize_kib_t fileSizeKbFromDirName(const std::string& dirName,
+ const efpPartitionNumber_t partitionNumber);
+ static int moveEmptyFile(const std::string& fromFqPath,
+ const std::string& toFqPath);
};
}} // namespace qpid::qls_jrnl
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp Mon Oct 21 21:26:10 2013
@@ -27,39 +27,37 @@
#include "qpid/linearstore/jrnl/slock.h"
#include <vector>
-// DEBUG
-//#include <iostream>
-
namespace qpid {
namespace qls_jrnl {
-EmptyFilePoolManager::EmptyFilePoolManager(const std::string& qlsStorePath_) :
- qlsStorePath(qlsStorePath_)
+EmptyFilePoolManager::EmptyFilePoolManager(const std::string& qlsStorePath,
+ JournalLog& journalLogRef) :
+ qlsStorePath_(qlsStorePath),
+ journalLogRef_(journalLogRef)
{}
EmptyFilePoolManager::~EmptyFilePoolManager() {
- slock l(partitionMapMutex);
- for (partitionMapItr_t i = partitionMap.begin(); i != partitionMap.end(); ++i) {
+ slock l(partitionMapMutex_);
+ for (partitionMapItr_t i = partitionMap_.begin(); i != partitionMap_.end(); ++i) {
delete i->second;
}
- partitionMap.clear();
+ partitionMap_.clear();
}
-void
-EmptyFilePoolManager::findEfpPartitions() {
+void EmptyFilePoolManager::findEfpPartitions() {
//std::cout << "*** Reading " << qlsStorePath << std::endl; // DEBUG
std::vector<std::string> dirList;
- jdir::read_dir(qlsStorePath, dirList, true, false, true, false);
+ jdir::read_dir(qlsStorePath_, dirList, true, false, true, false);
for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
if ((*i)[0] == 'p' && i->length() == 4) { // Filter: look only at names pNNN
efpPartitionNumber_t pn = ::atoi(i->c_str() + 1);
- std::string fullDirPath(qlsStorePath + "/" + (*i));
+ std::string fullDirPath(qlsStorePath_ + "/" + (*i));
EmptyFilePoolPartition* efppp = 0;
try {
- efppp = new EmptyFilePoolPartition(pn, fullDirPath);
+ efppp = new EmptyFilePoolPartition(pn, fullDirPath, journalLogRef_);
{
- slock l(partitionMapMutex);
- partitionMap[pn] = efppp;
+ slock l(partitionMapMutex_);
+ partitionMap_[pn] = efppp;
}
} catch (const std::exception& e) {
if (efppp != 0) {
@@ -72,34 +70,63 @@ EmptyFilePoolManager::findEfpPartitions(
efppp->findEmptyFilePools();
}
}
+ // TODO: Log results
+/*
+ QLS_LOG(info, "EFP Manager initialization complete");
+ std::vector<qpid::qls_jrnl::EmptyFilePoolPartition*> partitionList;
+ std::vector<qpid::qls_jrnl::EmptyFilePool*> filePoolList;
+ getEfpPartitions(partitionList);
+ if (partitionList.size() == 0) {
+ QLS_LOG(error, "NO EFP PARTITIONS FOUND! No queue creation is possible.")
+ } else {
+ QLS_LOG(info, "> EFP Partitions found: " << partitionList.size());
+ for (std::vector<qpid::qls_jrnl::EmptyFilePoolPartition*>::const_iterator i=partitionList.begin(); i!= partitionList.end(); ++i) {
+ filePoolList.clear();
+ (*i)->getEmptyFilePools(filePoolList);
+ QLS_LOG(info, " * Partition " << (*i)->partitionNumber() << " containing " << filePoolList.size() << " pool" <<
+ (filePoolList.size()>1 ? "s" : "") << " at \'" << (*i)->partitionDirectory() << "\'");
+ for (std::vector<qpid::qls_jrnl::EmptyFilePool*>::const_iterator j=filePoolList.begin(); j!=filePoolList.end(); ++j) {
+ QLS_LOG(info, " - EFP \'" << (*j)->dataSize_kib() << "k\' containing " << (*j)->numEmptyFiles() <<
+ " files of size " << (*j)->dataSize_kib() << " KiB totaling " << (*j)->cumFileSize_kib() << " KiB");
+ }
+ }
+ }
+*/
}
-uint16_t
-EmptyFilePoolManager::getNumEfpPartitions() const {
- return partitionMap.size();
+void EmptyFilePoolManager::getEfpFileSizes(std::vector<efpDataSize_kib_t>& efpFileSizeList,
+ const efpPartitionNumber_t efpPartitionNumber) const {
+ if (efpPartitionNumber == 0) {
+ for (partitionMapConstItr_t i=partitionMap_.begin(); i!=partitionMap_.end(); ++i) {
+ i->second->getEmptyFilePoolSizes_kib(efpFileSizeList);
+ }
+ } else {
+ partitionMapConstItr_t i = partitionMap_.find(efpPartitionNumber);
+ if (i != partitionMap_.end()) {
+ i->second->getEmptyFilePoolSizes_kib(efpFileSizeList);
+ }
+ }
}
-EmptyFilePoolPartition*
-EmptyFilePoolManager::getEfpPartition(const efpPartitionNumber_t partitionNumber) {
- partitionMapItr_t i = partitionMap.find(partitionNumber);
- if (i == partitionMap.end())
+EmptyFilePoolPartition* EmptyFilePoolManager::getEfpPartition(const efpPartitionNumber_t partitionNumber) {
+ partitionMapItr_t i = partitionMap_.find(partitionNumber);
+ if (i == partitionMap_.end())
return 0;
else
return i->second;
}
-void
-EmptyFilePoolManager::getEfpPartitionNumbers(std::vector<efpPartitionNumber_t>& partitionNumberList,
- const efpDataSize_kib_t efpFileSizeKb) const {
- slock l(partitionMapMutex);
- for (partitionMapConstItr_t i=partitionMap.begin(); i!=partitionMap.end(); ++i) {
- if (efpFileSizeKb == 0) {
+void EmptyFilePoolManager::getEfpPartitionNumbers(std::vector<efpPartitionNumber_t>& partitionNumberList,
+ const efpDataSize_kib_t efpDataSize_kib) const {
+ slock l(partitionMapMutex_);
+ for (partitionMapConstItr_t i=partitionMap_.begin(); i!=partitionMap_.end(); ++i) {
+ if (efpDataSize_kib == 0) {
partitionNumberList.push_back(i->first);
} else {
std::vector<efpDataSize_kib_t> efpFileSizeList;
- i->second->getEmptyFilePoolSizesKb(efpFileSizeList);
+ i->second->getEmptyFilePoolSizes_kib(efpFileSizeList);
for (std::vector<efpDataSize_kib_t>::iterator j=efpFileSizeList.begin(); j!=efpFileSizeList.end(); ++j) {
- if (*j == efpFileSizeKb) {
+ if (*j == efpDataSize_kib) {
partitionNumberList.push_back(i->first);
break;
}
@@ -108,18 +135,17 @@ EmptyFilePoolManager::getEfpPartitionNum
}
}
-void
-EmptyFilePoolManager::getEfpPartitions(std::vector<EmptyFilePoolPartition*>& partitionList,
- const efpDataSize_kib_t efpFileSizeKb) {
- slock l(partitionMapMutex);
- for (partitionMapConstItr_t i=partitionMap.begin(); i!=partitionMap.end(); ++i) {
- if (efpFileSizeKb == 0) {
+void EmptyFilePoolManager::getEfpPartitions(std::vector<EmptyFilePoolPartition*>& partitionList,
+ const efpDataSize_kib_t efpDataSize_kib) {
+ slock l(partitionMapMutex_);
+ for (partitionMapConstItr_t i=partitionMap_.begin(); i!=partitionMap_.end(); ++i) {
+ if (efpDataSize_kib == 0) {
partitionList.push_back(i->second);
} else {
std::vector<efpDataSize_kib_t> efpFileSizeList;
- i->second->getEmptyFilePoolSizesKb(efpFileSizeList);
+ i->second->getEmptyFilePoolSizes_kib(efpFileSizeList);
for (std::vector<efpDataSize_kib_t>::iterator j=efpFileSizeList.begin(); j!=efpFileSizeList.end(); ++j) {
- if (*j == efpFileSizeKb) {
+ if (*j == efpDataSize_kib) {
partitionList.push_back(i->second);
break;
}
@@ -128,48 +154,34 @@ EmptyFilePoolManager::getEfpPartitions(s
}
}
-void
-EmptyFilePoolManager::getEfpFileSizes(std::vector<efpDataSize_kib_t>& efpFileSizeList,
- const efpPartitionNumber_t efpPartitionNumber) const {
- if (efpPartitionNumber == 0) {
- for (partitionMapConstItr_t i=partitionMap.begin(); i!=partitionMap.end(); ++i) {
- i->second->getEmptyFilePoolSizesKb(efpFileSizeList);
- }
- } else {
- partitionMapConstItr_t i = partitionMap.find(efpPartitionNumber);
- if (i != partitionMap.end()) {
- i->second->getEmptyFilePoolSizesKb(efpFileSizeList);
- }
- }
+EmptyFilePool* EmptyFilePoolManager::getEmptyFilePool(const efpIdentity_t efpIdentity) {
+ return getEmptyFilePool(efpIdentity.first, efpIdentity.second);
+}
+
+EmptyFilePool* EmptyFilePoolManager::getEmptyFilePool(const efpPartitionNumber_t partitionNumber,
+ const efpDataSize_kib_t efpDataSize_kib) {
+ EmptyFilePoolPartition* efppp = getEfpPartition(partitionNumber);
+ if (efppp != 0)
+ return efppp->getEmptyFilePool(efpDataSize_kib);
+ return 0;
}
-void
-EmptyFilePoolManager::getEmptyFilePools(std::vector<EmptyFilePool*>& emptyFilePoolList,
- const efpPartitionNumber_t efpPartitionNumber) {
+void EmptyFilePoolManager::getEmptyFilePools(std::vector<EmptyFilePool*>& emptyFilePoolList,
+ const efpPartitionNumber_t efpPartitionNumber) {
if (efpPartitionNumber == 0) {
- for (partitionMapConstItr_t i=partitionMap.begin(); i!=partitionMap.end(); ++i) {
+ for (partitionMapConstItr_t i=partitionMap_.begin(); i!=partitionMap_.end(); ++i) {
i->second->getEmptyFilePools(emptyFilePoolList);
}
} else {
- partitionMapConstItr_t i = partitionMap.find(efpPartitionNumber);
- if (i != partitionMap.end()) {
+ partitionMapConstItr_t i = partitionMap_.find(efpPartitionNumber);
+ if (i != partitionMap_.end()) {
i->second->getEmptyFilePools(emptyFilePoolList);
}
}
}
-EmptyFilePool*
-EmptyFilePoolManager::getEmptyFilePool(const efpPartitionNumber_t partitionNumber,
- const efpDataSize_kib_t efpFileSizeKib) {
- EmptyFilePoolPartition* efppp = getEfpPartition(partitionNumber);
- if (efppp != 0)
- return efppp->getEmptyFilePool(efpFileSizeKib);
- return 0;
-}
-
-EmptyFilePool*
-EmptyFilePoolManager::getEmptyFilePool(const efpIdentity_t efpIdentity) {
- return getEmptyFilePool(efpIdentity.first, efpIdentity.second);
+uint16_t EmptyFilePoolManager::getNumEfpPartitions() const {
+ return partitionMap_.size();
}
}} // namespace qpid::qls_jrnl
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h Mon Oct 21 21:26:10 2013
@@ -25,7 +25,6 @@
#include <map>
#include "qpid/linearstore/jrnl/EmptyFilePoolPartition.h"
#include "qpid/linearstore/jrnl/smutex.h"
-#include <string>
namespace qpid {
namespace qls_jrnl {
@@ -37,25 +36,30 @@ protected:
typedef partitionMap_t::iterator partitionMapItr_t;
typedef partitionMap_t::const_iterator partitionMapConstItr_t;
- std::string qlsStorePath;
- partitionMap_t partitionMap;
- smutex partitionMapMutex;
+ std::string qlsStorePath_;
+ JournalLog& journalLogRef_;
+ partitionMap_t partitionMap_;
+ smutex partitionMapMutex_;
public:
- EmptyFilePoolManager(const std::string& qlsStorePath_);
+ EmptyFilePoolManager(const std::string& qlsStorePath_,
+ JournalLog& journalLogRef_);
virtual ~EmptyFilePoolManager();
- void findEfpPartitions();
- uint16_t getNumEfpPartitions() const;
+ void findEfpPartitions();
+ void getEfpFileSizes(std::vector<efpDataSize_kib_t>& efpFileSizeList,
+ const efpPartitionNumber_t efpPartitionNumber = 0) const;
EmptyFilePoolPartition* getEfpPartition(const efpPartitionNumber_t partitionNumber);
- void getEfpPartitionNumbers(std::vector<efpPartitionNumber_t>& partitionNumberList, const efpDataSize_kib_t efpFileSizeKb = 0) const;
- void getEfpPartitions(std::vector<EmptyFilePoolPartition*>& partitionList, const efpDataSize_kib_t efpFileSizeKb = 0);
-
- void getEfpFileSizes(std::vector<efpDataSize_kib_t>& efpFileSizeList, const efpPartitionNumber_t efpPartitionNumber = 0) const;
- void getEmptyFilePools(std::vector<EmptyFilePool*>& emptyFilePoolList, const efpPartitionNumber_t efpPartitionNumber = 0);
-
- EmptyFilePool* getEmptyFilePool(const efpPartitionNumber_t partitionNumber, const efpDataSize_kib_t efpFileSizeKb);
+ void getEfpPartitionNumbers(std::vector<efpPartitionNumber_t>& partitionNumberList,
+ const efpDataSize_kib_t efpDataSize_kib = 0) const;
+ void getEfpPartitions(std::vector<EmptyFilePoolPartition*>& partitionList,
+ const efpDataSize_kib_t efpDataSize_kib = 0);
EmptyFilePool* getEmptyFilePool(const efpIdentity_t efpIdentity);
+ EmptyFilePool* getEmptyFilePool(const efpPartitionNumber_t partitionNumber,
+ const efpDataSize_kib_t efpDataSize_kib);
+ void getEmptyFilePools(std::vector<EmptyFilePool*>& emptyFilePoolList,
+ const efpPartitionNumber_t efpPartitionNumber = 0);
+ uint16_t getNumEfpPartitions() const;
};
}} // namespace qpid::qls_jrnl
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp Mon Oct 21 21:26:10 2013
@@ -27,62 +27,54 @@
#include "qpid/linearstore/jrnl/jexception.h"
#include "qpid/linearstore/jrnl/slock.h"
-//#include <iostream> // DEBUG
-
namespace qpid {
namespace qls_jrnl {
-const std::string EmptyFilePoolPartition::efpTopLevelDir("efp"); // Sets the top-level efp dir within a partition
+// static
+const std::string EmptyFilePoolPartition::s_efpTopLevelDir_("efp"); // Sets the top-level efp dir within a partition
-EmptyFilePoolPartition::EmptyFilePoolPartition(const efpPartitionNumber_t partitionNum_, const std::string& partitionDir_) :
- partitionNum(partitionNum_),
- partitionDir(partitionDir_)
+EmptyFilePoolPartition::EmptyFilePoolPartition(const efpPartitionNumber_t partitionNum,
+ const std::string& partitionDir,
+ JournalLog& journalLogRef) :
+ partitionNum_(partitionNum),
+ partitionDir_(partitionDir),
+ journalLogRef_(journalLogRef)
{
validatePartitionDir();
}
EmptyFilePoolPartition::~EmptyFilePoolPartition() {
- slock l(efpMapMutex);
- for (efpMapItr_t i = efpMap.begin(); i != efpMap.end(); ++i) {
+ slock l(efpMapMutex_);
+ for (efpMapItr_t i = efpMap_.begin(); i != efpMap_.end(); ++i) {
delete i->second;
}
- efpMap.clear();
-}
-
-void
-EmptyFilePoolPartition::validatePartitionDir() {
- if (!jdir::is_dir(partitionDir)) {
- std::ostringstream ss;
- ss << "Invalid partition directory: \'" << partitionDir << "\' is not a directory";
- throw jexception(jerrno::JERR_EFP_BADPARTITIONDIR, ss.str(), "EmptyFilePoolPartition", "validatePartitionDir");
- }
- // TODO: other validity checks here
+ efpMap_.clear();
}
void
EmptyFilePoolPartition::findEmptyFilePools() {
//std::cout << "Reading " << partitionDir << std::endl; // DEBUG
std::vector<std::string> dirList;
- jdir::read_dir(partitionDir, dirList, true, false, false, false);
+ jdir::read_dir(partitionDir_, dirList, true, false, false, false);
bool foundEfpDir = false;
for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
- if (i->compare(efpTopLevelDir) == 0) {
+ if (i->compare(s_efpTopLevelDir_) == 0) {
foundEfpDir = true;
break;
}
}
if (foundEfpDir) {
- std::string efpDir(partitionDir + "/" + efpTopLevelDir);
+ std::string efpDir(partitionDir_ + "/" + s_efpTopLevelDir_);
//std::cout << "Reading " << efpDir << std::endl; // DEBUG
dirList.clear();
jdir::read_dir(efpDir, dirList, true, false, false, true);
for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
EmptyFilePool* efpp = 0;
try {
- efpp = new EmptyFilePool(*i, this);
+ efpp = new EmptyFilePool(*i, this, journalLogRef_);
{
- slock l(efpMapMutex);
- efpMap[efpp->dataSize_kib()] = efpp;
+ slock l(efpMapMutex_);
+ efpMap_[efpp->dataSize_kib()] = efpp;
}
}
catch (const std::exception& e) {
@@ -98,36 +90,42 @@ EmptyFilePoolPartition::findEmptyFilePoo
}
}
-efpPartitionNumber_t
-EmptyFilePoolPartition::partitionNumber() const {
- return partitionNum;
+EmptyFilePool* EmptyFilePoolPartition::getEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib) {
+ efpMapItr_t i = efpMap_.find(efpDataSize_kib);
+ if (i == efpMap_.end())
+ return 0;
+ return i->second;
}
-std::string
-EmptyFilePoolPartition::partitionDirectory() const {
- return partitionDir;
+void EmptyFilePoolPartition::getEmptyFilePools(std::vector<EmptyFilePool*>& efpList) {
+ for (efpMapItr_t i=efpMap_.begin(); i!=efpMap_.end(); ++i) {
+ efpList.push_back(i->second);
+ }
}
-EmptyFilePool*
-EmptyFilePoolPartition::getEmptyFilePool(const efpDataSize_kib_t efpFileSizeKb) {
- efpMapItr_t i = efpMap.find(efpFileSizeKb);
- if (i == efpMap.end())
- return 0;
- return i->second;
+void EmptyFilePoolPartition::getEmptyFilePoolSizes_kib(std::vector<efpDataSize_kib_t>& efpDataSizesList_kib) const {
+ for (efpMapConstItr_t i=efpMap_.begin(); i!=efpMap_.end(); ++i) {
+ efpDataSizesList_kib.push_back(i->first);
+ }
}
-void
-EmptyFilePoolPartition::getEmptyFilePoolSizesKb(std::vector<efpDataSize_kib_t>& efpFileSizesKbList) const {
- for (efpMapConstItr_t i=efpMap.begin(); i!=efpMap.end(); ++i) {
- efpFileSizesKbList.push_back(i->first);
- }
+std::string EmptyFilePoolPartition::getPartitionDirectory() const {
+ return partitionDir_;
}
-void
-EmptyFilePoolPartition::getEmptyFilePools(std::vector<EmptyFilePool*>& efpList) {
- for (efpMapItr_t i=efpMap.begin(); i!=efpMap.end(); ++i) {
- efpList.push_back(i->second);
+efpPartitionNumber_t EmptyFilePoolPartition::getPartitionNumber() const {
+ return partitionNum_;
+}
+
+// --- protected functions ---
+
+void EmptyFilePoolPartition::validatePartitionDir() {
+ if (!jdir::is_dir(partitionDir_)) {
+ std::ostringstream ss;
+ ss << "Invalid partition directory: \'" << partitionDir_ << "\' is not a directory";
+ throw jexception(jerrno::JERR_EFP_BADPARTITIONDIR, ss.str(), "EmptyFilePoolPartition", "validatePartitionDir");
}
+ // TODO: other validity checks here
}
}} // namespace qpid::qls_jrnl
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h Mon Oct 21 21:26:10 2013
@@ -38,33 +38,38 @@ namespace qls_jrnl {
namespace qpid {
namespace qls_jrnl {
+class JournalLog;
class EmptyFilePoolPartition
{
public:
- static const std::string efpTopLevelDir;
+ static const std::string s_efpTopLevelDir_;
protected:
typedef std::map<efpDataSize_kib_t, EmptyFilePool*> efpMap_t;
typedef efpMap_t::iterator efpMapItr_t;
typedef efpMap_t::const_iterator efpMapConstItr_t;
- const efpPartitionNumber_t partitionNum;
- const std::string partitionDir;
- efpMap_t efpMap;
- smutex efpMapMutex;
-
- void validatePartitionDir();
+ const efpPartitionNumber_t partitionNum_;
+ const std::string partitionDir_;
+ JournalLog& journalLogRef_;
+ efpMap_t efpMap_;
+ smutex efpMapMutex_;
public:
- EmptyFilePoolPartition(const efpPartitionNumber_t partitionNum_, const std::string& partitionDir_);
+ EmptyFilePoolPartition(const efpPartitionNumber_t partitionNum,
+ const std::string& partitionDir,
+ JournalLog& journalLogRef);
virtual ~EmptyFilePoolPartition();
- void findEmptyFilePools();
- efpPartitionNumber_t partitionNumber() const;
- std::string partitionDirectory() const;
- EmptyFilePool* getEmptyFilePool(const efpDataSize_kib_t efpFileSizeKb);
- void getEmptyFilePoolSizesKb(std::vector<efpDataSize_kib_t>& efpFileSizesKbList) const;
+ void findEmptyFilePools();
+ EmptyFilePool* getEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib);
void getEmptyFilePools(std::vector<EmptyFilePool*>& efpList);
+ void getEmptyFilePoolSizes_kib(std::vector<efpDataSize_kib_t>& efpDataSizesList) const;
+ std::string getPartitionDirectory() const;
+ efpPartitionNumber_t getPartitionNumber() const;
+
+protected:
+ void validatePartitionDir();
};
}} // namespace qpid::qls_jrnl
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h?rev=1534383&r1=1534382&r2=1534383&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h Mon Oct 21 21:26:10 2013
@@ -28,13 +28,13 @@
namespace qpid {
namespace qls_jrnl {
- typedef uint64_t efpDataSize_kib_t; // Size of data part of file (excluding file header) in kib
- typedef uint64_t efpFileSize_kib_t; // Size of file (header + data) in kib
- typedef uint32_t efpDataSize_sblks_t; // Size of data part of file (excluding file header) in sblks
- typedef uint32_t efpFileSize_sblks_t; // Size of file (header + data) in sblks
- typedef uint32_t efpFileCount_t;
- typedef uint16_t efpPartitionNumber_t;
- typedef std::pair<efpPartitionNumber_t, efpDataSize_kib_t> efpIdentity_t;
+ typedef uint64_t efpDataSize_kib_t; ///< Size of data part of file (excluding file header) in kib
+ typedef uint64_t efpFileSize_kib_t; ///< Size of file (header + data) in kib
+ typedef uint32_t efpDataSize_sblks_t; ///< Size of data part of file (excluding file header) in sblks
+ typedef uint32_t efpFileSize_sblks_t; ///< Size of file (header + data) in sblks
+ typedef uint32_t efpFileCount_t; ///< Number of files in a partition or pool
+ typedef uint16_t efpPartitionNumber_t; ///< Number assigned to a partition
+ typedef std::pair<efpPartitionNumber_t, efpDataSize_kib_t> efpIdentity_t; ///< Unique identity of a pool, consisting of the partition number and data size
}} // namespace qpid::qls_jrnl
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org