You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/10/22 21:09:58 UTC

svn commit: r1534736 [1/8] - in /qpid/trunk/qpid/cpp/src: ./ qpid/linearstore/ qpid/linearstore/jrnl/ qpid/linearstore/jrnl/utils/ tests/linearstore/

Author: kpvdr
Date: Tue Oct 22 19:09:56 2013
New Revision: 1534736

URL: http://svn.apache.org/r1534736
Log:
QPID-4984: WIP: Copy of work over from branch to trunk. Non-tx works with some known bugs, tx not yet operational.

Added:
    qpid/trunk/qpid/cpp/src/linearstore.cmake
    qpid/trunk/qpid/cpp/src/qpid/linearstore/
    qpid/trunk/qpid/cpp/src/qpid/linearstore/BindingDbt.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/BindingDbt.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/BufferValue.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/BufferValue.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/Cursor.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/DataTokenImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/DataTokenImpl.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/IdDbt.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/IdDbt.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/IdSequence.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/IdSequence.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalImpl.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalLogImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalLogImpl.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/PreparedTransaction.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/PreparedTransaction.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/StoreException.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/TxnCtxt.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/aio.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/aio_callback.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/cvar.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/enums.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jdir.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jexception.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jexception.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jrec.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/slock.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/smutex.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/time_ns.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/time_ns.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/deq_hdr.c
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/deq_hdr.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.c
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_hdr.c
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_hdr.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_tail.c
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/rec_tail.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/txn_hdr.c
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/txn_hdr.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/management-schema.xml
    qpid/trunk/qpid/cpp/src/tests/linearstore/
    qpid/trunk/qpid/cpp/src/tests/linearstore/linearstoredirsetup.sh   (with props)
Modified:
    qpid/trunk/qpid/cpp/src/CMakeLists.txt

Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=1534736&r1=1534735&r2=1534736&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Tue Oct 22 19:09:56 2013
@@ -227,6 +227,7 @@ execute_process(COMMAND ${RUBY_EXECUTABL
                  ${CMAKE_CURRENT_SOURCE_DIR}/qpid/acl/management-schema.xml
                  ${CMAKE_CURRENT_SOURCE_DIR}/qpid/ha/management-schema.xml
                  ${CMAKE_CURRENT_SOURCE_DIR}/qpid/legacystore/management-schema.xml
+                 ${CMAKE_CURRENT_SOURCE_DIR}/qpid/linearstore/management-schema.xml
   )
   set(mgen_dir ${qpid-cpp_SOURCE_DIR}/managementgen)
   set(regen_mgmt OFF)
@@ -1509,6 +1510,10 @@ endif (NOT WIN32)
 # Legacy store
 #
 include (legacystore.cmake)
+#
+# Linear store
+#
+include (linearstore.cmake)
 
 # Now create the config file from all the info learned above.
 configure_file(${CMAKE_CURRENT_SOURCE_DIR}/config.h.cmake
@@ -1516,6 +1521,7 @@ configure_file(${CMAKE_CURRENT_SOURCE_DI
 add_subdirectory(qpid/store)
 add_subdirectory(tests)
 add_subdirectory(tests/legacystore)
+add_subdirectory(tests/linearstore)
 
 # Support for pkg-config
 

Added: qpid/trunk/qpid/cpp/src/linearstore.cmake
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/linearstore.cmake?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/linearstore.cmake (added)
+++ qpid/trunk/qpid/cpp/src/linearstore.cmake Tue Oct 22 19:09:56 2013
@@ -0,0 +1,174 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#
+# Legacy store library CMake fragment, to be included in CMakeLists.txt
+# 
+
+if (DEFINED linearstore_force)
+    set (linearstore_default ${linearstore_force})
+else (DEFINED linearstore_force)
+    set (linearstore_default OFF)
+    if (UNIX)
+        #
+        # Find required BerkelyDB
+        #
+        include (finddb.cmake)
+        if (DB_FOUND)
+	    #
+	    # find libaio
+	    #
+	    CHECK_LIBRARY_EXISTS (aio io_queue_init "" HAVE_AIO)
+	    CHECK_INCLUDE_FILES (libaio.h HAVE_AIO_H)
+	    if (HAVE_AIO AND HAVE_AIO_H)
+	        #
+		# find libuuid
+		#
+  	        CHECK_LIBRARY_EXISTS (uuid uuid_compare "" HAVE_UUID)
+		CHECK_INCLUDE_FILES(uuid/uuid.h HAVE_UUID_H)
+		IF (HAVE_UUID AND HAVE_UUID_H)
+		    #
+		    # allow linearstore to be built
+		    #
+		    set (linearstore_default ON)
+		ENDIF (HAVE_UUID AND HAVE_UUID_H)
+	    endif (HAVE_AIO AND HAVE_AIO_H)
+        endif (DB_FOUND)
+    endif (UNIX)
+endif (DEFINED linearstore_force)
+
+option(BUILD_LINEARSTORE "Build linearstore persistent store" ${linearstore_default})
+
+if (BUILD_LINEARSTORE)
+    if (NOT UNIX)
+        message(FATAL_ERROR "Linearstore produced only on Unix platforms")
+    endif (NOT UNIX)
+    if (NOT DB_FOUND)
+        message(FATAL_ERROR "Linearstore requires BerkeleyDB which is absent.")
+    endif (NOT DB_FOUND)
+    if (NOT HAVE_AIO)
+        message(FATAL_ERROR "Linearstore requires libaio which is absent.")
+    endif (NOT HAVE_AIO)
+    if (NOT HAVE_AIO_H)
+        message(FATAL_ERROR "Linearstore requires libaio.h which is absent.")
+    endif (NOT HAVE_AIO_H)
+    if (NOT HAVE_UUID)
+        message(FATAL_ERROR "Linearstore requires uuid which is absent.")
+    endif (NOT HAVE_UUID)
+    if (NOT HAVE_UUID_H)
+        message(FATAL_ERROR "Linearstore requires uuid.h which is absent.")
+    endif (NOT HAVE_UUID_H)
+
+    # Journal source files
+    set (linear_jrnl_SOURCES
+        qpid/linearstore/jrnl/data_tok.cpp
+        qpid/linearstore/jrnl/deq_rec.cpp
+        qpid/linearstore/jrnl/EmptyFilePool.cpp
+        qpid/linearstore/jrnl/EmptyFilePoolManager.cpp
+        qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp
+        qpid/linearstore/jrnl/enq_map.cpp
+        qpid/linearstore/jrnl/enq_rec.cpp
+        qpid/linearstore/jrnl/jcntl.cpp
+        qpid/linearstore/jrnl/jdir.cpp
+        qpid/linearstore/jrnl/jerrno.cpp
+        qpid/linearstore/jrnl/jexception.cpp
+		qpid/linearstore/jrnl/JournalFile.cpp
+		qpid/linearstore/jrnl/JournalLog.cpp
+        qpid/linearstore/jrnl/jrec.cpp
+        qpid/linearstore/jrnl/LinearFileController.cpp
+        qpid/linearstore/jrnl/pmgr.cpp
+        qpid/linearstore/jrnl/RecoveryManager.cpp
+        qpid/linearstore/jrnl/time_ns.cpp
+        qpid/linearstore/jrnl/txn_map.cpp
+        qpid/linearstore/jrnl/txn_rec.cpp
+        qpid/linearstore/jrnl/wmgr.cpp
+    )
+
+    # linearstore source files
+    set (linear_store_SOURCES
+        qpid/linearstore/StorePlugin.cpp
+        qpid/linearstore/BindingDbt.cpp
+        qpid/linearstore/BufferValue.cpp
+        qpid/linearstore/DataTokenImpl.cpp
+        qpid/linearstore/IdDbt.cpp
+        qpid/linearstore/IdSequence.cpp
+        qpid/linearstore/JournalImpl.cpp
+        qpid/linearstore/MessageStoreImpl.cpp
+        qpid/linearstore/PreparedTransaction.cpp
+        qpid/linearstore/JournalLogImpl.cpp
+        qpid/linearstore/TxnCtxt.cpp
+    )
+
+    set (util_SOURCES
+        qpid/linearstore/jrnl/utils/deq_hdr.c
+        qpid/linearstore/jrnl/utils/enq_hdr.c
+        qpid/linearstore/jrnl/utils/file_hdr.c
+        qpid/linearstore/jrnl/utils/rec_hdr.c
+        qpid/linearstore/jrnl/utils/rec_tail.c
+        qpid/linearstore/jrnl/utils/txn_hdr.c
+    )
+
+    # linearstore include directories
+    get_property(dirs DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} PROPERTY INCLUDE_DIRECTORIES)
+    set (linear_include_DIRECTORIES
+        ${dirs}
+        ${CMAKE_CURRENT_SOURCE_DIR}/qpid/linearstore
+    )
+
+    if(NOT EXISTS ${CMAKE_CURRENT_BINARY_DIR}/db-inc.h)
+      message(STATUS "Including BDB from ${DB_INCLUDE_DIR}/db_cxx.h")
+        file(WRITE 
+             ${CMAKE_CURRENT_BINARY_DIR}/db-inc.h
+             "#include <${DB_INCLUDE_DIR}/db_cxx.h>\n")
+    endif()
+
+    add_library (linearstoreutils SHARED
+        ${util_SOURCES}
+    )
+
+    target_link_libraries (linearstoreutils
+        rt
+    )
+
+    add_library (linearstore MODULE
+        ${linear_jrnl_SOURCES}
+        ${linear_store_SOURCES}
+        ${linear_qmf_SOURCES}
+    )
+
+    set_target_properties (linearstore PROPERTIES
+        PREFIX ""
+        COMPILE_DEFINITIONS _IN_QPID_BROKER
+        OUTPUT_NAME linearstore
+        INCLUDE_DIRECTORIES "${linear_include_DIRECTORIES}"
+    )
+
+    target_link_libraries (linearstore
+        aio
+        uuid
+        qpidcommon qpidtypes qpidbroker linearstoreutils
+        ${DB_LIBRARY}
+    )
+
+install(TARGETS linearstore
+        DESTINATION ${QPIDD_MODULE_DIR}
+        COMPONENT ${QPID_COMPONENT_BROKER})
+
+else (BUILD_LINEARSTORE)
+    message(STATUS "Linearstore is excluded from build.")
+endif (BUILD_LINEARSTORE)

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/BindingDbt.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/BindingDbt.cpp?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/BindingDbt.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/BindingDbt.cpp Tue Oct 22 19:09:56 2013
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/linearstore/BindingDbt.h"
+
+namespace qpid {
+namespace linearstore {
+
+BindingDbt::BindingDbt(const qpid::broker::PersistableExchange& e, const qpid::broker::PersistableQueue& q, const std::string& k, const qpid::framing::FieldTable& a)
+  : data(new char[encodedSize(e, q, k, a)]),
+    buffer(data, encodedSize(e, q, k, a))
+{
+    buffer.putLongLong(q.getPersistenceId());
+    buffer.putShortString(q.getName());
+    buffer.putShortString(k);
+    buffer.put(a);
+
+    set_data(data);
+    set_size(encodedSize(e, q, k, a));
+}
+
+BindingDbt::~BindingDbt()
+{
+  delete [] data;
+}
+
+uint32_t BindingDbt::encodedSize(const qpid::broker::PersistableExchange& /*not used*/, const qpid::broker::PersistableQueue& q, const std::string& k, const qpid::framing::FieldTable& a)
+{
+    return 8 /*queue id*/ + q.getName().size() + 1 + k.size() + 1 + a.encodedSize();
+}
+
+}}

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/BindingDbt.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/BindingDbt.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/BindingDbt.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/BindingDbt.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,56 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_BINDINGDBT_H
+#define QPID_LEGACYSTORE_BINDINGDBT_H
+
+#include "db-inc.h"
+#include "qpid/broker/PersistableExchange.h"
+#include "qpid/broker/PersistableQueue.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/framing/FieldTable.h"
+
+namespace qpid{
+namespace linearstore{
+
+class BindingDbt : public Dbt
+{
+    char* data;
+    qpid::framing::Buffer buffer;
+
+    static uint32_t encodedSize(const qpid::broker::PersistableExchange& e,
+                                const qpid::broker::PersistableQueue& q,
+                                const std::string& k,
+                                const qpid::framing::FieldTable& a);
+
+public:
+    BindingDbt(const qpid::broker::PersistableExchange& e,
+               const qpid::broker::PersistableQueue& q,
+               const std::string& k,
+               const qpid::framing::FieldTable& a);
+
+    virtual ~BindingDbt();
+
+};
+
+}}
+
+#endif // ifndef QPID_LEGACYSTORE_BINDINGDBT_H

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/BufferValue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/BufferValue.cpp?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/BufferValue.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/BufferValue.cpp Tue Oct 22 19:09:56 2013
@@ -0,0 +1,56 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/linearstore/BufferValue.h"
+
+namespace qpid {
+namespace linearstore {
+
+
+
+BufferValue::BufferValue(uint32_t size, uint64_t offset)
+    : data(new char[size]),
+      buffer(data, size)
+{
+    set_data(data);
+    set_size(size);
+    set_flags(DB_DBT_USERMEM | DB_DBT_PARTIAL);
+    set_doff(offset);
+    set_dlen(size);
+    set_ulen(size);
+}
+
+BufferValue::BufferValue(const qpid::broker::Persistable& p)
+  : data(new char[p.encodedSize()]),
+    buffer(data, p.encodedSize())
+{
+    p.encode(buffer);
+
+    set_data(data);
+    set_size(p.encodedSize());
+}
+
+BufferValue::~BufferValue()
+{
+    delete [] data;
+}
+
+}}

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/BufferValue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/BufferValue.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/BufferValue.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/BufferValue.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_BUFFERVALUE_H
+#define QPID_LEGACYSTORE_BUFFERVALUE_H
+
+#include "db-inc.h"
+#include "qpid/broker/Persistable.h"
+#include "qpid/framing/Buffer.h"
+
+namespace qpid{
+namespace linearstore{
+
+class BufferValue : public Dbt
+{
+    char* data;
+
+public:
+    qpid::framing::Buffer buffer;
+
+    BufferValue(uint32_t size, uint64_t offset);
+    BufferValue(const qpid::broker::Persistable& p);
+    virtual ~BufferValue();
+};
+
+}}
+
+#endif // ifndef QPID_LEGACYSTORE_BUFFERVALUE_H

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/Cursor.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/Cursor.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/Cursor.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/Cursor.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_CURSOR_H
+#define QPID_LEGACYSTORE_CURSOR_H
+
+#include <boost/shared_ptr.hpp>
+#include "db-inc.h"
+
+namespace qpid{
+namespace linearstore{
+
+class Cursor
+{
+    Dbc* cursor;
+public:
+    typedef boost::shared_ptr<Db> db_ptr;
+
+    Cursor() : cursor(0) {}
+    virtual ~Cursor() { if(cursor) cursor->close(); }
+
+    void open(db_ptr db, DbTxn* txn, uint32_t flags = 0) { db->cursor(txn, &cursor, flags); }
+    void close() { if(cursor) cursor->close(); cursor = 0; }
+    Dbc* get() { return cursor; }
+    Dbc* operator->() { return cursor; }
+    bool next(Dbt& key, Dbt& value) { return cursor->get(&key, &value, DB_NEXT) == 0; }
+    bool current(Dbt& key, Dbt& value) {  return cursor->get(&key, &value, DB_CURRENT) == 0; }
+};
+
+}}
+
+#endif // ifndef QPID_LEGACYSTORE_CURSOR_H

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/DataTokenImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/DataTokenImpl.cpp?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/DataTokenImpl.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/DataTokenImpl.cpp Tue Oct 22 19:09:56 2013
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/linearstore/DataTokenImpl.h"
+
+using namespace qpid::linearstore;
+
+DataTokenImpl::DataTokenImpl():data_tok() {}
+
+DataTokenImpl::~DataTokenImpl() {}

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/DataTokenImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/DataTokenImpl.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/DataTokenImpl.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/DataTokenImpl.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_DATATOKENIMPL_H
+#define QPID_LEGACYSTORE_DATATOKENIMPL_H
+
+#include "qpid/linearstore/jrnl/data_tok.h"
+#include "qpid/broker/PersistableMessage.h"
+#include <boost/intrusive_ptr.hpp>
+
+namespace qpid{
+namespace linearstore{
+
+class DataTokenImpl : public qpid::qls_jrnl::data_tok, public qpid::RefCounted
+{
+  private:
+    boost::intrusive_ptr<qpid::broker::PersistableMessage> sourceMsg;
+  public:
+    DataTokenImpl();
+    virtual ~DataTokenImpl();
+
+    inline boost::intrusive_ptr<qpid::broker::PersistableMessage>& getSourceMessage() { return sourceMsg; }
+    inline void setSourceMessage(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg) { sourceMsg = msg; }
+};
+
+} // namespace msgstore
+} // namespace mrg
+
+#endif // ifndef QPID_LEGACYSTORE_DATATOKENIMPL_H

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/IdDbt.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/IdDbt.cpp?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/IdDbt.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/IdDbt.cpp Tue Oct 22 19:09:56 2013
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/linearstore/IdDbt.h"
+
+using namespace qpid::linearstore;
+
+IdDbt::IdDbt() : id(0)
+{
+    init();
+}
+
+IdDbt::IdDbt(uint64_t _id) : id(_id)
+{
+    init();
+}
+
+void IdDbt::init()
+{
+    set_data(&id);
+    set_size(sizeof(uint64_t));
+    set_ulen(sizeof(uint64_t));
+    set_flags(DB_DBT_USERMEM);
+}

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/IdDbt.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/IdDbt.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/IdDbt.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/IdDbt.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_IDDBT_H
+#define QPID_LEGACYSTORE_IDDBT_H
+
+#include "db-inc.h"
+
+namespace qpid{
+namespace linearstore{
+
+class IdDbt : public Dbt
+{
+    void init();
+public:
+    uint64_t id;
+
+    IdDbt(uint64_t id);
+    IdDbt();
+};
+
+}}
+
+#endif // ifndef QPID_LEGACYSTORE_IDDBT_H

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/IdSequence.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/IdSequence.cpp?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/IdSequence.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/IdSequence.cpp Tue Oct 22 19:09:56 2013
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/linearstore/IdSequence.h"
+
+using namespace qpid::linearstore;
+using qpid::sys::Mutex;
+
+IdSequence::IdSequence() : id(1) {}
+
+uint64_t IdSequence::next()
+{
+    Mutex::ScopedLock guard(lock);
+    if (!id) id++; // avoid 0 when folding around
+    return id++;
+}
+
+void IdSequence::reset(uint64_t value)
+{
+    //deliberately not threadsafe, used only on recovery
+    id = value;
+}

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/IdSequence.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/IdSequence.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/IdSequence.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/IdSequence.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_IDSEQUENCE_H
+#define QPID_LEGACYSTORE_IDSEQUENCE_H
+
+#include "qpid/framing/amqp_types.h"
+#include "qpid/sys/Mutex.h"
+
+namespace qpid{
+namespace linearstore{
+
+class IdSequence
+{
+    qpid::sys::Mutex lock;
+    uint64_t id;
+public:
+    IdSequence();
+    uint64_t next();
+    void reset(uint64_t value);
+};
+
+}}
+
+#endif // ifndef QPID_LEGACYSTORE_IDSEQUENCE_H

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp Tue Oct 22 19:09:56 2013
@@ -0,0 +1,653 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/linearstore/JournalImpl.h"
+
+#include "qpid/linearstore/JournalLogImpl.h"
+#include "qpid/linearstore/jrnl/jerrno.h"
+#include "qpid/linearstore/jrnl/jexception.h"
+#include "qpid/linearstore/jrnl/EmptyFilePool.h"
+#include "qpid/linearstore/StoreException.h"
+#include "qpid/log/Statement.h"
+#include "qpid/management/ManagementAgent.h"
+#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Timer.h"
+
+#include "qmf/org/apache/qpid/linearstore/EventCreated.h"
+#include "qmf/org/apache/qpid/linearstore/EventEnqThresholdExceeded.h"
+#include "qmf/org/apache/qpid/linearstore/EventFull.h"
+#include "qmf/org/apache/qpid/linearstore/EventRecovered.h"
+
+using namespace qpid::qls_jrnl;
+using namespace qpid::linearstore;
+using qpid::management::ManagementAgent;
+namespace _qmf = qmf::org::apache::qpid::linearstore;
+
+InactivityFireEvent::InactivityFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
+    qpid::sys::TimerTask(timeout, "JournalInactive:"+p->id()), _parent(p) {}
+
+void InactivityFireEvent::fire() { qpid::sys::Mutex::ScopedLock sl(_ife_lock); if (_parent) _parent->flushFire(); }
+
+GetEventsFireEvent::GetEventsFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
+    qpid::sys::TimerTask(timeout, "JournalGetEvents:"+p->id()), _parent(p) {}
+
+void GetEventsFireEvent::fire() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); if (_parent) _parent->getEventsFire(); }
+
+JournalImpl::JournalImpl(qpid::sys::Timer& timer_,
+                         const std::string& journalId,
+                         const std::string& journalDirectory,
+                         JournalLogImpl& journalLogRef,
+                         const qpid::sys::Duration getEventsTimeout,
+                         const qpid::sys::Duration flushTimeout,
+                         qpid::management::ManagementAgent* a,
+                         DeleteCallback onDelete):
+                         jcntl(journalId, journalDirectory, journalLogRef),
+                         timer(timer_),
+                         _journalLogRef(journalLogRef),
+                         getEventsTimerSetFlag(false),
+//                         lastReadRid(0),
+                         writeActivityFlag(false),
+                         flushTriggeredFlag(true),
+//                         _xidp(0),
+//                         _datap(0),
+//                         _dlen(0),
+//                         _dtok(),
+//                         _external(false),
+                         deleteCallback(onDelete)
+{
+    getEventsFireEventsPtr = new GetEventsFireEvent(this, getEventsTimeout);
+    inactivityFireEventPtr = new InactivityFireEvent(this, flushTimeout);
+    {
+        timer.start();
+        timer.add(inactivityFireEventPtr);
+    }
+
+    initManagement(a);
+
+    QLS_LOG2(notice, _jid, "Created");
+    std::ostringstream oss;
+    oss << "Journal directory = \"" << journalDirectory << "\"";
+    QLS_LOG2(debug, _jid, oss.str());
+}
+
+JournalImpl::~JournalImpl()
+{
+    if (deleteCallback) deleteCallback(*this);
+    if (_init_flag && !_stop_flag){
+    	try { stop(true); } // NOTE: This will *block* until all outstanding disk aio calls are complete!
+        catch (const jexception& e) { QLS_LOG2(error, _jid, e.what()); }
+	}
+    getEventsFireEventsPtr->cancel();
+    inactivityFireEventPtr->cancel();
+//    free_read_buffers();
+
+    if (_mgmtObject.get() != 0) {
+        _mgmtObject->resourceDestroy();
+	_mgmtObject.reset();
+    }
+
+    QLS_LOG2(notice, _jid, "Destroyed");
+}
+
+void
+JournalImpl::initManagement(qpid::management::ManagementAgent* a)
+{
+    _agent = a;
+    if (_agent != 0)
+    {
+        _mgmtObject = _qmf::Journal::shared_ptr (
+            new _qmf::Journal(_agent, this));
+
+        _mgmtObject->set_name(_jid);
+        _mgmtObject->set_directory(_jdir.dirname());
+//        _mgmtObject->set_baseFileName(_base_filename);
+//        _mgmtObject->set_readPageSize(JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE);
+//        _mgmtObject->set_readPages(JRNL_RMGR_PAGES);
+
+        // The following will be set on initialize(), but being properties, these must be set to 0 in the meantime
+        //_mgmtObject->set_initialFileCount(0);
+        //_mgmtObject->set_dataFileSize(0);
+        //_mgmtObject->set_currentFileCount(0);
+        _mgmtObject->set_writePageSize(0);
+        _mgmtObject->set_writePages(0);
+
+        _agent->addObject(_mgmtObject, 0, true);
+    }
+}
+
+
+void
+JournalImpl::initialize(qpid::qls_jrnl::EmptyFilePool* efpp_,
+                        const uint16_t wcache_num_pages,
+                        const uint32_t wcache_pgsize_sblks,
+                        qpid::qls_jrnl::aio_callback* const cbp)
+{
+//    efpp->createJournal(_jdir);
+//    QLS_LOG2(notice, _jid, "Initialized");
+//    std::ostringstream oss;
+////    oss << "Initialize; num_jfiles=" << num_jfiles << " jfsize_sblks=" << jfsize_sblks;
+//    oss << "Initialize; efpPartitionNumber=" << efpp_->getPartitionNumber();
+//    oss << " efpFileSizeKb=" << efpp_->fileSizeKib();
+//    oss << " wcache_pgsize_sblks=" << wcache_pgsize_sblks;
+//    oss << " wcache_num_pages=" << wcache_num_pages;
+//    QLS_LOG2(debug, _jid, oss.str());
+    jcntl::initialize(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ efpp_, wcache_num_pages, wcache_pgsize_sblks, cbp);
+//    QLS_LOG2(debug, _jid, "Initialization complete");
+    // TODO: replace for linearstore: _lpmgr
+/*
+    if (_mgmtObject.get() != 0)
+    {
+        _mgmtObject->set_initialFileCount(_lpmgr.num_jfiles());
+        _mgmtObject->set_autoExpand(_lpmgr.is_ae());
+        _mgmtObject->set_currentFileCount(_lpmgr.num_jfiles());
+        _mgmtObject->set_maxFileCount(_lpmgr.ae_max_jfiles());
+        _mgmtObject->set_dataFileSize(_jfsize_sblks * JRNL_SBLK_SIZE);
+        _mgmtObject->set_writePageSize(wcache_pgsize_sblks * JRNL_SBLK_SIZE);
+        _mgmtObject->set_writePages(wcache_num_pages);
+    }
+    if (_agent != 0)
+        _agent->raiseEvent::(qmf::org::apache::qpid::linearstore::EventCreated(_jid, _jfsize_sblks * JRNL_SBLK_SIZE, _lpmgr.num_jfiles()),
+                           qpid::management::ManagementAgent::SEV_NOTE);
+*/
+}
+
+void
+JournalImpl::recover(/*const uint16_t num_jfiles,
+                     const bool auto_expand,
+                     const uint16_t ae_max_jfiles,
+                     const uint32_t jfsize_sblks,*/
+                     boost::shared_ptr<qpid::qls_jrnl::EmptyFilePoolManager> efpm,
+                     const uint16_t wcache_num_pages,
+                     const uint32_t wcache_pgsize_sblks,
+                     qpid::qls_jrnl::aio_callback* const cbp,
+                     boost::ptr_list<PreparedTransaction>* prep_tx_list_ptr,
+                     uint64_t& highest_rid,
+                     uint64_t queue_id)
+{
+    std::ostringstream oss1;
+//    oss1 << "Recover; num_jfiles=" << num_jfiles << " jfsize_sblks=" << jfsize_sblks;
+    oss1 << "Recover;";
+    oss1 << " queue_id = 0x" << std::hex << queue_id << std::dec;
+    oss1 << " wcache_pgsize_sblks=" << wcache_pgsize_sblks;
+    oss1 << " wcache_num_pages=" << wcache_num_pages;
+    QLS_LOG2(debug, _jid, oss1.str());
+    // TODO: replace for linearstore: _lpmgr
+/*
+    if (_mgmtObject.get() != 0)
+    {
+        _mgmtObject->set_initialFileCount(_lpmgr.num_jfiles());
+        _mgmtObject->set_autoExpand(_lpmgr.is_ae());
+        _mgmtObject->set_currentFileCount(_lpmgr.num_jfiles());
+        _mgmtObject->set_maxFileCount(_lpmgr.ae_max_jfiles());
+        _mgmtObject->set_dataFileSize(_jfsize_sblks * JRNL_SBLK_SIZE);
+        _mgmtObject->set_writePageSize(wcache_pgsize_sblks * JRNL_SBLK_SIZE);
+        _mgmtObject->set_writePages(wcache_num_pages);
+    }
+*/
+
+    if (prep_tx_list_ptr) {
+        // Create list of prepared xids
+        std::vector<std::string> prep_xid_list;
+        for (PreparedTransaction::list::iterator i = prep_tx_list_ptr->begin(); i != prep_tx_list_ptr->end(); i++) {
+            prep_xid_list.push_back(i->xid);
+        }
+
+        jcntl::recover(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/efpm.get(), wcache_num_pages, wcache_pgsize_sblks,
+                cbp, &prep_xid_list, highest_rid);
+    } else {
+        jcntl::recover(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/efpm.get(), wcache_num_pages, wcache_pgsize_sblks,
+                cbp, 0, highest_rid);
+    }
+
+    // Populate PreparedTransaction lists from _tmap
+    if (prep_tx_list_ptr)
+    {
+        for (PreparedTransaction::list::iterator i = prep_tx_list_ptr->begin(); i != prep_tx_list_ptr->end(); i++) {
+            txn_data_list tdl = _tmap.get_tdata_list(i->xid); // tdl will be empty if xid not found
+            for (tdl_itr tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++) {
+                if (tdl_itr->_enq_flag) { // enqueue op
+                    i->enqueues->add(queue_id, tdl_itr->_rid);
+                } else { // dequeue op
+                    i->dequeues->add(queue_id, tdl_itr->_drid);
+                }
+            }
+        }
+    }
+    std::ostringstream oss2;
+    oss2 << "Recover phase 1 complete; highest rid found = 0x" << std::hex << highest_rid;
+    oss2 << std::dec << "; emap.size=" << _emap.size() << "; tmap.size=" << _tmap.size();
+    oss2 << "; journal now read-only.";
+    QLS_LOG2(debug, _jid, oss2.str());
+
+    if (_mgmtObject.get() != 0)
+    {
+        _mgmtObject->inc_recordDepth(_emap.size());
+        _mgmtObject->inc_enqueues(_emap.size());
+        _mgmtObject->inc_txn(_tmap.size());
+        _mgmtObject->inc_txnEnqueues(_tmap.enq_cnt());
+        _mgmtObject->inc_txnDequeues(_tmap.deq_cnt());
+    }
+}
+
+void
+JournalImpl::recover_complete()
+{
+    jcntl::recover_complete();
+    QLS_LOG2(debug, _jid, "Recover phase 2 complete; journal now writable.");
+    // TODO: replace for linearstore: _lpmgr
+/*
+    if (_agent != 0)
+        _agent->raiseEvent(qmf::org::apache::qpid::linearstore::EventRecovered(_jid, _jfsize_sblks * JRNL_SBLK_SIZE, _lpmgr.num_jfiles(),
+                        _emap.size(), _tmap.size(), _tmap.enq_cnt(), _tmap.deq_cnt()), qpid::management::ManagementAgent::SEV_NOTE);
+*/
+}
+
+//#define MAX_AIO_SLEEPS 1000000 // tot: ~10 sec
+//#define AIO_SLEEP_TIME_US   10 // 0.01 ms
+// Return true if content is recovered from store; false if content is external and must be recovered from an external store.
+// Throw exception for all errors.
+/*
+bool
+JournalImpl::loadMsgContent(uint64_t rid, std::string& data, size_t length, size_t offset)
+{
+    qpid::sys::Mutex::ScopedLock sl(_read_lock);
+    if (_dtok.rid() != rid)
+    {
+        // Free any previous msg
+        free_read_buffers();
+
+        // Last read encountered out-of-order rids, check if this rid is in that list
+        bool oooFlag = false;
+        for (std::vector<uint64_t>::const_iterator i=oooRidList.begin(); i!=oooRidList.end() && !oooFlag; i++) {
+            if (*i == rid) {
+                oooFlag = true;
+            }
+        }
+
+        // TODO: This is a brutal approach - very inefficient and slow. Rather introduce a system of remembering
+        // jumpover points and allow the read to jump back to the first known jumpover point - but this needs
+        // a mechanism in rrfc to accomplish it. Also helpful is a struct containing a journal address - a
+        // combination of lid/offset.
+        // NOTE: The second part of the if stmt (rid < lastReadRid) is required to handle browsing.
+        if (oooFlag || rid < lastReadRid) {
+            _rmgr.invalidate();
+            oooRidList.clear();
+        }
+        _dlen = 0;
+        _dtok.reset();
+        _dtok.set_wstate(DataTokenImpl::ENQ);
+        _dtok.set_rid(0);
+        _external = false;
+        size_t xlen = 0;
+        bool transient = false;
+        bool done = false;
+        bool rid_found = false;
+        while (!done) {
+            iores res = read_data_record(&_datap, _dlen, &_xidp, xlen, transient, _external, &_dtok);
+            switch (res) {
+                case qpid::qls_jrnl::RHM_IORES_SUCCESS:
+                    if (_dtok.rid() != rid) {
+                        // Check if this is an out-of-order rid that may impact next read
+                        if (_dtok.rid() > rid)
+                            oooRidList.push_back(_dtok.rid());
+                        free_read_buffers();
+                        // Reset data token for next read
+                        _dlen = 0;
+                        _dtok.reset();
+                        _dtok.set_wstate(DataTokenImpl::ENQ);
+                        _dtok.set_rid(0);
+                    } else {
+                        rid_found = _dtok.rid() == rid;
+                        lastReadRid = rid;
+                        done = true;
+                    }
+                    break;
+                case qpid::qls_jrnl::RHM_IORES_PAGE_AIOWAIT:
+                    if (get_wr_events(&_aio_cmpl_timeout) == qpid::qls_jrnl::jerrno::AIO_TIMEOUT) {
+                        std::stringstream ss;
+                        ss << "read_data_record() returned " << qpid::qls_jrnl::iores_str(res);
+                        ss << "; timed out waiting for page to be processed.";
+                        throw jexception(qpid::qls_jrnl::jerrno::JERR__TIMEOUT, ss.str().c_str(), "JournalImpl",
+                            "loadMsgContent");
+                    }
+                    break;
+                default:
+                    std::stringstream ss;
+                    ss << "read_data_record() returned " << qpid::qls_jrnl::iores_str(res);
+                    throw jexception(qpid::qls_jrnl::jerrno::JERR__UNEXPRESPONSE, ss.str().c_str(), "JournalImpl",
+                        "loadMsgContent");
+            }
+        }
+        if (!rid_found) {
+            std::stringstream ss;
+            ss << "read_data_record() was unable to find rid 0x" << std::hex << rid << std::dec;
+            ss << " (" << rid << "); last rid found was 0x" << std::hex << _dtok.rid() << std::dec;
+            ss << " (" << _dtok.rid() << ")";
+            throw jexception(qpid::qls_jrnl::jerrno::JERR__RECNFOUND, ss.str().c_str(), "JournalImpl", "loadMsgContent");
+        }
+    }
+
+    if (_external) return false;
+
+    uint32_t hdr_offs = qpid::framing::Buffer(static_cast<char*>(_datap), sizeof(uint32_t)).getLong() + sizeof(uint32_t);
+    if (hdr_offs + offset + length > _dlen) {
+        data.append((const char*)_datap + hdr_offs + offset, _dlen - hdr_offs - offset);
+    } else {
+        data.append((const char*)_datap + hdr_offs + offset, length);
+    }
+    return true;
+}
+*/
+
+void
+JournalImpl::enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
+        const size_t this_data_len, data_tok* dtokp, const bool transient)
+{
+    handleIoResult(jcntl::enqueue_data_record(data_buff, tot_data_len, this_data_len, dtokp, transient));
+
+    if (_mgmtObject.get() != 0)
+    {
+        _mgmtObject->inc_enqueues();
+        _mgmtObject->inc_recordDepth();
+    }
+}
+
+void
+JournalImpl::enqueue_extern_data_record(const size_t tot_data_len, data_tok* dtokp,
+        const bool transient)
+{
+    handleIoResult(jcntl::enqueue_extern_data_record(tot_data_len, dtokp, transient));
+
+    if (_mgmtObject.get() != 0)
+    {
+        _mgmtObject->inc_enqueues();
+        _mgmtObject->inc_recordDepth();
+    }
+}
+
+void
+JournalImpl::enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len,
+        const size_t this_data_len, data_tok* dtokp, const std::string& xid, const bool transient)
+{
+    bool txn_incr = _mgmtObject.get() != 0 ? _tmap.in_map(xid) : false;
+
+    handleIoResult(jcntl::enqueue_txn_data_record(data_buff, tot_data_len, this_data_len, dtokp, xid, transient));
+
+    if (_mgmtObject.get() != 0)
+    {
+        if (!txn_incr) // If this xid was not in _tmap, it will be now...
+            _mgmtObject->inc_txn();
+        _mgmtObject->inc_enqueues();
+        _mgmtObject->inc_txnEnqueues();
+        _mgmtObject->inc_recordDepth();
+    }
+}
+
+void
+JournalImpl::enqueue_extern_txn_data_record(const size_t tot_data_len, data_tok* dtokp,
+        const std::string& xid, const bool transient)
+{
+    bool txn_incr = _mgmtObject.get() != 0 ? _tmap.in_map(xid) : false;
+
+    handleIoResult(jcntl::enqueue_extern_txn_data_record(tot_data_len, dtokp, xid, transient));
+
+    if (_mgmtObject.get() != 0)
+    {
+        if (!txn_incr) // If this xid was not in _tmap, it will be now...
+            _mgmtObject->inc_txn();
+        _mgmtObject->inc_enqueues();
+        _mgmtObject->inc_txnEnqueues();
+        _mgmtObject->inc_recordDepth();
+    }
+}
+
+void
+JournalImpl::dequeue_data_record(data_tok* const dtokp, const bool txn_coml_commit)
+{
+    handleIoResult(jcntl::dequeue_data_record(dtokp, txn_coml_commit));
+
+    if (_mgmtObject.get() != 0)
+    {
+        _mgmtObject->inc_dequeues();
+        _mgmtObject->inc_txnDequeues();
+        _mgmtObject->dec_recordDepth();
+    }
+}
+
+void
+JournalImpl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit)
+{
+    bool txn_incr = _mgmtObject.get() != 0 ? _tmap.in_map(xid) : false;
+
+    handleIoResult(jcntl::dequeue_txn_data_record(dtokp, xid, txn_coml_commit));
+
+    if (_mgmtObject.get() != 0)
+    {
+        if (!txn_incr) // If this xid was not in _tmap, it will be now...
+            _mgmtObject->inc_txn();
+        _mgmtObject->inc_dequeues();
+        _mgmtObject->inc_txnDequeues();
+        _mgmtObject->dec_recordDepth();
+    }
+}
+
+void
+JournalImpl::txn_abort(data_tok* const dtokp, const std::string& xid)
+{
+    handleIoResult(jcntl::txn_abort(dtokp, xid));
+
+    if (_mgmtObject.get() != 0)
+    {
+        _mgmtObject->dec_txn();
+        _mgmtObject->inc_txnAborts();
+    }
+}
+
+void
+JournalImpl::txn_commit(data_tok* const dtokp, const std::string& xid)
+{
+    handleIoResult(jcntl::txn_commit(dtokp, xid));
+
+    if (_mgmtObject.get() != 0)
+    {
+        _mgmtObject->dec_txn();
+        _mgmtObject->inc_txnCommits();
+    }
+}
+
+void
+JournalImpl::stop(bool block_till_aio_cmpl)
+{
+    InactivityFireEvent* ifep = dynamic_cast<InactivityFireEvent*>(inactivityFireEventPtr.get());
+    assert(ifep); // dynamic_cast can return null if the cast fails
+    ifep->cancel();
+    jcntl::stop(block_till_aio_cmpl);
+
+    if (_mgmtObject.get() != 0) {
+        _mgmtObject->resourceDestroy();
+        _mgmtObject.reset();
+    }
+}
+
+iores
+JournalImpl::flush(const bool block_till_aio_cmpl)
+{
+    const iores res = jcntl::flush(block_till_aio_cmpl);
+    {
+        qpid::sys::Mutex::ScopedLock sl(_getf_lock);
+        if (_wmgr.get_aio_evt_rem() && !getEventsTimerSetFlag) { setGetEventTimer(); }
+    }
+    return res;
+}
+
+/*
+void
+JournalImpl::log(qpid::qls_jrnl::log_level ll, const std::string& log_stmt) const
+{
+    log(ll, log_stmt.c_str());
+}
+
+void
+JournalImpl::log(qpid::qls_jrnl::log_level ll, const char* const log_stmt) const
+{
+    switch (ll)
+    {
+        case LOG_TRACE:  QPID_LOG(trace, "QLS Journal \"" << _jid << "\": " << log_stmt); break;
+        case LOG_DEBUG:  QPID_LOG(debug, "QLS Journal \"" << _jid << "\": " << log_stmt); break;
+        case LOG_INFO:  QPID_LOG(info, "QLS Journal \"" << _jid << "\": " << log_stmt); break;
+        case LOG_NOTICE:  QPID_LOG(notice, "QLS Journal \"" << _jid << "\": " << log_stmt); break;
+        case LOG_WARN:  QPID_LOG(warning, "QLS Journal \"" << _jid << "\": " << log_stmt); break;
+        case LOG_ERROR: QPID_LOG(error, "QLS Journal \"" << _jid << "\": " << log_stmt); break;
+        case LOG_CRITICAL: QPID_LOG(critical, "QLS Journal \"" << _jid << "\": " << log_stmt); break;
+    }
+}
+*/
+
+void
+JournalImpl::getEventsFire()
+{
+    qpid::sys::Mutex::ScopedLock sl(_getf_lock);
+    getEventsTimerSetFlag = false;
+    if (_wmgr.get_aio_evt_rem()) { jcntl::get_wr_events(0); }
+    if (_wmgr.get_aio_evt_rem()) { setGetEventTimer(); }
+}
+
+void
+JournalImpl::flushFire()
+{
+    if (writeActivityFlag) {
+        writeActivityFlag = false;
+        flushTriggeredFlag = false;
+    } else {
+        if (!flushTriggeredFlag) {
+            flush();
+            flushTriggeredFlag = true;
+        }
+    }
+    inactivityFireEventPtr->setupNextFire();
+    {
+        timer.add(inactivityFireEventPtr);
+    }
+}
+
+void
+JournalImpl::wr_aio_cb(std::vector<data_tok*>& dtokl)
+{
+    for (std::vector<data_tok*>::const_iterator i=dtokl.begin(); i!=dtokl.end(); i++)
+    {
+        DataTokenImpl* dtokp = static_cast<DataTokenImpl*>(*i);
+	    if (/*!is_stopped() &&*/ dtokp->getSourceMessage())
+	    {
+		    switch (dtokp->wstate())
+		    {
+ 			    case data_tok::ENQ:
+//std::cout << "<<<>>> JournalImpl::wr_aio_cb() ENQ dtokp rid=0x" << std::hex << dtokp->rid() << std::dec << std::endl << std::flush; // DEBUG
+             	    dtokp->getSourceMessage()->enqueueComplete();
+ 				    break;
+			    case data_tok::DEQ:
+//std::cout << "<<<>>> JournalImpl::wr_aio_cb() DEQ dtokp rid=0x" << std::hex << dtokp->rid() << std::dec << std::endl << std::flush; // DEBUG
+/* Don't need to signal until we have a way to ack completion of dequeue in AMQP
+                    dtokp->getSourceMessage()->dequeueComplete();
+                    if ( dtokp->getSourceMessage()->isDequeueComplete()  ) // clear id after last dequeue
+                        dtokp->getSourceMessage()->setPersistenceId(0);
+*/
+				    break;
+			    default: ;
+		    }
+	    }
+	    dtokp->release();
+    }
+}
+
+void
+JournalImpl::rd_aio_cb(std::vector<uint16_t>& /*pil*/)
+{}
+
+/*
+void
+JournalImpl::free_read_buffers()
+{
+    if (_xidp) {
+        ::free(_xidp);
+        _xidp = 0;
+        _datap = 0;
+    } else if (_datap) {
+        ::free(_datap);
+        _datap = 0;
+    }
+}
+*/
+
+void
+JournalImpl::createStore() {
+
+}
+
+void
+JournalImpl::handleIoResult(const iores r)
+{
+    writeActivityFlag = true;
+    switch (r)
+    {
+        case qpid::qls_jrnl::RHM_IORES_SUCCESS:
+            return;
+/*
+        case qpid::qls_jrnl::RHM_IORES_FULL:
+            {
+                std::ostringstream oss;
+                oss << "Journal full on queue \"" << _jid << "\".";
+                QLS_LOG2(critical, _jid, "Journal full.");
+                if (_agent != 0)
+                    _agent->raiseEvent(qmf::org::apache::qpid::linearstore::EventFull(_jid, "Journal full"), qpid::management::ManagementAgent::SEV_ERROR);
+                THROW_STORE_FULL_EXCEPTION(oss.str());
+            }
+*/
+        default:
+            {
+                std::ostringstream oss;
+                oss << "Unexpected I/O response (" << qpid::qls_jrnl::iores_str(r) << ").";
+                QLS_LOG2(error, _jid, oss.str());
+                THROW_STORE_FULL_EXCEPTION(oss.str());
+            }
+    }
+}
+
+qpid::management::Manageable::status_t JournalImpl::ManagementMethod (uint32_t /*methodId*/,
+                                                                      qpid::management::Args& /*args*/,
+                                                                      std::string& /*text*/)
+{
+    Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
+
+/*
+    switch (methodId)
+    {
+    case _qmf::Journal::METHOD_EXPAND :
+        //_qmf::ArgsJournalExpand& eArgs = (_qmf::ArgsJournalExpand&) args;
+
+        // Implement "expand" using eArgs.i_by (expand-by argument)
+
+        status = Manageable::STATUS_NOT_IMPLEMENTED;
+        break;
+    }
+*/
+
+    return status;
+}

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalImpl.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalImpl.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalImpl.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,244 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LINEARSTORE_JOURNALIMPL_H
+#define QPID_LINEARSTORE_JOURNALIMPL_H
+
+#include <set>
+#include "qpid/linearstore/jrnl/enums.h"
+#include "qpid/linearstore/jrnl/EmptyFilePoolTypes.h"
+#include "qpid/linearstore/jrnl/jcntl.h"
+#include "qpid/linearstore/DataTokenImpl.h"
+#include "qpid/linearstore/PreparedTransaction.h"
+#include "qpid/broker/PersistableQueue.h"
+#include "qpid/sys/Timer.h"
+#include "qpid/sys/Time.h"
+#include <boost/ptr_container/ptr_list.hpp>
+#include <boost/intrusive_ptr.hpp>
+#include "qpid/management/Manageable.h"
+#include "qmf/org/apache/qpid/linearstore/Journal.h"
+
+namespace qpid{
+
+namespace sys {
+class Timer;
+}
+namespace qls_jrnl {
+class EmptyFilePool;
+}
+
+namespace linearstore{
+
+class JournalImpl;
+class JournalLogImpl;
+
+class InactivityFireEvent : public qpid::sys::TimerTask
+{
+    JournalImpl* _parent;
+    qpid::sys::Mutex _ife_lock;
+
+  public:
+    InactivityFireEvent(JournalImpl* p, const qpid::sys::Duration timeout);
+    virtual ~InactivityFireEvent() {}
+    void fire();
+    inline void cancel() { qpid::sys::Mutex::ScopedLock sl(_ife_lock); _parent = 0; }
+};
+
+class GetEventsFireEvent : public qpid::sys::TimerTask
+{
+    JournalImpl* _parent;
+    qpid::sys::Mutex _gefe_lock;
+
+  public:
+    GetEventsFireEvent(JournalImpl* p, const qpid::sys::Duration timeout);
+    virtual ~GetEventsFireEvent() {}
+    void fire();
+    inline void cancel() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); _parent = 0; }
+};
+
+class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jrnl::jcntl, public qpid::qls_jrnl::aio_callback
+{
+  public:
+    typedef boost::function<void (JournalImpl&)> DeleteCallback;
+
+  protected:
+    qpid::sys::Timer& timer;
+    JournalLogImpl& _journalLogRef;
+    bool getEventsTimerSetFlag;
+    boost::intrusive_ptr<qpid::sys::TimerTask> getEventsFireEventsPtr;
+    qpid::sys::Mutex _getf_lock;
+    qpid::sys::Mutex _read_lock;
+
+    bool writeActivityFlag;
+    bool flushTriggeredFlag;
+    boost::intrusive_ptr<qpid::sys::TimerTask> inactivityFireEventPtr;
+
+    qpid::management::ManagementAgent* _agent;
+    qmf::org::apache::qpid::linearstore::Journal::shared_ptr _mgmtObject;
+    DeleteCallback deleteCallback;
+
+  public:
+
+    JournalImpl(qpid::sys::Timer& timer,
+                const std::string& journalId,
+                const std::string& journalDirectory,
+                JournalLogImpl& journalLogRef,
+                const qpid::sys::Duration getEventsTimeout,
+                const qpid::sys::Duration flushTimeout,
+                qpid::management::ManagementAgent* agent,
+                DeleteCallback deleteCallback=DeleteCallback() );
+
+    virtual ~JournalImpl();
+
+    void initManagement(qpid::management::ManagementAgent* agent);
+
+    void initialize(qpid::qls_jrnl::EmptyFilePool* efp,
+                    const uint16_t wcache_num_pages,
+                    const uint32_t wcache_pgsize_sblks,
+                    qpid::qls_jrnl::aio_callback* const cbp);
+
+    inline void initialize(qpid::qls_jrnl::EmptyFilePool* efpp,
+                           const uint16_t wcache_num_pages,
+                           const uint32_t wcache_pgsize_sblks) {
+        initialize(efpp, wcache_num_pages, wcache_pgsize_sblks, this);
+    }
+
+    void recover(boost::shared_ptr<qpid::qls_jrnl::EmptyFilePoolManager> efpm,
+                 const uint16_t wcache_num_pages,
+                 const uint32_t wcache_pgsize_sblks,
+                 qpid::qls_jrnl::aio_callback* const cbp,
+                 boost::ptr_list<PreparedTransaction>* prep_tx_list_ptr,
+                 uint64_t& highest_rid,
+                 uint64_t queue_id);
+
+    inline void recover(boost::shared_ptr<qpid::qls_jrnl::EmptyFilePoolManager> efpm,
+                        const uint16_t wcache_num_pages,
+                        const uint32_t wcache_pgsize_sblks,
+                        boost::ptr_list<PreparedTransaction>* prep_tx_list_ptr,
+                        uint64_t& highest_rid,
+                        uint64_t queue_id) {
+        recover(efpm, wcache_num_pages, wcache_pgsize_sblks, this, prep_tx_list_ptr, highest_rid, queue_id);
+    }
+
+    void recover_complete();
+
+    // Temporary fn to read and save last msg read from journal so it can be assigned
+    // in chunks. To be replaced when coding to do this direct from the journal is ready.
+    // Returns true if the record is extern, false if local.
+//    bool loadMsgContent(uint64_t rid, std::string& data, size_t length, size_t offset = 0);
+
+    // Overrides for write inactivity timer
+    void enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
+                             const size_t this_data_len, qpid::qls_jrnl::data_tok* dtokp,
+                             const bool transient = false);
+
+    void enqueue_extern_data_record(const size_t tot_data_len, qpid::qls_jrnl::data_tok* dtokp,
+                                    const bool transient = false);
+
+    void enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len,
+                                 const size_t this_data_len, qpid::qls_jrnl::data_tok* dtokp, const std::string& xid,
+                                 const bool transient = false);
+
+    void enqueue_extern_txn_data_record(const size_t tot_data_len, qpid::qls_jrnl::data_tok* dtokp,
+                                        const std::string& xid, const bool transient = false);
+
+    void dequeue_data_record(qpid::qls_jrnl::data_tok* const dtokp, const bool txn_coml_commit = false);
+
+    void dequeue_txn_data_record(qpid::qls_jrnl::data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit = false);
+
+    void txn_abort(qpid::qls_jrnl::data_tok* const dtokp, const std::string& xid);
+
+    void txn_commit(qpid::qls_jrnl::data_tok* const dtokp, const std::string& xid);
+
+    void stop(bool block_till_aio_cmpl = false);
+
+    // Overrides for get_events timer
+    qpid::qls_jrnl::iores flush(const bool block_till_aio_cmpl = false);
+
+    // TimerTask callback
+    void getEventsFire();
+    void flushFire();
+
+    // AIO callbacks
+    virtual void wr_aio_cb(std::vector<qpid::qls_jrnl::data_tok*>& dtokl);
+    virtual void rd_aio_cb(std::vector<uint16_t>& pil);
+
+    qpid::management::ManagementObject::shared_ptr GetManagementObject (void) const
+    { return _mgmtObject; }
+
+    qpid::management::Manageable::status_t ManagementMethod (uint32_t,
+                                                             qpid::management::Args&,
+                                                             std::string&);
+
+    void resetDeleteCallback() { deleteCallback = DeleteCallback(); }
+
+  protected:
+//    void free_read_buffers();
+    void createStore();
+
+    inline void setGetEventTimer()
+    {
+        getEventsFireEventsPtr->setupNextFire();
+        timer.add(getEventsFireEventsPtr);
+        getEventsTimerSetFlag = true;
+    }
+    void handleIoResult(const qpid::qls_jrnl::iores r);
+
+    // Management instrumentation callbacks overridden from jcntl
+    inline void instr_incr_outstanding_aio_cnt() {
+      if (_mgmtObject.get() != 0) _mgmtObject->inc_outstandingAIOs();
+    }
+    inline void instr_decr_outstanding_aio_cnt() {
+      if (_mgmtObject.get() != 0) _mgmtObject->dec_outstandingAIOs();
+    }
+
+}; // class JournalImpl
+
+class TplJournalImpl : public JournalImpl
+{
+  public:
+    TplJournalImpl(qpid::sys::Timer& timer,
+                   const std::string& journalId,
+                   const std::string& journalDirectory,
+                   JournalLogImpl& journalLogRef,
+                   const qpid::sys::Duration getEventsTimeout,
+                   const qpid::sys::Duration flushTimeout,
+                   qpid::management::ManagementAgent* agent) :
+        JournalImpl(timer, journalId, journalDirectory, journalLogRef, getEventsTimeout, flushTimeout, agent)
+    {}
+
+    virtual ~TplJournalImpl() {}
+
+/*
+    // Special version of read_data_record that ignores transactions - needed when reading the TPL
+    inline qpid::qls_jrnl::iores read_data_record(void** const datapp, std::size_t& dsize,
+                                                void** const xidpp, std::size_t& xidsize, bool& transient, bool& external,
+                                                qpid::qls_jrnl::data_tok* const dtokp) {
+        return JournalImpl::read_data_record(datapp, dsize, xidpp, xidsize, transient, external, dtokp, true);
+    }
+    inline void read_reset() { _rmgr.invalidate(); }
+*/
+}; // class TplJournalImpl
+
+} // namespace msgstore
+} // namespace mrg
+
+#endif // ifndef QPID_LINEARSTORE_JOURNALIMPL_H

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalLogImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalLogImpl.cpp?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalLogImpl.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalLogImpl.cpp Tue Oct 22 19:09:56 2013
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/linearstore/JournalLogImpl.h"
+
+namespace qpid {
+namespace linearstore {
+
+JournalLogImpl::JournalLogImpl(const qpid::qls_jrnl::JournalLog::log_level_t logLevelThreshold) : qpid::qls_jrnl::JournalLog(logLevelThreshold) {}
+JournalLogImpl::~JournalLogImpl() {}
+
+void
+JournalLogImpl::log(const qpid::qls_jrnl::JournalLog::log_level_t level,
+                    const std::string& log_stmt) const {
+    switch (level) {
+      case LOG_CRITICAL: QPID_LOG(critical, "Linear Store: " << log_stmt); break;
+      case LOG_ERROR: QPID_LOG(error, "Linear Store: " << log_stmt); break;
+      case LOG_WARN: QPID_LOG(warning, "Linear Store: " << log_stmt); break;
+      case LOG_NOTICE: QPID_LOG(notice, "Linear Store: " << log_stmt); break;
+      case LOG_INFO: QPID_LOG(info, "Linear Store: " << log_stmt); break;
+      case LOG_DEBUG: QPID_LOG(debug, "Linear Store: " << log_stmt); break;
+      default: QPID_LOG(trace, "Linear Store: " << log_stmt);
+    }
+}
+
+void
+JournalLogImpl::log(const qpid::qls_jrnl::JournalLog::log_level_t level,
+                    const std::string& jid,
+                    const std::string& log_stmt) const {
+    switch (level) {
+      case LOG_CRITICAL: QPID_LOG(critical, "Linear Store: Journal \'" << jid << "\":" << log_stmt); break;
+      case LOG_ERROR: QPID_LOG(error, "Linear Store: Journal \'" << jid << "\":" << log_stmt); break;
+      case LOG_WARN: QPID_LOG(warning, "Linear Store: Journal \'" << jid << "\":" << log_stmt); break;
+      case LOG_NOTICE: QPID_LOG(notice, "Linear Store: Journal \'" << jid << "\":" << log_stmt); break;
+      case LOG_INFO: QPID_LOG(info, "Linear Store: Journal \'" << jid << "\":" << log_stmt); break;
+      case LOG_DEBUG: QPID_LOG(debug, "Linear Store: Journal \'" << jid << "\":" << log_stmt); break;
+      default: QPID_LOG(trace, "Linear Store: Journal \'" << jid << "\":" << log_stmt);
+    }
+}
+
+}} // namespace qpid::linearstore

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalLogImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalLogImpl.h?rev=1534736&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalLogImpl.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalLogImpl.h Tue Oct 22 19:09:56 2013
@@ -0,0 +1,48 @@
+ /*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_LOG_H
+#define QPID_LEGACYSTORE_LOG_H
+
+#include "qpid/linearstore/jrnl/JournalLog.h"
+#include "qpid/log/Statement.h"
+
+#define QLS_LOG(level, msg) QPID_LOG(level, "Linear Store: " << msg)
+#define QLS_LOG2(level, queue, msg) QPID_LOG(level, "Linear Store: Journal \'" << queue << "\":" << msg)
+
+namespace qpid {
+namespace linearstore {
+
+class JournalLogImpl : public qpid::qls_jrnl::JournalLog
+{
+public:
+    JournalLogImpl(const qpid::qls_jrnl::JournalLog::log_level_t logLevelThreshold);
+    virtual ~JournalLogImpl();
+    virtual void log(const qpid::qls_jrnl::JournalLog::log_level_t logLevel,
+                     const std::string& logStatement) const;
+    virtual void log(const qpid::qls_jrnl::JournalLog::log_level_t logLevel,
+                     const std::string& journalId,
+                     const std::string& logStatement) const;
+};
+
+}} // namespace qpid::linearstore
+
+#endif // QPID_LEGACYSTORE_LOG_H



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org