You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2013/07/10 20:20:20 UTC

svn commit: r1501895 [3/10] - in /qpid/branches/linearstore/qpid/cpp/src: ./ qpid/linearstore/ qpid/linearstore/jrnl/

Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h Wed Jul 10 18:20:19 2013
@@ -0,0 +1,380 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_MESSAGESTOREIMPL_H
+#define QPID_LEGACYSTORE_MESSAGESTOREIMPL_H
+
+#include <string>
+
+#include "db-inc.h"
+#include "qpid/legacystore/Cursor.h"
+#include "qpid/legacystore/IdDbt.h"
+#include "qpid/legacystore/IdSequence.h"
+#include "qpid/legacystore/JournalImpl.h"
+#include "qpid/legacystore/jrnl/jcfg.h"
+#include "qpid/legacystore/PreparedTransaction.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/MessageStore.h"
+#include "qpid/management/Manageable.h"
+#include "qmf/org/apache/qpid/legacystore/Store.h"
+#include "qpid/legacystore/TxnCtxt.h"
+
+// Assume DB_VERSION_MAJOR == 4
+#if (DB_VERSION_MINOR == 2)
+#include <errno.h>
+#define DB_BUFFER_SMALL ENOMEM
+#endif
+
+namespace qpid { namespace sys {
+class Timer;
+}}
+
+namespace mrg {
+namespace msgstore {
+
+/**
+ * An implementation of the MessageStore interface based on Berkeley DB; also a qpid::management::Manageable.
+ */
+class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::management::Manageable
+{
+  public:
+    typedef boost::shared_ptr<Db> db_ptr;
+    typedef boost::shared_ptr<DbEnv> dbEnv_ptr;
+
+    struct StoreOptions : public qpid::Options { // option group; default group name is "Store Options"
+        StoreOptions(const std::string& name="Store Options");
+        std::string clusterName;
+        std::string storeDir;
+        u_int16_t numJrnlFiles;
+        bool      autoJrnlExpand;
+        u_int16_t autoJrnlExpandMaxFiles;
+        u_int32_t jrnlFsizePgs;
+        bool      truncateFlag;
+        u_int32_t wCachePageSizeKib;
+        u_int16_t tplNumJrnlFiles;
+        u_int32_t tplJrnlFsizePgs;
+        u_int32_t tplWCachePageSizeKib;
+    };
+
+  protected:
+    typedef std::map<u_int64_t, qpid::broker::RecoverableQueue::shared_ptr> queue_index;
+    typedef std::map<u_int64_t, qpid::broker::RecoverableExchange::shared_ptr> exchange_index;
+    typedef std::map<u_int64_t, qpid::broker::RecoverableMessage::shared_ptr> message_index;
+
+    typedef LockedMappings::map txn_lock_map;
+    typedef boost::ptr_list<PreparedTransaction> txn_list;
+
+    // Structs for Transaction Prepared List (TPL) recover state
+    struct TplRecoverStruct {
+        u_int64_t rid; // rid of TPL record
+        bool deq_flag;
+        bool commit_flag;
+        bool tpc_flag;
+        TplRecoverStruct(const u_int64_t _rid, const bool _deq_flag, const bool _commit_flag, const bool _tpc_flag);
+    };
+    typedef TplRecoverStruct TplRecover;
+    typedef std::pair<std::string, TplRecover> TplRecoverMapPair;
+    typedef std::map<std::string, TplRecover> TplRecoverMap;
+    typedef TplRecoverMap::const_iterator TplRecoverMapCitr;
+
+    typedef std::map<std::string, JournalImpl*> JournalListMap;
+    typedef JournalListMap::iterator JournalListMapItr;
+
+    // Default store settings
+    static const u_int16_t defNumJrnlFiles = 8;
+    static const u_int32_t defJrnlFileSizePgs = 24;
+    static const bool      defTruncateFlag = false;
+    static const u_int32_t defWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
+    static const u_int16_t defTplNumJrnlFiles = 8;
+    static const u_int32_t defTplJrnlFileSizePgs = 24;
+    static const u_int32_t defTplWCachePageSize = defWCachePageSize / 8;
+    // TODO: set defAutoJrnlExpand to true and defAutoJrnlExpandMaxFiles to 16 when auto-expand comes on-line
+    static const bool      defAutoJrnlExpand = false;
+    static const u_int16_t defAutoJrnlExpandMaxFiles = 0;
+
+    static const std::string storeTopLevelDir;
+    static qpid::sys::Duration defJournalGetEventsTimeout;
+    static qpid::sys::Duration defJournalFlushTimeout;
+
+    std::list<db_ptr> dbs;
+    dbEnv_ptr dbenv;
+    db_ptr queueDb;
+    db_ptr configDb;
+    db_ptr exchangeDb;
+    db_ptr mappingDb;
+    db_ptr bindingDb;
+    db_ptr generalDb;
+
+    // Pointer to Transaction Prepared List (TPL) journal instance
+    boost::shared_ptr<TplJournalImpl> tplStorePtr;
+    TplRecoverMap tplRecoverMap;
+    qpid::sys::Mutex tplInitLock;
+    JournalListMap journalList;
+    qpid::sys::Mutex journalListLock;
+    qpid::sys::Mutex bdbLock;
+
+    IdSequence queueIdSequence;
+    IdSequence exchangeIdSequence;
+    IdSequence generalIdSequence;
+    IdSequence messageIdSequence;
+    std::string storeDir;
+    u_int16_t numJrnlFiles;
+    bool      autoJrnlExpand;
+    u_int16_t autoJrnlExpandMaxFiles;
+    u_int32_t jrnlFsizeSblks;
+    bool      truncateFlag;
+    u_int32_t wCachePgSizeSblks;
+    u_int16_t wCacheNumPages;
+    u_int16_t tplNumJrnlFiles;
+    u_int32_t tplJrnlFsizeSblks;
+    u_int32_t tplWCachePgSizeSblks;
+    u_int16_t tplWCacheNumPages;
+    u_int64_t highestRid;
+    bool isInit;
+    const char* envPath;
+    qpid::broker::Broker* broker;
+
+    qmf::org::apache::qpid::legacystore::Store::shared_ptr mgmtObject;
+    qpid::management::ManagementAgent* agent;
+
+
+    // Parameter validation and calculation
+    static u_int16_t chkJrnlNumFilesParam(const u_int16_t param,
+                                          const std::string paramName);
+    static u_int32_t chkJrnlFileSizeParam(const u_int32_t param,
+                                          const std::string paramName,
+                                          const u_int32_t wCachePgSizeSblks = 0);
+    static u_int32_t chkJrnlWrPageCacheSize(const u_int32_t param,
+                                            const std::string paramName,
+                                            const u_int16_t jrnlFsizePgs);
+    static u_int16_t getJrnlWrNumPages(const u_int32_t wrPageSizeKib);
+    void chkJrnlAutoExpandOptions(const MessageStoreImpl::StoreOptions* opts,
+                                  bool& autoJrnlExpand,
+                                  u_int16_t& autoJrnlExpandMaxFiles,
+                                  const std::string& autoJrnlExpandMaxFilesParamName,
+                                  const u_int16_t numJrnlFiles,
+                                  const std::string& numJrnlFilesParamName);
+
+    void init();
+
+    void recoverQueues(TxnCtxt& txn,
+                       qpid::broker::RecoveryManager& recovery,
+                       queue_index& index,
+                       txn_list& locked,
+                       message_index& messages);
+    void recoverMessages(TxnCtxt& txn,
+                         qpid::broker::RecoveryManager& recovery,
+                         queue_index& index,
+                         txn_list& locked,
+                         message_index& prepared);
+    void recoverMessages(TxnCtxt& txn,
+                         qpid::broker::RecoveryManager& recovery,
+                         qpid::broker::RecoverableQueue::shared_ptr& queue,
+                         txn_list& locked,
+                         message_index& prepared,
+                         long& rcnt,
+                         long& idcnt);
+    qpid::broker::RecoverableMessage::shared_ptr getExternMessage(qpid::broker::RecoveryManager& recovery,
+                                                                  uint64_t mId,
+                                                                  unsigned& headerSize);
+    void recoverExchanges(TxnCtxt& txn,
+                          qpid::broker::RecoveryManager& recovery,
+                          exchange_index& index);
+    void recoverBindings(TxnCtxt& txn,
+                         exchange_index& exchanges,
+                         queue_index& queues);
+    void recoverGeneral(TxnCtxt& txn,
+                        qpid::broker::RecoveryManager& recovery);
+    int enqueueMessage(TxnCtxt& txn,
+                       IdDbt& msgId,
+                       qpid::broker::RecoverableMessage::shared_ptr& msg,
+                       queue_index& index,
+                       txn_list& locked,
+                       message_index& prepared);
+    void readTplStore();
+    void recoverTplStore();
+    void recoverLockedMappings(txn_list& txns);
+    TxnCtxt* check(qpid::broker::TransactionContext* ctxt);
+    u_int64_t msgEncode(std::vector<char>& buff, const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message);
+    void store(const qpid::broker::PersistableQueue* queue,
+               TxnCtxt* txn,
+               const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message,
+               bool newId);
+    void async_dequeue(qpid::broker::TransactionContext* ctxt,
+                       const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
+                       const qpid::broker::PersistableQueue& queue);
+    void destroy(db_ptr db,
+                 const qpid::broker::Persistable& p);
+    bool create(db_ptr db,
+                IdSequence& seq,
+                const qpid::broker::Persistable& p);
+    void completed(TxnCtxt& txn,
+                   bool commit);
+    void deleteBindingsForQueue(const qpid::broker::PersistableQueue& queue);
+    void deleteBinding(const qpid::broker::PersistableExchange& exchange,
+                       const qpid::broker::PersistableQueue& queue,
+                       const std::string& key);
+
+    void put(db_ptr db,
+             DbTxn* txn,
+             Dbt& key,
+             Dbt& value);
+    void open(db_ptr db,
+              DbTxn* txn,
+              const char* file,
+              bool dupKey);
+    void closeDbs();
+
+    // journal functions
+    void createJrnlQueue(const qpid::broker::PersistableQueue& queue);
+    u_int32_t bHash(const std::string str);
+    std::string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for example /var/rhm/ + queueDir/
+    std::string getJrnlHashDir(const std::string& queueName);
+    std::string getJrnlBaseDir();
+    std::string getBdbBaseDir();
+    std::string getTplBaseDir();
+    inline void checkInit() {
+        // TODO: change the default dir to ~/.qpidd
+        if (!isInit) { init("/tmp"); isInit = true; } // lazy default init; the bool result of init() is ignored here
+    }
+    void chkTplStoreInit();
+
+    // debug aid for printing XIDs that may contain non-printable chars (those are written as "/" + two hex digits)
+    static std::string xid2str(const std::string xid) {
+        std::ostringstream oss;
+        oss << std::hex << std::setfill('0');
+        for (std::string::size_type i = 0; i < xid.size(); ++i) {
+            // unsigned char: a negative char is UB for isprint() and sign-extends to "ffffffxx" when cast to int
+            const unsigned char uc = static_cast<unsigned char>(xid[i]);
+            if (isprint(uc)) { oss << xid[i]; continue; }
+            oss << "/" << std::setw(2) << static_cast<unsigned>(uc);
+        }
+        return oss.str();
+    }
+
+  public:
+    typedef boost::shared_ptr<MessageStoreImpl> shared_ptr;
+
+    MessageStoreImpl(qpid::broker::Broker* broker, const char* envpath = 0);
+
+    virtual ~MessageStoreImpl();
+
+    bool init(const qpid::Options* options);
+
+    bool init(const std::string& dir,
+              u_int16_t jfiles = defNumJrnlFiles,
+              u_int32_t jfileSizePgs = defJrnlFileSizePgs,
+              const bool truncateFlag = false,
+              u_int32_t wCachePageSize = defWCachePageSize,
+              u_int16_t tplJfiles = defTplNumJrnlFiles,
+              u_int32_t tplJfileSizePgs = defTplJrnlFileSizePgs,
+              u_int32_t tplWCachePageSize = defTplWCachePageSize,
+              bool      autoJExpand = defAutoJrnlExpand,
+              u_int16_t autoJExpandMaxFiles = defAutoJrnlExpandMaxFiles);
+
+    void truncateInit(const bool saveStoreContent = false);
+
+    void initManagement ();
+
+    void finalize();
+
+    void create(qpid::broker::PersistableQueue& queue,
+                const qpid::framing::FieldTable& args);
+
+    void destroy(qpid::broker::PersistableQueue& queue);
+
+    void create(const qpid::broker::PersistableExchange& queue,
+                const qpid::framing::FieldTable& args);
+
+    void destroy(const qpid::broker::PersistableExchange& queue);
+
+    void bind(const qpid::broker::PersistableExchange& exchange,
+              const qpid::broker::PersistableQueue& queue,
+              const std::string& key,
+              const qpid::framing::FieldTable& args);
+
+    void unbind(const qpid::broker::PersistableExchange& exchange,
+                const qpid::broker::PersistableQueue& queue,
+                const std::string& key,
+                const qpid::framing::FieldTable& args);
+
+    void create(const qpid::broker::PersistableConfig& config);
+
+    void destroy(const qpid::broker::PersistableConfig& config);
+
+    void recover(qpid::broker::RecoveryManager& queues);
+
+    void stage(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg);
+
+    void destroy(qpid::broker::PersistableMessage& msg);
+
+    void appendContent(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
+                       const std::string& data);
+
+    void loadContent(const qpid::broker::PersistableQueue& queue,
+                     const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
+                     std::string& data,
+                     uint64_t offset,
+                     uint32_t length);
+
+    void enqueue(qpid::broker::TransactionContext* ctxt,
+                 const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
+                 const qpid::broker::PersistableQueue& queue);
+
+    void dequeue(qpid::broker::TransactionContext* ctxt,
+                 const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
+                 const qpid::broker::PersistableQueue& queue);
+
+    void flush(const qpid::broker::PersistableQueue& queue);
+
+    u_int32_t outstandingQueueAIO(const qpid::broker::PersistableQueue& queue);
+
+    void collectPreparedXids(std::set<std::string>& xids);
+
+    std::auto_ptr<qpid::broker::TransactionContext> begin();
+
+    std::auto_ptr<qpid::broker::TPCTransactionContext> begin(const std::string& xid);
+
+    void prepare(qpid::broker::TPCTransactionContext& ctxt);
+
+    void localPrepare(TxnCtxt* ctxt);
+
+    void commit(qpid::broker::TransactionContext& ctxt);
+
+    void abort(qpid::broker::TransactionContext& ctxt);
+
+    qpid::management::ManagementObject::shared_ptr GetManagementObject (void) const
+        { return mgmtObject; }
+
+    inline qpid::management::Manageable::status_t ManagementMethod (u_int32_t, qpid::management::Args&, std::string&)
+        { return qpid::management::Manageable::STATUS_OK; } // all arguments are ignored; always reports STATUS_OK
+
+    std::string getStoreDir() const;
+
+  private:
+    void journalDeleted(JournalImpl&);
+
+}; // class MessageStoreImpl
+
+} // namespace msgstore
+} // namespace mrg
+
+#endif // ifndef QPID_LEGACYSTORE_MESSAGESTOREIMPL_H

Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Rev URL

Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/PreparedTransaction.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/PreparedTransaction.cpp?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/PreparedTransaction.cpp (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/PreparedTransaction.cpp Wed Jul 10 18:20:19 2013
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/legacystore/PreparedTransaction.h"
+#include <algorithm>
+
+using namespace mrg::msgstore;
+using std::string;
+
+void LockedMappings::add(queue_id queue, message_id message)
+{   // Append the (queue, message) pair to the lock list; existing entries are not checked.
+    locked.push_back(idpair(queue, message));
+}
+
+bool LockedMappings::isLocked(queue_id queue, message_id message)
+{   // Linear search of the lock list for an exact (queue, message) match.
+    const idpair wanted(queue, message);
+    return locked.end() != std::find(locked.begin(), locked.end(), wanted);
+}
+
+void LockedMappings::add(LockedMappings::map& map, std::string& key, queue_id queue, message_id message)
+{
+    // Look the key up once; a fresh, empty LockedMappings is registered only when the key is absent.
+    LockedMappings::map::iterator entry = map.find(key);
+    if (entry == map.end()) {
+        entry = map.insert(std::make_pair(key, LockedMappings::shared_ptr(new LockedMappings()))).first;
+    }
+    entry->second->add(queue, message);
+}
+
+bool PreparedTransaction::isLocked(queue_id queue, message_id message)
+{   // Either list pointer may be null; a null list contributes no locks.
+    if (enqueues.get() && enqueues->isLocked(queue, message)) return true;
+    return dequeues.get() && dequeues->isLocked(queue, message);
+}
+
+
+bool PreparedTransaction::isLocked(PreparedTransaction::list& txns, queue_id queue, message_id message)
+{
+    // Advance until some transaction reports the pair as locked, or the list is exhausted.
+    PreparedTransaction::list::iterator cursor = txns.begin();
+    while (cursor != txns.end() && !cursor->isLocked(queue, message)) {
+        ++cursor;
+    }
+    return cursor != txns.end();
+}
+
+PreparedTransaction::list::iterator PreparedTransaction::getLockedPreparedTransaction(PreparedTransaction::list& txns, queue_id queue, message_id message)
+{
+    // Yields the first transaction that locks (queue, message); txns.end() when there is none.
+    PreparedTransaction::list::iterator cursor = txns.begin();
+    for (; cursor != txns.end(); ++cursor) {
+        if (cursor->isLocked(queue, message)) break;
+    }
+    return cursor;
+}
+
+PreparedTransaction::PreparedTransaction(const std::string& _xid, // copied into the const member xid
+                                         LockedMappings::shared_ptr _enqueues, // pointer copy; isLocked() tolerates null
+                                         LockedMappings::shared_ptr _dequeues) // pointer copy; isLocked() tolerates null
+
+    : xid(_xid), enqueues(_enqueues), dequeues(_dequeues) {}
+

Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/PreparedTransaction.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/PreparedTransaction.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/PreparedTransaction.h?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/PreparedTransaction.h (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/PreparedTransaction.h Wed Jul 10 18:20:19 2013
@@ -0,0 +1,74 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_PREPAREDTRANSACTION_H
+#define QPID_LEGACYSTORE_PREPAREDTRANSACTION_H
+
+#include <list>
+#include <map>
+#include <set>
+#include <string>
+#include <boost/shared_ptr.hpp>
+#include <boost/ptr_container/ptr_list.hpp>
+
+namespace mrg{
+namespace msgstore{
+
+typedef u_int64_t queue_id;
+typedef u_int64_t message_id;
+
+class LockedMappings // a list of (queue id, message id) pairs that are treated as locked
+{
+public:
+    typedef boost::shared_ptr<LockedMappings> shared_ptr;
+    typedef std::map<std::string, shared_ptr> map; // string key -> shared LockedMappings
+    typedef std::pair<queue_id, message_id> idpair; // one locked (queue, message) combination
+    typedef std::list<idpair>::iterator iterator;
+
+    void add(queue_id queue, message_id message); // appends the pair; duplicates are not filtered
+    bool isLocked(queue_id queue, message_id message); // true if exactly this pair is in the list
+    std::size_t size() { return locked.size(); }
+    iterator begin() { return locked.begin(); }
+    iterator end() { return locked.end(); }
+
+    static void add(LockedMappings::map& map, std::string& key, queue_id queue, message_id message); // adds the pair under key, creating the entry if absent
+
+private:
+    std::list<idpair> locked; // pairs in the order they were added
+};
+
+struct PreparedTransaction // an xid together with its enqueue and dequeue LockedMappings
+{
+    typedef boost::ptr_list<PreparedTransaction> list;
+
+    const std::string xid;
+    const LockedMappings::shared_ptr enqueues; // may be null; isLocked() checks before dereferencing
+    const LockedMappings::shared_ptr dequeues; // may be null; isLocked() checks before dereferencing
+
+    PreparedTransaction(const std::string& xid, LockedMappings::shared_ptr enqueues, LockedMappings::shared_ptr dequeues);
+    bool isLocked(queue_id queue, message_id message); // true if either list contains the pair
+    static bool isLocked(PreparedTransaction::list& txns, queue_id queue, message_id message); // true if any transaction in txns locks the pair
+    static PreparedTransaction::list::iterator getLockedPreparedTransaction(PreparedTransaction::list& txns, queue_id queue, message_id message); // first transaction locking the pair, else txns.end()
+};
+
+}}
+
+#endif // ifndef QPID_LEGACYSTORE_PREPAREDTRANSACTION_H

Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/PreparedTransaction.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/PreparedTransaction.h
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Rev URL

Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/StoreException.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/StoreException.h?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/StoreException.h (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/StoreException.h Wed Jul 10 18:20:19 2013
@@ -0,0 +1,56 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_STOREEXCEPTION_H
+#define QPID_LEGACYSTORE_STOREEXCEPTION_H
+
+#include "qpid/legacystore/IdDbt.h"
+#include <boost/format.hpp>
+
+namespace mrg{
+namespace msgstore{
+
+class StoreException : public std::exception {   // exception type that owns its message text
+public:
+    StoreException(const std::string& _text) : text(_text) {}
+    StoreException(const std::string& _text, const DbException& cause) : text(_text + ": " + cause.what()) {}
+    virtual ~StoreException() throw() {}
+    virtual const char* what() const throw() { return text.c_str(); }   // points into the member string
+private:
+    std::string text;
+};
+
+// Separate exception type raised through THROW_STORE_FULL_EXCEPTION; it adds no state or
+// behaviour of its own and simply forwards its constructor arguments to StoreException.
+class StoreFullException : public StoreException {
+public:
+    StoreFullException(const std::string& _text) : StoreException(_text) {}
+    StoreFullException(const std::string& _text, const DbException& cause) : StoreException(_text, cause) {}
+    virtual ~StoreFullException() throw() {}
+};
+
+#define THROW_STORE_EXCEPTION(MESSAGE) throw StoreException(boost::str(boost::format("%s (%s:%d)") % (MESSAGE) % __FILE__ % __LINE__)) // message gets " (file:line)" of the throw site appended
+#define THROW_STORE_EXCEPTION_2(MESSAGE, EXCEPTION) throw StoreException(boost::str(boost::format("%s (%s:%d)") % (MESSAGE) % __FILE__ % __LINE__), EXCEPTION) // as above, with EXCEPTION passed on as the cause
+#define THROW_STORE_FULL_EXCEPTION(MESSAGE) throw StoreFullException(boost::str(boost::format("%s (%s:%d)") % (MESSAGE) % __FILE__ % __LINE__)) // same text format, StoreFullException type
+
+}}
+
+#endif // ifndef QPID_LEGACYSTORE_STOREEXCEPTION_H

Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/StoreException.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/StoreException.h
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Rev URL

Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp Wed Jul 10 18:20:19 2013
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/broker/Broker.h"
+#include "qpid/Plugin.h"
+#include "qpid/Options.h"
+#include "qpid/DataDir.h"
+#include "qpid/log/Statement.h"
+#include "qpid/legacystore/MessageStoreImpl.h"
+
+using mrg::msgstore::MessageStoreImpl;
+
+namespace qpid {
+namespace broker {
+
+using namespace std;
+
+// Broker plugin: builds a MessageStoreImpl, initialises it from the plugin options and hands it to the broker.
+struct StorePlugin : public Plugin {
+
+    MessageStoreImpl::StoreOptions options;
+    boost::shared_ptr<MessageStoreImpl> store;
+
+    Options* getOptions() { return &options; }
+
+    // Ignores targets that are not a Broker. Throws Exception when no store directory can be determined.
+    void earlyInitialize (Plugin::Target& target)
+    {
+        Broker* const brokerTarget = dynamic_cast<Broker*>(&target);
+        if (brokerTarget == 0) return;
+        store.reset(new MessageStoreImpl(brokerTarget));
+        DataDir& brokerDataDir = brokerTarget->getDataDir ();
+        const bool needDefaultDir = options.storeDir.empty ();
+        if (needDefaultDir && !brokerDataDir.isEnabled ())
+            throw Exception ("msgstore: If --data-dir is blank or --no-data-dir is specified, --store-dir must be present.");
+        if (needDefaultDir)
+            options.storeDir = brokerDataDir.getPath ();   // fall back to the broker data directory
+        store->init(&options);
+        boost::shared_ptr<qpid::broker::MessageStore> storeForBroker(store);
+        brokerTarget->setStore(storeForBroker);
+        target.addFinalizer(boost::bind(&StorePlugin::finalize, this));
+    }
+
+    // Turns on management instrumentation, but only for a Broker target and only once a store exists.
+    void initialize(Plugin::Target& target)
+    {
+        if (dynamic_cast<Broker*>(&target) == 0 || !store) return;
+        QPID_LOG(info, "Enabling management instrumentation for the store.");
+        store->initManagement();
+    }
+
+    // Finalizer registered by earlyInitialize(): drops this plugin's reference to the store.
+    void finalize()
+    {
+        store.reset();
+    }
+
+    const char* id() {return "StorePlugin";}
+};
+
+static StorePlugin instance; // Single file-scope plugin object, constructed during static initialization.
+
+}} // namespace qpid::broker

Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp Wed Jul 10 18:20:19 2013
@@ -0,0 +1,184 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/legacystore/TxnCtxt.h"
+
+#include <sstream>
+
+#include "qpid/legacystore/jrnl/jexception.h"
+#include "qpid/legacystore/StoreException.h"
+
+namespace mrg {
+namespace msgstore {
+
+// Sync outstanding writes, then write a commit (commit == true) or abort record
+// to every journal recorded in impactedQueues and, if one was set by prepare(),
+// to the prepared-xid store. The impacted-queue set is emptied on the way.
+void TxnCtxt::completeTxn(bool commit) {
+    sync();
+
+    ipqItr queueIt = impactedQueues.begin();
+    while (queueIt != impactedQueues.end()) {
+        commitTxn(static_cast<JournalImpl*>(*queueIt), commit);
+        ++queueIt;
+    }
+    impactedQueues.clear();
+
+    if (preparedXidStorePtr != 0) {
+        commitTxn(preparedXidStorePtr, commit);
+    }
+}
+
+// Write a transaction commit (commit == true) or abort record for this
+// context's xid to journal jc. Does nothing when jc is null or when the context
+// has no IdSequence (loggedtx). A journal::jexception from the journal is
+// converted with THROW_STORE_EXCEPTION; the message names the operation that
+// failed and separates it from the journal's own text.
+void TxnCtxt::commitTxn(JournalImpl* jc, bool commit) {
+    if (jc && loggedtx) { /* if using journal */
+        // Token for this record only; it deliberately shadows the member dtokp.
+        boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
+        // Extra reference beyond the local intrusive_ptr - presumably dropped
+        // by the journal's completion handling; TODO confirm.
+        dtokp->addRef();
+        dtokp->set_external_rid(true);
+        dtokp->set_rid(loggedtx->next());
+        try {
+            if (commit) {
+                jc->txn_commit(dtokp.get(), getXid());
+                sync();
+            } else {
+                jc->txn_abort(dtokp.get(), getXid());
+            }
+        } catch (const journal::jexception& e) {
+            THROW_STORE_EXCEPTION(std::string(commit ? "Error during txn commit: " : "Error during txn abort: ") + e.what());
+        }
+    }
+}
+
+// static: UUID shared by all TxnCtxt objects; filled in by setUuid().
+uuid_t TxnCtxt::uuid;
+
+// static: sequence whose next() value forms the first part of each locally generated tid.
+IdSequence TxnCtxt::uuidSeq;
+
+// static: initialised by calling setUuid(), so the UUID is generated during static initialisation.
+bool TxnCtxt::staticInit = TxnCtxt::setUuid();
+
+// static
+// Generate the class-wide UUID. The return value is always true; it exists so
+// the call can serve as the initialiser of staticInit.
+bool TxnCtxt::setUuid()
+{
+    ::uuid_generate(uuid);
+    return true;
+}
+
+// Construct a context with a locally generated transaction id. The tid is only
+// built when an IdSequence is supplied; otherwise it stays empty.
+TxnCtxt::TxnCtxt(IdSequence* _loggedtx) :
+    loggedtx(_loggedtx),
+    dtokp(new DataTokenImpl),
+    preparedXidStorePtr(0),
+    txn(0)
+{
+    if (!loggedtx)
+        return;
+
+//     Alternative, human-readable tid (53 bytes), kept for reference.
+//     uuid_t is a char[16]
+//     tid.reserve(53);
+//     u_int64_t* u1 = (u_int64_t*)uuid;
+//     u_int64_t* u2 = (u_int64_t*)(uuid + sizeof(u_int64_t));
+//     std::stringstream s;
+//     s << "tid:" << std::hex << std::setfill('0') << std::setw(16) << uuidSeq.next() << ":" << std::setw(16) << *u1 << std::setw(16) << *u2;
+//     tid.assign(s.str());
+
+    // Binary tid: 24 bytes - the raw sequence number followed by the raw UUID.
+    tid.reserve(24);
+    const u_int64_t seqNo = uuidSeq.next();
+    tid.append(reinterpret_cast<const char*>(&seqNo), sizeof(seqNo));
+    tid.append(reinterpret_cast<const char*>(&uuid), sizeof(uuid));
+}
+
+// Construct a context around an existing transaction id instead of generating one.
+TxnCtxt::TxnCtxt(std::string _tid, IdSequence* _loggedtx) :
+    loggedtx(_loggedtx),
+    dtokp(new DataTokenImpl),
+    preparedXidStorePtr(0),
+    tid(_tid),
+    txn(0)
+{
+}
+
+// Abort any Berkeley DB transaction that is still open when the context dies.
+TxnCtxt::~TxnCtxt()
+{
+    abort();
+}
+
+// Flush, then wait for, outstanding journal writes of this transaction on every
+// impacted queue journal and on the prepared-xid store (if set). All flushes are
+// issued before any wait begins. No-op without an IdSequence (loggedtx).
+// A journal::jexception is converted with THROW_STORE_EXCEPTION.
+void TxnCtxt::sync() {
+    if (!loggedtx)
+        return;
+
+    try {
+        // Pass 1: flush every journal involved.
+        for (ipqItr q = impactedQueues.begin(); q != impactedQueues.end(); ++q) {
+            jrnl_flush(static_cast<JournalImpl*>(*q));
+        }
+        if (preparedXidStorePtr) {
+            jrnl_flush(preparedXidStorePtr);
+        }
+
+        // Pass 2: wait for the writes to complete, bounded by the AIO timeout.
+        for (ipqItr q = impactedQueues.begin(); q != impactedQueues.end(); ++q) {
+            jrnl_sync(static_cast<JournalImpl*>(*q), &journal::jcntl::_aio_cmpl_timeout);
+        }
+        if (preparedXidStorePtr) {
+            jrnl_sync(preparedXidStorePtr, &journal::jcntl::_aio_cmpl_timeout);
+        }
+    } catch (const journal::jexception& e) {
+        THROW_STORE_EXCEPTION(std::string("Error during txn sync: ") + e.what());
+    }
+}
+
+// Flush journal jc unless it is null or already reports this txn as synced.
+void TxnCtxt::jrnl_flush(JournalImpl* jc) {
+    if (jc == 0 || jc->is_txn_synced(getXid()))
+        return;
+    jc->flush();
+}
+
+// Keep collecting write events from journal jc until none remain outstanding.
+// Nothing is done when jc is null or already reports this txn as synced.
+// A timeout reported by get_wr_events() raises a store exception, but only when
+// a timeout value was actually supplied.
+void TxnCtxt::jrnl_sync(JournalImpl* jc, timespec* timeout) {
+    if (jc && !jc->is_txn_synced(getXid())) {
+        while (jc->get_wr_aio_evt_rem()) {
+            const bool timedOut = (jc->get_wr_events(timeout) == journal::jerrno::AIO_TIMEOUT);
+            if (timedOut && timeout)
+                THROW_STORE_EXCEPTION(std::string("Error: timeout waiting for TxnCtxt::jrnl_sync()"));
+        }
+    }
+}
+
+// Start a Berkeley DB transaction in env and store its handle in txn.
+// When sync is true the class-wide globalSerialiser mutex is then locked and
+// held in globalHolder. A DbException leaves txn null and is rethrown; a
+// non-zero return code raises a store exception.
+void TxnCtxt::begin(DbEnv* env, bool sync) {
+    int rc = 0;
+    try {
+        rc = env->txn_begin(0, &txn, 0);
+    } catch (const DbException&) {
+        txn = 0;
+        throw;
+    }
+
+    if (rc != 0) {
+        std::ostringstream msg;
+        msg << "Error: Env::txn_begin() returned error code: " << rc;
+        THROW_STORE_EXCEPTION(msg.str());
+    }
+
+    if (sync) {
+        globalHolder.reset(new qpid::sys::Mutex::ScopedLock(globalSerialiser));
+    }
+}
+
+// Commit the open Berkeley DB transaction, if any, and release the global
+// serialiser lock that begin() may have taken.
+//
+// The member handle is cleared before DbTxn::commit() is called: Berkeley DB
+// does not allow a DbTxn handle to be used again once commit() has been called,
+// whatever its outcome. If commit() throws, the destructor therefore must not
+// find 'txn' still set and call abort() on it. The lock is released on the
+// failure path as well, and the exception is rethrown unchanged.
+void TxnCtxt::commit() {
+    if (txn) {
+        DbTxn* const handle = txn;
+        txn = 0;
+        try {
+            handle->commit(0);
+        } catch (...) {
+            globalHolder.reset();
+            throw;
+        }
+        globalHolder.reset();
+    }
+}
+
+// Abort the open Berkeley DB transaction, if any, and release the global
+// serialiser lock that begin() may have taken. Also called by the destructor.
+//
+// The member handle is cleared before DbTxn::abort() is called: Berkeley DB
+// does not allow a DbTxn handle to be used again once abort() has been called,
+// whatever its outcome, so a throwing abort() must not leave 'txn' set for a
+// later call. The lock is released on the failure path as well, and the
+// exception is rethrown unchanged.
+void TxnCtxt::abort(){
+    if (txn) {
+        DbTxn* const handle = txn;
+        txn = 0;
+        try {
+            handle->abort();
+        } catch (...) {
+            globalHolder.reset();
+            throw;
+        }
+        globalHolder.reset();
+    }
+}
+
+// Current Berkeley DB transaction handle; null when no transaction is open.
+DbTxn* TxnCtxt::get()
+{
+    return txn;
+}
+
+// The base context always answers false; TPCTxnCtxt overrides this.
+bool TxnCtxt::isTPC()
+{
+    return false;
+}
+
+// Transaction id of this context: the local tid.
+const std::string& TxnCtxt::getXid()
+{
+    return tid;
+}
+
+// Remember queue as one touched by this transaction (set semantics: a queue
+// added twice is stored once).
+void TxnCtxt::addXidRecord(qpid::broker::ExternalQueueStore* queue)
+{
+    impactedQueues.insert(queue);
+}
+
+// Public entry point that forwards to the virtual completeTxn().
+void TxnCtxt::complete(bool commit)
+{
+    completeTxn(commit);
+}
+
+// True when no queue has been recorded for this transaction.
+bool TxnCtxt::impactedQueuesEmpty()
+{
+    return impactedQueues.empty();
+}
+
+// Raw, non-owning pointer to this context's data token.
+DataTokenImpl* TxnCtxt::getDtok()
+{
+    return dtokp.get();
+}
+
+// Take one additional reference on this context's data token.
+void TxnCtxt::incrDtokRef()
+{
+    dtokp->addRef();
+}
+
+// Set this context's data token to describe an already enqueued record:
+// record id rid, write state ENQ, transaction id xid, and an externally
+// supplied rid.
+void TxnCtxt::recoverDtok(const u_int64_t rid, const std::string xid)
+{
+    DataTokenImpl& token = *dtokp;
+    token.set_rid(rid);
+    token.set_wstate(DataTokenImpl::ENQ);
+    token.set_xid(xid);
+    token.set_external_rid(true);
+}
+
+// Two-phase-commit context: the base class is built as for a local transaction,
+// and the supplied xid is stored for getXid() to return.
+TPCTxnCtxt::TPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) :
+    TxnCtxt(_loggedtx),
+    xid(_xid)
+{
+}
+
+}}

Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/TxnCtxt.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/TxnCtxt.h?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/TxnCtxt.h (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/TxnCtxt.h Wed Jul 10 18:20:19 2013
@@ -0,0 +1,117 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_TXNCTXT_H
+#define QPID_LEGACYSTORE_TXNCTXT_H
+
+#include "db-inc.h"
+#include <memory>
+#include <set>
+#include <string>
+
+#include "qpid/legacystore/DataTokenImpl.h"
+#include "qpid/legacystore/IdSequence.h"
+#include "qpid/legacystore/JournalImpl.h"
+#include "qpid/broker/PersistableQueue.h"
+#include "qpid/broker/TransactionalStore.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/uuid.h"
+
+#include <boost/intrusive_ptr.hpp>
+
+namespace mrg {
+namespace msgstore {
+
+/**
+ * Store-side transaction context. Holds the set of queues touched by a
+ * transaction, the Berkeley DB transaction handle, the transaction id and the
+ * data token used for the transaction.
+ */
+class TxnCtxt : public qpid::broker::TransactionContext
+{
+  protected:
+    typedef std::set<qpid::broker::ExternalQueueStore*> ipqdef;
+    typedef ipqdef::iterator ipqItr;
+    typedef std::auto_ptr<qpid::sys::Mutex::ScopedLock> AutoScopedLock;
+
+    // Class-wide state.
+    static qpid::sys::Mutex globalSerialiser; // mutex that begin() can be asked to lock
+    static uuid_t uuid;                       // UUID part of locally generated tids
+    static IdSequence uuidSeq;                // sequence part of locally generated tids
+    static bool staticInit;                   // receives the result of setUuid()
+    static bool setUuid();
+
+    // Per-transaction state. Members are constructed in this order.
+    ipqdef impactedQueues; // list of Queues used in the txn
+    IdSequence* loggedtx;
+    boost::intrusive_ptr<DataTokenImpl> dtokp;
+    AutoScopedLock globalHolder; // owns the globalSerialiser lock while it is held
+    JournalImpl* preparedXidStorePtr; // set by prepare(); null until then
+
+    /**
+     * local txn id, if non XA.
+     */
+    std::string tid;
+    DbTxn* txn;
+
+    virtual void completeTxn(bool commit);
+    void commitTxn(JournalImpl* jc, bool commit);
+    void jrnl_flush(JournalImpl* jc);
+    void jrnl_sync(JournalImpl* jc, timespec* timeout);
+
+  public:
+    TxnCtxt(IdSequence* _loggedtx=NULL);
+    TxnCtxt(std::string _tid, IdSequence* _loggedtx);
+    virtual ~TxnCtxt();
+
+    /**
+     * Call to make sure all the data for this txn is written to safe store.
+     * Returns nothing; failure is reported by exception.
+     */
+    void sync();
+
+    /** Berkeley DB transaction control. */
+    void begin(DbEnv* env, bool sync = false);
+    void commit();
+    void abort();
+    DbTxn* get();
+
+    virtual bool isTPC();
+    virtual const std::string& getXid();
+
+    void addXidRecord(qpid::broker::ExternalQueueStore* queue);
+
+    /** Record the journal used as the prepared-xid store for this txn. */
+    void prepare(JournalImpl* _preparedXidStorePtr)
+    {
+        preparedXidStorePtr = _preparedXidStorePtr;
+    }
+
+    void complete(bool commit);
+    bool impactedQueuesEmpty();
+    DataTokenImpl* getDtok();
+    void incrDtokRef();
+    void recoverDtok(const u_int64_t rid, const std::string xid);
+};
+
+
+/**
+ * Transaction context for two-phase-commit transactions: reports isTPC() as
+ * true and returns the xid supplied at construction from getXid().
+ */
+class TPCTxnCtxt : public TxnCtxt, public qpid::broker::TPCTransactionContext
+{
+  protected:
+    const std::string xid; // transaction id given to the constructor
+
+  public:
+    TPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx);
+
+    virtual bool isTPC()
+    {
+        return true;
+    }
+
+    virtual const std::string& getXid()
+    {
+        return xid;
+    }
+};
+
+}}
+
+#endif // ifndef QPID_LEGACYSTORE_TXNCTXT_H
+
+

Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/TxnCtxt.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/TxnCtxt.h
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Rev URL

Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/aio.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/aio.cpp?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/aio.cpp (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/aio.cpp Wed Jul 10 18:20:19 2013
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file aio.cpp
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * File containing code for class mrg::journal::aio (libaio interface
+ * encapsulation). See comments in file aio.h for details.
+ *
+ * \author Kim van der Riet
+ */
+
+#include "qpid/legacystore/jrnl/aio.h"
+
+namespace mrg
+{
+namespace journal
+{
+
+} // namespace journal
+} // namespace mrg

Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/aio.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/aio.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/aio.h?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/aio.h (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/aio.h Wed Jul 10 18:20:19 2013
@@ -0,0 +1,153 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file aio.h
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * This file contains an encapsulation of the libaio interface used
+ * by the journal.
+ *
+ * \author Kim van der Riet
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_AIO_H
+#define QPID_LEGACYSTORE_JRNL_AIO_H
+
+#include <libaio.h>
+#include <cstring>
+#include <sys/types.h>
+#include <string.h>
+
+namespace mrg
+{
+namespace journal
+{
+
+typedef iocb aio_cb;        // journal-side name for libaio's iocb control block
+typedef io_event aio_event; // journal-side name for libaio's io_event
+
+/**
+ * \brief C++ wrapper for the libaio functions used by the journal. Only the functions the journal needs are
+ * wrapped; this is not a complete libaio binding. All members are static.
+ */
+class aio
+{
+public:
+    /** \brief Forwards to ::io_queue_init(). */
+    static int queue_init(int maxevents, io_context_t* ctxp)
+    {
+        return ::io_queue_init(maxevents, ctxp);
+    }
+
+    /** \brief Forwards to ::io_queue_release(). */
+    static int queue_release(io_context_t ctx)
+    {
+        return ::io_queue_release(ctx);
+    }
+
+    /** \brief Forwards to ::io_submit(). */
+    static int submit(io_context_t ctx, long nr, aio_cb* aios[])
+    {
+        return ::io_submit(ctx, nr, aios);
+    }
+
+    /** \brief Forwards to ::io_getevents(). */
+    static int getevents(io_context_t ctx, long min_nr, long nr, aio_event* events, timespec* const timeout)
+    {
+        return ::io_getevents(ctx, min_nr, nr, events, timeout);
+    }
+
+    /**
+     * \brief Prepare an aio_cb struct for read use. This is a plain wrapper for libaio's ::io_prep_pread();
+     * see prep_pread_2() for the variant that keeps the data pointer.
+     *
+     * \param aiocbp Pointer to the aio_cb struct to be prepared.
+     * \param fd File descriptor to be used for read.
+     * \param buf Pointer to buffer in which read data is to be placed.
+     * \param count Number of bytes to read - buffer must be large enough.
+     * \param offset Offset within file from which data will be read.
+     */
+    static void prep_pread(aio_cb* aiocbp, int fd, void* buf, std::size_t count, int64_t offset)
+    {
+        ::io_prep_pread(aiocbp, fd, buf, count, offset);
+    }
+
+    /**
+     * \brief Variant of libaio's io_prep_pread() which preserves the value of the data pointer, so that iocbs
+     * can be initialized with a pointer that is re-used. Everything after the first pointer-sized field of the
+     * struct is zeroed, then the read request fields are filled in.
+     *
+     * \param aiocbp Pointer to the aio_cb struct to be prepared.
+     * \param fd File descriptor to be used for read.
+     * \param buf Pointer to buffer in which read data is to be placed.
+     * \param count Number of bytes to read - buffer must be large enough.
+     * \param offset Offset within file from which data will be read.
+     */
+    static void prep_pread_2(aio_cb* aiocbp, int fd, void* buf, std::size_t count, int64_t offset)
+    {
+        char* const afterDataPtr = reinterpret_cast<char*>(aiocbp) + sizeof(void*);
+        std::memset(afterDataPtr, 0, sizeof(aio_cb) - sizeof(void*));
+        aiocbp->aio_fildes = fd;
+        aiocbp->aio_lio_opcode = IO_CMD_PREAD;
+        aiocbp->aio_reqprio = 0;
+        aiocbp->u.c.buf = buf;
+        aiocbp->u.c.nbytes = count;
+        aiocbp->u.c.offset = offset;
+    }
+
+    /**
+     * \brief Prepare an aio_cb struct for write use. This is a plain wrapper for libaio's ::io_prep_pwrite();
+     * see prep_pwrite_2() for the variant that keeps the data pointer.
+     *
+     * \param aiocbp Pointer to the aio_cb struct to be prepared.
+     * \param fd File descriptor to be used for write.
+     * \param buf Pointer to buffer in which data to be written is located.
+     * \param count Number of bytes to write.
+     * \param offset Offset within file to which data will be written.
+     */
+    static void prep_pwrite(aio_cb* aiocbp, int fd, void* buf, std::size_t count, int64_t offset)
+    {
+        ::io_prep_pwrite(aiocbp, fd, buf, count, offset);
+    }
+
+    /**
+     * \brief Variant of libaio's io_prep_pwrite() which preserves the value of the data pointer, so that iocbs
+     * can be initialized with a pointer that is re-used. Everything after the first pointer-sized field of the
+     * struct is zeroed, then the write request fields are filled in.
+     *
+     * \param aiocbp Pointer to the aio_cb struct to be prepared.
+     * \param fd File descriptor to be used for write.
+     * \param buf Pointer to buffer in which data to be written is located.
+     * \param count Number of bytes to write.
+     * \param offset Offset within file to which data will be written.
+     */
+    static void prep_pwrite_2(aio_cb* aiocbp, int fd, void* buf, std::size_t count, int64_t offset)
+    {
+        char* const afterDataPtr = reinterpret_cast<char*>(aiocbp) + sizeof(void*);
+        std::memset(afterDataPtr, 0, sizeof(aio_cb) - sizeof(void*));
+        aiocbp->aio_fildes = fd;
+        aiocbp->aio_lio_opcode = IO_CMD_PWRITE;
+        aiocbp->aio_reqprio = 0;
+        aiocbp->u.c.buf = buf;
+        aiocbp->u.c.nbytes = count;
+        aiocbp->u.c.offset = offset;
+    }
+};
+
+} // namespace journal
+} // namespace mrg
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_AIO_H

Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/aio.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/aio.h
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Rev URL

Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/aio_callback.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/aio_callback.h?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/aio_callback.h (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/aio_callback.h Wed Jul 10 18:20:19 2013
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file aio_callback.h
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * This file contains the definition for the AIO callback function
+ * pointer.
+ *
+ * \author Kim van der Riet
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_AIO_CALLBACK_H
+#define QPID_LEGACYSTORE_JRNL_AIO_CALLBACK_H
+
+#include <vector>
+#include <sys/types.h>
+
+namespace mrg
+{
+namespace journal
+{
+
+    class data_tok;
+
+    /**
+     * Abstract callback interface with one pure virtual method for write AIO
+     * and one for read AIO.
+     */
+    class aio_callback
+    {
+    public:
+        virtual ~aio_callback() {}
+
+        /// Write-side callback; receives a list of data tokens.
+        virtual void wr_aio_cb(std::vector<data_tok*>& dtokl) = 0;
+
+        /// Read-side callback; receives a list of 16-bit values (presumably page indices - TODO confirm).
+        virtual void rd_aio_cb(std::vector<u_int16_t>& pil) = 0;
+    };
+
+} // namespace journal
+} // namespace mrg
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_AIO_CALLBACK_H

Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/aio_callback.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/aio_callback.h
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Rev URL

Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/cvar.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/cvar.cpp?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/cvar.cpp (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/cvar.cpp Wed Jul 10 18:20:19 2013
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file cvar.cpp
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * File containing code for class mrg::journal::cvar (condition variable). See
+ * comments in file cvar.h for details.
+ *
+ * \author Kim van der Riet
+ */
+
+#include "qpid/legacystore/jrnl/cvar.h"

Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/cvar.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/cvar.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/cvar.h?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/cvar.h (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/cvar.h Wed Jul 10 18:20:19 2013
@@ -0,0 +1,87 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file cvar.h
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * This file contains a posix condition variable class.
+ *
+ * \author Kim van der Riet
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_CVAR_H
+#define QPID_LEGACYSTORE_JRNL_CVAR_H
+
+#include <cstring>
+#include "qpid/legacystore/jrnl/jerrno.h"
+#include "qpid/legacystore/jrnl/jexception.h"
+#include "qpid/legacystore/jrnl/smutex.h"
+#include "qpid/legacystore/jrnl/time_ns.h"
+#include <pthread.h>
+#include <sstream>
+
+namespace mrg
+{
+namespace journal
+{
+
+    // Ultra-simple thread condition variable class.
+    //
+    // Wraps a pthread_cond_t together with a reference to the smutex it waits
+    // on. Return codes from the pthread calls are passed to PTHREAD_CHK with
+    // the pthread function name, this class name and the calling method name
+    // (PTHREAD_CHK is presumably what reports the failure - see its definition).
+    class cvar
+    {
+    private:
+        const smutex& _sm;  // mutex handed to the pthread wait calls
+        pthread_cond_t _c;  // the underlying condition variable
+    public:
+        // The return codes of init/destroy are not checked.
+        inline cvar(const smutex& sm) : _sm(sm) { ::pthread_cond_init(&_c, 0); }
+        inline ~cvar() { ::pthread_cond_destroy(&_c); }
+
+        // Wait on the condition with no time limit.
+        inline void wait()
+        {
+            PTHREAD_CHK(::pthread_cond_wait(&_c, _sm.get()), "::pthread_cond_wait", "cvar", "wait");
+        }
+
+        // Wait on the condition until the time given in ts. Any non-zero return,
+        // including a timeout, goes to PTHREAD_CHK.
+        inline void timedwait(timespec& ts)
+        {
+            PTHREAD_CHK(::pthread_cond_timedwait(&_c, _sm.get(), &ts), "::pthread_cond_timedwait", "cvar", "timedwait");
+        }
+
+        // Wait for at most intvl_ns nanoseconds from now.
+        // Returns true if the wait timed out, false otherwise.
+        inline bool waitintvl(const long intvl_ns)
+        {
+            time_ns t; t.now(); t+=intvl_ns;
+            int ret = ::pthread_cond_timedwait(&_c, _sm.get(), &t);
+            if (ret == ETIMEDOUT)
+                return true;
+            PTHREAD_CHK(ret, "::pthread_cond_timedwait", "cvar", "waitintvl");
+            return false;
+        }
+
+        // Signal the condition. The method name passed to PTHREAD_CHK is this
+        // method's own name, as in every other member of the class.
+        inline void signal()
+        {
+            PTHREAD_CHK(::pthread_cond_signal(&_c), "::pthread_cond_signal", "cvar", "signal");
+        }
+
+        // Broadcast the condition.
+        inline void broadcast()
+        {
+            PTHREAD_CHK(::pthread_cond_broadcast(&_c), "::pthread_cond_broadcast", "cvar", "broadcast");
+        }
+    };
+
+} // namespace journal
+} // namespace mrg
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_CVAR_H

Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/cvar.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/cvar.h
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Rev URL

Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.cpp?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.cpp (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.cpp Wed Jul 10 18:20:19 2013
@@ -0,0 +1,194 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file data_tok.cpp
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * File containing code for class mrg::journal::data_tok (data block token).
+ * See comments in file data_tok.h for details.
+ *
+ * \author Kim van der Riet
+ */
+
+#include "qpid/legacystore/jrnl/data_tok.h"
+
+#include <iomanip>
+#include "qpid/legacystore/jrnl/jerrno.h"
+#include "qpid/legacystore/jrnl/jexception.h"
+#include "qpid/legacystore/jrnl/slock.h"
+#include <sstream>
+
+namespace mrg
+{
+namespace journal
+{
+
+// Static members: _cnt supplies each new data_tok's _icnt and is then incremented; _mutex is locked around that update.
+
+u_int64_t data_tok::_cnt = 0;
+smutex data_tok::_mutex;
+
+// Construct a token in its initial state (write state NONE, read state UNREAD,
+// all counters zero, empty xid) and give it the next instance number.
+data_tok::data_tok():
+    _wstate(NONE), _rstate(UNREAD),
+    _dsize(0), _dblks_written(0), _dblks_read(0), _pg_cnt(0),
+    _fid(0), _rid(0), _xid(),
+    _dequeue_rid(0), _external_rid(false)
+{
+    // The shared counter is read and incremented under the class mutex.
+    slock guard(_mutex);
+    _icnt = _cnt;
+    ++_cnt;
+}
+
+// Nothing to release.
+data_tok::~data_tok()
+{
+}
+
+// Name of this token's current write state.
+const char*
+data_tok::wstate_str() const
+{
+    const write_state current = _wstate;
+    return wstate_str(current);
+}
+
+// Map a write_state value to its enumerator name. Values outside the
+// enumeration give "<wstate unknown>".
+const char*
+data_tok::wstate_str(write_state wstate)
+{
+    // Not using default: forces compiler to ensure all cases are covered.
+    switch (wstate)
+    {
+        case NONE:          return "NONE";
+        case ENQ_CACHED:    return "ENQ_CACHED";
+        case ENQ_PART:      return "ENQ_PART";
+        case ENQ_SUBM:      return "ENQ_SUBM";
+        case ENQ:           return "ENQ";
+        case DEQ_CACHED:    return "DEQ_CACHED";
+        case DEQ_PART:      return "DEQ_PART";
+        case DEQ_SUBM:      return "DEQ_SUBM";
+        case DEQ:           return "DEQ";
+        case ABORT_CACHED:  return "ABORT_CACHED";
+        case ABORT_PART:    return "ABORT_PART";
+        case ABORT_SUBM:    return "ABORT_SUBM";
+        case ABORTED:       return "ABORTED";
+        case COMMIT_CACHED: return "COMMIT_CACHED";
+        case COMMIT_PART:   return "COMMIT_PART";
+        case COMMIT_SUBM:   return "COMMIT_SUBM";
+        case COMMITTED:     return "COMMITTED";
+    }
+    return "<wstate unknown>";
+}
+
+// Name of this token's current read state.
+const char*
+data_tok::rstate_str() const
+{
+    const read_state current = _rstate;
+    return rstate_str(current);
+}
+
+// Map a read_state value to its enumerator name. Values outside the
+// enumeration give "<rstate unknown>".
+//
+// The initial read state is UNREAD (it is what the constructor and reset()
+// assign). NONE is a write_state enumerator and does not belong in this switch,
+// so the first case is labelled and reported as UNREAD.
+const char*
+data_tok::rstate_str(read_state rstate)
+{
+    switch (rstate)
+    {
+        case UNREAD:
+            return "UNREAD";
+        case READ_PART:
+            return "READ_PART";
+        case SKIP_PART:
+            return "SKIP_PART";
+        case READ:
+            return "READ";
+    // Not using default: forces compiler to ensure all cases are covered.
+    }
+    return "<rstate unknown>";
+}
+
+// Change the read state. Any state other than UNREAD may only be set while the
+// write state is ENQ; otherwise a jexception (JERR_DTOK_ILLEGALSTATE) is thrown
+// and the read state is left unchanged.
+void
+data_tok::set_rstate(const read_state rstate)
+{
+    const bool allowed = (_wstate == ENQ) || (rstate == UNREAD);
+    if (!allowed)
+    {
+        std::ostringstream msg;
+        msg << "Attempted to change read state to " << rstate_str(rstate)
+            << " while write state is not enqueued (wstate ENQ); wstate=" << wstate_str() << ".";
+        throw jexception(jerrno::JERR_DTOK_ILLEGALSTATE, msg.str(), "data_tok",
+                "set_rstate");
+    }
+    _rstate = rstate;
+}
+
+// Return the token to its initial write/read state and clear its size, block
+// counters, page count, file id, record id and xid.
+// NOTE(review): _dequeue_rid and _external_rid are set by the constructor but
+// are not touched here - confirm that this is intended.
+void
+data_tok::reset()
+{
+    // State
+    _wstate = NONE;
+    _rstate = UNREAD;
+
+    // Sizes and progress counters
+    _dsize = 0;
+    _dblks_written = 0;
+    _dblks_read = 0;
+    _pg_cnt = 0;
+
+    // Identity
+    _fid = 0;
+    _rid = 0;
+    _xid.clear();
+}
+
+// debug aid
+// Build a one-line, hexadecimal description of the token's fields. Printable
+// xid bytes are written as-is; any other byte is written as "/" followed by its
+// two-digit hex value.
+std::string
+data_tok::status_str() const
+{
+    std::ostringstream oss;
+    oss << std::hex << std::setfill('0');
+    oss << "dtok id=0x" << _icnt << "; ws=" << wstate_str() << "; rs=" << rstate_str();
+    oss << "; fid=0x" << _fid << "; rid=0x" << _rid << "; xid=";
+    for (unsigned i=0; i<_xid.size(); i++)
+    {
+        // Go through unsigned char: passing a negative char to isprint() is
+        // undefined, and a negative value would be sign-extended when printed
+        // in hex (e.g. "ffffff80" instead of "80").
+        const unsigned char byte = static_cast<unsigned char>(_xid[i]);
+        if (isprint(byte))
+            oss << _xid[i];
+        else
+            oss << "/" << std::setw(2) << static_cast<unsigned int>(byte);
+    }
+    oss << "; drid=0x" << _dequeue_rid << " extrid=" << (_external_rid?"T":"F");
+    oss << "; ds=0x" << _dsize << "; dw=0x" << _dblks_written << "; dr=0x" << _dblks_read;
+    oss << " pc=0x" << _pg_cnt;
+    return oss.str();
+}
+
+} // namespace journal
+} // namespace mrg

Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.h?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.h (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.h Wed Jul 10 18:20:19 2013
@@ -0,0 +1,172 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file data_tok.h
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * File containing code for class mrg::journal::data_tok (data block token).
+ * See class documentation for details.
+ *
+ * \author Kim van der Riet
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_DATA_TOK_H
+#define QPID_LEGACYSTORE_JRNL_DATA_TOK_H
+
+namespace mrg
+{
+namespace journal
+{
+class data_tok;
+}
+}
+
+#include <cassert>
+#include <cstddef>
+#include "qpid/legacystore/jrnl/smutex.h"
+#include <pthread.h>
+#include <string>
+#include <sys/types.h>
+
+namespace mrg
+{
+
+namespace journal
+{
+
+    /**
+    * \class data_tok
+    * \brief Data block token (data_tok) which carries the write state and read state of a data
+    *     block, together with its size, block/page counters and record identifiers, through the
+    *     asynchronous I/O process.
+    */
+    class data_tok
+    {
+    public:
+        // TODO: Fix this, separate write state from operation
+        // ie: wstate = NONE, CACHED, PART, SUBM, COMPL
+        //     op = ENQUEUE, DEQUEUE, ABORT, COMMIT
+        /// Write state of the data block; currently combines the operation with its progress.
+        enum write_state
+        {
+            NONE,       ///< Data block not sent to journal
+            ENQ_CACHED, ///< Data block enqueue written to page cache
+            ENQ_PART,   ///< Data block part-submitted to AIO, waiting for page buffer to free up
+            ENQ_SUBM,   ///< Data block enqueue submitted to AIO
+            ENQ,        ///< Data block enqueue AIO write complete (enqueue complete)
+            DEQ_CACHED, ///< Data block dequeue written to page cache
+            DEQ_PART,   ///< Data block part-submitted to AIO, waiting for page buffer to free up
+            DEQ_SUBM,   ///< Data block dequeue submitted to AIO
+            DEQ,        ///< Data block dequeue AIO write complete (dequeue complete)
+            // NOTE(review): the abort and commit states below were undocumented; presumably they
+            // follow the same cached / part / submitted / complete progression - confirm.
+            ABORT_CACHED,
+            ABORT_PART,
+            ABORT_SUBM,
+            ABORTED,
+            COMMIT_CACHED,
+            COMMIT_PART,
+            COMMIT_SUBM,
+            COMMITTED
+        };
+
+        /// Read state of the data block.
+        enum read_state
+        {
+            UNREAD,     ///< Data block not read
+            READ_PART,  ///< Data block is part-read; waiting for page buffer to fill
+            SKIP_PART,  ///< Prev. dequeued dblock is part-skipped; waiting for page buffer to fill
+            READ        ///< Data block is fully read
+        };
+
+    protected:
+        static smutex _mutex;       ///< Class-wide mutex (use is in the implementation file)
+        static u_int64_t _cnt;      ///< Class-wide counter (use is in the implementation file)
+        u_int64_t   _icnt;          ///< Value returned by id()
+        write_state _wstate;        ///< Enqueued / dequeued state of data
+        read_state  _rstate;        ///< Read state of data
+        std::size_t _dsize;         ///< Data size in bytes
+        u_int32_t   _dblks_written; ///< Data blocks written
+        u_int32_t   _dblks_read;    ///< Data blocks read
+        u_int32_t   _pg_cnt;        ///< Page counter - incr for each page containing part of data
+        u_int16_t   _fid;           ///< FID containing header of enqueue record
+        u_int64_t   _rid;           ///< RID of data set by enqueue operation
+        std::string _xid;           ///< XID set by enqueue operation
+        u_int64_t   _dequeue_rid;   ///< RID of data set by dequeue operation
+        bool        _external_rid;  ///< Flag to indicate external setting of rid
+
+    public:
+        data_tok();
+        virtual ~data_tok();
+
+        // --- Identity and state ---
+
+        /// \return The id of this token.
+        u_int64_t id() const { return _icnt; }
+        /// \return The current write state.
+        write_state wstate() const { return _wstate; }
+        /// \return String form of the current write state.
+        const char* wstate_str() const;
+        /// \return String form of the supplied write state.
+        static const char* wstate_str(write_state wstate);
+        /// \return The current read state.
+        read_state rstate() const { return _rstate; }
+        /// \return String form of the current read state.
+        const char* rstate_str() const;
+        /// \return String form of the supplied read state.
+        static const char* rstate_str(read_state rstate);
+
+        /// \return true if the write state is NONE or ENQ_PART.
+        bool is_writable() const { return _wstate == NONE || _wstate == ENQ_PART; }
+        /// \return true if the write state is ENQ.
+        bool is_enqueued() const { return _wstate == ENQ; }
+        /// \return true if the write state is ENQ (same condition as is_enqueued()).
+        bool is_readable() const { return _wstate == ENQ; }
+        /// \return true if the read state is READ.
+        bool is_read() const { return _rstate == READ; }
+        /// \return true if the write state is ENQ or DEQ_PART.
+        bool is_dequeueable() const { return _wstate == ENQ || _wstate == DEQ_PART; }
+
+        /// Set the write state; no validation is performed.
+        void set_wstate(const write_state wstate) { _wstate = wstate; }
+        /// Set the read state (defined in the implementation file).
+        void set_rstate(const read_state rstate);
+
+        // --- Data size ---
+
+        /// \return The data size in bytes.
+        std::size_t dsize() const { return _dsize; }
+        /// Set the data size in bytes.
+        void set_dsize(std::size_t dsize) { _dsize = dsize; }
+
+        // --- Data block counters ---
+
+        /// \return Number of data blocks written.
+        u_int32_t dblocks_written() const { return _dblks_written; }
+        /// Add dblks_written to the written data block count.
+        void incr_dblocks_written(u_int32_t dblks_written)
+        {
+            _dblks_written += dblks_written;
+        }
+        /// Overwrite the written data block count.
+        void set_dblocks_written(u_int32_t dblks_written) { _dblks_written = dblks_written; }
+
+        /// \return Number of data blocks read.
+        u_int32_t dblocks_read() const { return _dblks_read; }
+        /// Add dblks_read to the read data block count.
+        void incr_dblocks_read(u_int32_t dblks_read) { _dblks_read += dblks_read; }
+        /// Overwrite the read data block count.
+        void set_dblocks_read(u_int32_t dblks_read) { _dblks_read = dblks_read; }
+
+        // --- Page counter ---
+
+        /// \return The current page count.
+        u_int32_t pg_cnt() const { return _pg_cnt; }
+        /// Increment the page count. \return The count after incrementing.
+        u_int32_t incr_pg_cnt() { return ++_pg_cnt; }
+        /// Decrement the page count, which is asserted to be non-zero beforehand.
+        /// \return The count after decrementing.
+        u_int32_t decr_pg_cnt() { assert(_pg_cnt != 0); return --_pg_cnt; }
+
+        // --- Record identifiers ---
+
+        /// \return The fid.
+        u_int16_t fid() const { return _fid; }
+        /// Set the fid.
+        void set_fid(const u_int16_t fid) { _fid = fid; }
+        /// \return The rid.
+        u_int64_t rid() const { return _rid; }
+        /// Set the rid.
+        void set_rid(const u_int64_t rid) { _rid = rid; }
+        /// \return The dequeue rid.
+        u_int64_t dequeue_rid() const { return _dequeue_rid; }
+        /// Set the dequeue rid.
+        void set_dequeue_rid(const u_int64_t rid) { _dequeue_rid = rid; }
+        /// \return The external-rid flag.
+        bool external_rid() const { return _external_rid; }
+        /// Set the external-rid flag.
+        void set_external_rid(const bool external_rid) { _external_rid = external_rid; }
+
+        // --- XID ---
+
+        /// \return true if the xid is not empty.
+        bool has_xid() const { return !_xid.empty(); }
+        /// \return Reference to the stored xid.
+        const std::string& xid() const { return _xid; }
+        /// Empty the stored xid.
+        void clear_xid() { _xid.clear(); }
+        /// Replace the stored xid with a copy of xid.
+        void set_xid(const std::string& xid) { _xid.assign(xid); }
+        /// Replace the stored xid with xid_len bytes copied from xidp.
+        void set_xid(const void* xidp, const std::size_t xid_len)
+        {
+            _xid.assign(static_cast<const char*>(xidp), xid_len);
+        }
+
+        /// Clear the token's tracking fields (defined in the implementation file).
+        void reset();
+
+        // debug aid
+        /// \return Human-readable dump of the token's fields.
+        std::string status_str() const;
+    };
+
+} // namespace journal
+} // namespace mrg
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_DATA_TOK_H

Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.h
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Rev URL

Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/deq_hdr.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/deq_hdr.h?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/deq_hdr.h (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/deq_hdr.h Wed Jul 10 18:20:19 2013
@@ -0,0 +1,141 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file deq_hdr.h
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * File containing code for class mrg::journal::deq_hdr (dequeue record),
+ * used to dequeue a previously enqueued record.
+ *
+ * \author Kim van der Riet
+ */
+
+#ifndef QPID_LEGACYSTORE_JRNL_DEQ_HDR_H
+#define QPID_LEGACYSTORE_JRNL_DEQ_HDR_H
+
+#include <cstddef>
+#include "qpid/legacystore/jrnl/rec_hdr.h"
+
+namespace mrg
+{
+namespace journal
+{
+
+#pragma pack(1)
+
+    /**
+    * \brief Struct for dequeue record.
+    *
+    * Header of a dequeue record. When the xidsize field is non-zero (i.e., there is a valid
+    * XID), this header is followed by the XID of xidsize bytes and a rec_tail. When xidsize is
+    * zero (i.e., there is no XID), the rec_tail is absent.
+    *
+    * Note that this record has its own rid, distinct from the rid of the record it is dequeueing.
+    * The rid field below is the rid of the dequeue record itself; the deq-rid field is the rid of
+    * the previous enqueue record being dequeued by this record.
+    *
+    * Record header info in binary format (32 bytes):
+    * <pre>
+    *   0                           7
+    * +---+---+---+---+---+---+---+---+  -+
+    * |     magic     | v | e | flags |   |
+    * +---+---+---+---+---+---+---+---+   | struct hdr
+    * |              rid              |   |
+    * +---+---+---+---+---+---+---+---+  -+
+    * |            deq-rid            |
+    * +---+---+---+---+---+---+---+---+
+    * |            xidsize            |
+    * +---+---+---+---+---+---+---+---+
+    * v = file version (If the format or encoding of this file changes, then this
+    *     number should be incremented)
+    * e = endian flag, false (0x00) for little endian, true (0x01) for big endian
+    * </pre>
+    *
+    * Note that journal files should be transferable between 32- and 64-bit
+    * hardware of the same endianness, but not between hardware of opposite
+    * endianness without some sort of binary conversion utility. Thus buffering
+    * will be needed for types that change size between 32- and 64-bit compiles;
+    * the _filler0 member below provides this for _xidsize on 32-bit builds.
+    */
+    struct deq_hdr : rec_hdr
+    {
+        u_int64_t _deq_rid;     ///< Record ID of dequeued record
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+        u_int32_t _filler0;     ///< Big-endian filler for 32-bit size_t
+#endif
+        std::size_t _xidsize;   ///< XID size
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+        u_int32_t _filler0;     ///< Little-endian filler for 32-bit size_t
+#endif
+        /// Bit within _uflag tested and set by the txn_coml_commit accessors below.
+        static const u_int16_t DEQ_HDR_TXNCMPLCOMMIT_MASK = 0x10;
+
+        /**
+        * \brief Default constructor: default-constructs the rec_hdr base and zeroes the
+        *     dequeue rid, the xid size and (where compiled in) the filler.
+        */
+        deq_hdr():
+            rec_hdr(),
+            _deq_rid(0),
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+            _filler0(0),
+#endif
+            _xidsize(0)
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+            , _filler0(0)
+#endif
+        {}
+
+        /**
+        * \brief Convenience constructor which initializes values during construction.
+        *
+        * \param magic Forwarded to the rec_hdr constructor.
+        * \param version Forwarded to the rec_hdr constructor.
+        * \param rid Forwarded to the rec_hdr constructor.
+        * \param deq_rid Stored in _deq_rid.
+        * \param xidsize Stored in _xidsize.
+        * \param owi Forwarded to the rec_hdr constructor.
+        * \param txn_coml_commit Initial value of the DEQ_HDR_TXNCMPLCOMMIT_MASK flag bit.
+        */
+        deq_hdr(const u_int32_t magic, const u_int8_t version, const u_int64_t rid,
+                const u_int64_t deq_rid, const std::size_t xidsize, const bool owi,
+                const bool txn_coml_commit = false):
+            rec_hdr(magic, version, rid, owi),
+            _deq_rid(deq_rid),
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+            _filler0(0),
+#endif
+            _xidsize(xidsize)
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+            , _filler0(0)
+#endif
+        {
+            set_txn_coml_commit(txn_coml_commit);
+        }
+
+        /**
+        * \brief Returns true if the DEQ_HDR_TXNCMPLCOMMIT_MASK bit is set in _uflag.
+        */
+        bool is_txn_coml_commit() const
+        {
+            return (_uflag & DEQ_HDR_TXNCMPLCOMMIT_MASK) != 0;
+        }
+
+        /**
+        * \brief Sets (commit == true) or clears (commit == false) the
+        *     DEQ_HDR_TXNCMPLCOMMIT_MASK bit in _uflag, leaving all other bits unchanged.
+        */
+        void set_txn_coml_commit(const bool commit)
+        {
+            if (commit)
+                _uflag = _uflag | DEQ_HDR_TXNCMPLCOMMIT_MASK;
+            else
+                _uflag = _uflag & (~DEQ_HDR_TXNCMPLCOMMIT_MASK);
+        }
+
+        /**
+        * \brief Returns the size of the header in bytes.
+        */
+        static std::size_t size() { return sizeof(deq_hdr); }
+    };
+
+#pragma pack()
+
+} // namespace journal
+} // namespace mrg
+
+#endif // ifndef QPID_LEGACYSTORE_JRNL_DEQ_HDR_H

Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/deq_hdr.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/deq_hdr.h
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Rev URL



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org