You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2013/07/10 20:20:20 UTC
svn commit: r1501895 [1/10] - in /qpid/branches/linearstore/qpid/cpp/src: ./
qpid/linearstore/ qpid/linearstore/jrnl/
Author: tross
Date: Wed Jul 10 18:20:19 2013
New Revision: 1501895
URL: http://svn.apache.org/r1501895
Log:
NO-JIRA - Copied legacystore into linearstore as a starting point for work.
Added:
qpid/branches/linearstore/qpid/cpp/src/linearstore.cmake
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/BindingDbt.cpp (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/BindingDbt.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/BufferValue.cpp (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/BufferValue.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/Cursor.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/DataTokenImpl.cpp (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/DataTokenImpl.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/IdDbt.cpp (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/IdDbt.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/IdSequence.cpp (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/IdSequence.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/PreparedTransaction.cpp (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/PreparedTransaction.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/StoreException.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/TxnCtxt.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/aio.cpp (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/aio.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/aio_callback.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/cvar.cpp (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/cvar.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.cpp (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/deq_hdr.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_hdr.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enums.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/fcntl.cpp (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/fcntl.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/file_hdr.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jdir.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jexception.cpp (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jexception.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jinf.cpp (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jinf.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jrec.cpp (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/lp_map.cpp (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/lp_map.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/lpmgr.cpp (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/lpmgr.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rcvdat.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rec_hdr.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rec_tail.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rfc.cpp (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rfc.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rrfc.cpp (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rrfc.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/slock.cpp (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/slock.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/smutex.cpp (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/smutex.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/time_ns.cpp (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/time_ns.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_hdr.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wrfc.cpp (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wrfc.h (with props)
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/management-schema.xml (with props)
Modified:
qpid/branches/linearstore/qpid/cpp/src/CMakeLists.txt
Modified: qpid/branches/linearstore/qpid/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/CMakeLists.txt?rev=1501895&r1=1501894&r2=1501895&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/branches/linearstore/qpid/cpp/src/CMakeLists.txt Wed Jul 10 18:20:19 2013
@@ -1595,6 +1595,7 @@ install_pdb (qmfconsole ${QPID_COMPONENT
# Legacy store
#
include (legacystore.cmake)
+include (linearstore.cmake)
# Now create the config file from all the info learned above.
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/config.h.cmake
Added: qpid/branches/linearstore/qpid/cpp/src/linearstore.cmake
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/linearstore.cmake?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/linearstore.cmake (added)
+++ qpid/branches/linearstore/qpid/cpp/src/linearstore.cmake Wed Jul 10 18:20:19 2013
@@ -0,0 +1,161 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#
+# Linear store library CMake fragment, to be included in CMakeLists.txt
+#
+
+if (DEFINED linearstore_force) # an explicitly defined linearstore_force replaces the auto-detection below
+ set (linearstore_default ${linearstore_force})
+else (DEFINED linearstore_force)
+ set (linearstore_default OFF) # stays OFF unless every prerequisite check below succeeds
+ if (UNIX)
+ #
+ # Find required BerkeleyDB
+ #
+ include (finddb.cmake) # expected to define DB_FOUND (tested next) - verify in finddb.cmake
+ if (DB_FOUND)
+ #
+ # find libaio
+ #
+ CHECK_LIBRARY_EXISTS (aio io_queue_init "" HAVE_AIO)
+ CHECK_INCLUDE_FILES (libaio.h HAVE_AIO_H)
+ if (HAVE_AIO AND HAVE_AIO_H)
+ #
+ # find libuuid
+ #
+ CHECK_LIBRARY_EXISTS (uuid uuid_compare "" HAVE_UUID)
+ CHECK_INCLUDE_FILES(uuid/uuid.h HAVE_UUID_H)
+ IF (HAVE_UUID AND HAVE_UUID_H)
+ #
+ # allow linearstore to be built
+ #
+ set (linearstore_default ON)
+ ENDIF (HAVE_UUID AND HAVE_UUID_H)
+ endif (HAVE_AIO AND HAVE_AIO_H)
+ endif (DB_FOUND)
+ endif (UNIX)
+endif (DEFINED linearstore_force)
+
+option(BUILD_LINEARSTORE "Build linearstore persistent store" ${linearstore_default}) # linearstore_default only supplies the default value of this option
+
+if (BUILD_LINEARSTORE) # the option can be ON without passing the detection above (e.g. via linearstore_force), so each prerequisite is re-checked here
+ if (NOT UNIX)
+ message(FATAL_ERROR "Linearstore produced only on Unix platforms")
+ endif (NOT UNIX)
+ if (NOT DB_FOUND)
+ message(FATAL_ERROR "Linearstore requires BerkeleyDB which is absent.")
+ endif (NOT DB_FOUND)
+ if (NOT HAVE_AIO)
+ message(FATAL_ERROR "Linearstore requires libaio which is absent.")
+ endif (NOT HAVE_AIO)
+ if (NOT HAVE_AIO_H)
+ message(FATAL_ERROR "Linearstore requires libaio.h which is absent.")
+ endif (NOT HAVE_AIO_H)
+ if (NOT HAVE_UUID)
+ message(FATAL_ERROR "Linearstore requires uuid which is absent.")
+ endif (NOT HAVE_UUID)
+ if (NOT HAVE_UUID_H)
+ message(FATAL_ERROR "Linearstore requires uuid.h which is absent.")
+ endif (NOT HAVE_UUID_H)
+
+ # Journal source files (the variable keeps its "legacy_" prefix although the files live under qpid/linearstore/jrnl)
+ set (legacy_jrnl_SOURCES
+ qpid/linearstore/jrnl/aio.cpp
+ qpid/linearstore/jrnl/cvar.cpp
+ qpid/linearstore/jrnl/data_tok.cpp
+ qpid/linearstore/jrnl/deq_rec.cpp
+ qpid/linearstore/jrnl/enq_map.cpp
+ qpid/linearstore/jrnl/enq_rec.cpp
+ qpid/linearstore/jrnl/fcntl.cpp
+ qpid/linearstore/jrnl/jcntl.cpp
+ qpid/linearstore/jrnl/jdir.cpp
+ qpid/linearstore/jrnl/jerrno.cpp
+ qpid/linearstore/jrnl/jexception.cpp
+ qpid/linearstore/jrnl/jinf.cpp
+ qpid/linearstore/jrnl/jrec.cpp
+ qpid/linearstore/jrnl/lp_map.cpp
+ qpid/linearstore/jrnl/lpmgr.cpp
+ qpid/linearstore/jrnl/pmgr.cpp
+ qpid/linearstore/jrnl/rmgr.cpp
+ qpid/linearstore/jrnl/rfc.cpp
+ qpid/linearstore/jrnl/rrfc.cpp
+ qpid/linearstore/jrnl/slock.cpp
+ qpid/linearstore/jrnl/smutex.cpp
+ qpid/linearstore/jrnl/time_ns.cpp
+ qpid/linearstore/jrnl/txn_map.cpp
+ qpid/linearstore/jrnl/txn_rec.cpp
+ qpid/linearstore/jrnl/wmgr.cpp
+ qpid/linearstore/jrnl/wrfc.cpp
+ )
+
+ # linearstore source files (store plugin and Berkeley DB helper classes)
+ set (legacy_store_SOURCES
+ qpid/linearstore/StorePlugin.cpp
+ qpid/linearstore/BindingDbt.cpp
+ qpid/linearstore/BufferValue.cpp
+ qpid/linearstore/DataTokenImpl.cpp
+ qpid/linearstore/IdDbt.cpp
+ qpid/linearstore/IdSequence.cpp
+ qpid/linearstore/JournalImpl.cpp
+ qpid/linearstore/MessageStoreImpl.cpp
+ qpid/linearstore/PreparedTransaction.cpp
+ qpid/linearstore/TxnCtxt.cpp
+ )
+
+ # linearstore include directories
+ get_property(dirs DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} PROPERTY INCLUDE_DIRECTORIES) # start from the include directories already set on this source directory
+ set (legacy_include_DIRECTORIES
+ ${dirs}
+ ${CMAKE_CURRENT_SOURCE_DIR}/qpid/linearstore
+ )
+
+ if(NOT EXISTS ${CMAKE_CURRENT_BINARY_DIR}/db-inc.h) # wrapper header is written only when missing; an existing db-inc.h is left untouched
+ message(STATUS "Including BDB from ${DB_INCLUDE_DIR}/db_cxx.h")
+ file(WRITE
+ ${CMAKE_CURRENT_BINARY_DIR}/db-inc.h
+ "#include <${DB_INCLUDE_DIR}/db_cxx.h>\n")
+ endif()
+
+ add_library (linearstore MODULE
+ ${legacy_jrnl_SOURCES}
+ ${legacy_store_SOURCES}
+ ${legacy_qmf_SOURCES} # NOTE(review): not set anywhere in this fragment - presumably provided by the including build scripts; verify
+ )
+
+ set_target_properties (linearstore PROPERTIES
+ PREFIX "" # empty prefix, so the module file name starts directly with OUTPUT_NAME
+ COMPILE_DEFINITIONS _IN_QPID_BROKER
+ OUTPUT_NAME linearstore
+ INCLUDE_DIRECTORIES "${legacy_include_DIRECTORIES}"
+ )
+
+ target_link_libraries (linearstore
+ aio
+ uuid
+ qpidcommon qpidtypes qpidbroker
+ ${DB_LIBRARY}
+ )
+
+install(TARGETS linearstore
+ DESTINATION ${QPIDD_MODULE_DIR}
+ COMPONENT ${QPID_COMPONENT_BROKER})
+
+else (BUILD_LINEARSTORE)
+ message(STATUS "Linearstore is excluded from build.")
+endif (BUILD_LINEARSTORE)
Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/BindingDbt.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/BindingDbt.cpp?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/BindingDbt.cpp (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/BindingDbt.cpp Wed Jul 10 18:20:19 2013
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/legacystore/BindingDbt.h"
+
+namespace mrg {
+namespace msgstore {
+
+BindingDbt::BindingDbt(const qpid::broker::PersistableExchange& e, const qpid::broker::PersistableQueue& q, const std::string& k, const qpid::framing::FieldTable& a) // encodes one binding record into a newly allocated buffer and exposes it through the Dbt base
+ : data(new char[encodedSize(e, q, k, a)]), // allocation sized exactly to the encoded record
+ buffer(data, encodedSize(e, q, k, a)) // Buffer constructed over `data`; the put* calls below go through it
+{
+ buffer.putLongLong(q.getPersistenceId()); // field 1: queue persistence id
+ buffer.putShortString(q.getName()); // field 2: queue name
+ buffer.putShortString(k); // field 3: k (presumably the binding key - confirm with callers)
+ buffer.put(a); // field 4: field table a (presumably the binding arguments - confirm)
+
+ set_data(data); // point the Dbt at the encoded bytes
+ set_size(encodedSize(e, q, k, a)); // same size that was used for the allocation above
+}
+
+BindingDbt::~BindingDbt() // releases the buffer allocated in the constructor
+{
+ delete [] data; // array form matches the new char[] in the constructor
+}
+
+uint32_t BindingDbt::encodedSize(const qpid::broker::PersistableExchange& /*not used*/, const qpid::broker::PersistableQueue& q, const std::string& k, const qpid::framing::FieldTable& a) // byte count of the record written by the constructor; the exchange argument does not contribute
+{
+ return 8 /*queue id*/ + q.getName().size() + 1 + k.size() + 1 + a.encodedSize(); // each "+ 1" presumably covers a short-string length prefix - confirm against Buffer::putShortString
+}
+
+}}
Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/BindingDbt.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/BindingDbt.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/BindingDbt.h?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/BindingDbt.h (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/BindingDbt.h Wed Jul 10 18:20:19 2013
@@ -0,0 +1,56 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_BINDINGDBT_H
+#define QPID_LEGACYSTORE_BINDINGDBT_H
+
+#include "db-inc.h"
+#include "qpid/broker/PersistableExchange.h"
+#include "qpid/broker/PersistableQueue.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/framing/FieldTable.h"
+
+namespace mrg{
+namespace msgstore{
+
+class BindingDbt : public Dbt // Dbt holding an encoded (queue, key, arguments) binding record
+{
+ char* data; // heap buffer owned by this object; freed in the destructor. NOTE(review): no copy constructor/assignment is declared here, so a copy would share and double-delete this pointer
+ qpid::framing::Buffer buffer; // Buffer constructed over `data`, used by the constructor to encode the record
+
+ static uint32_t encodedSize(const qpid::broker::PersistableExchange& e,
+ const qpid::broker::PersistableQueue& q,
+ const std::string& k,
+ const qpid::framing::FieldTable& a); // number of bytes the encoded record occupies
+
+public:
+ BindingDbt(const qpid::broker::PersistableExchange& e,
+ const qpid::broker::PersistableQueue& q,
+ const std::string& k,
+ const qpid::framing::FieldTable& a); // allocates the buffer and encodes the record from q, k and a
+
+ virtual ~BindingDbt(); // frees the buffer
+
+};
+
+}}
+
+#endif // ifndef QPID_LEGACYSTORE_BINDINGDBT_H
Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/BindingDbt.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/BindingDbt.h
------------------------------------------------------------------------------
svn:keywords = Author Date Id Rev URL
Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/BufferValue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/BufferValue.cpp?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/BufferValue.cpp (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/BufferValue.cpp Wed Jul 10 18:20:19 2013
@@ -0,0 +1,56 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/legacystore/BufferValue.h"
+
+namespace mrg {
+namespace msgstore {
+
+
+
+BufferValue::BufferValue(u_int32_t size, u_int64_t offset) // empty `size`-byte buffer, configured through the Dbt flags and doff/dlen/ulen set below
+ : data(new char[size]),
+ buffer(data, size) // Buffer constructed over the same `size` bytes
+{
+ set_data(data);
+ set_size(size);
+ set_flags(DB_DBT_USERMEM | DB_DBT_PARTIAL); // USERMEM: Berkeley DB fills the caller-supplied buffer; PARTIAL: only the doff/dlen window of the record is transferred
+ set_doff(offset); // start of the window within the stored record
+ set_dlen(size); // length of the window
+ set_ulen(size); // capacity of the user buffer
+}
+
+BufferValue::BufferValue(const qpid::broker::Persistable& p) // buffer pre-filled with the encoded form of p
+ : data(new char[p.encodedSize()]),
+ buffer(data, p.encodedSize())
+{
+ p.encode(buffer); // p writes itself through `buffer`, which was constructed over `data`
+
+ set_data(data);
+ set_size(p.encodedSize()); // no flags are set on this path, unlike the (size, offset) constructor
+}
+
+BufferValue::~BufferValue() // releases the buffer allocated by either constructor
+{
+ delete [] data; // array form matches the new char[] in the constructors
+}
+
+}}
Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/BufferValue.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/BufferValue.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/BufferValue.h?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/BufferValue.h (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/BufferValue.h Wed Jul 10 18:20:19 2013
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_BUFFERVALUE_H
+#define QPID_LEGACYSTORE_BUFFERVALUE_H
+
+#include "db-inc.h"
+#include "qpid/broker/Persistable.h"
+#include "qpid/framing/Buffer.h"
+
+namespace mrg{
+namespace msgstore{
+
+class BufferValue : public Dbt // Dbt backed by an owned char buffer, with a public qpid Buffer constructed over the same bytes
+{
+ char* data; // heap buffer owned by this object; freed in the destructor. NOTE(review): no copy constructor/assignment is declared here, so a copy would share and double-delete this pointer
+
+public:
+ qpid::framing::Buffer buffer; // public so callers can encode into or decode from the underlying bytes
+
+ BufferValue(u_int32_t size, u_int64_t offset); // empty buffer for fetching `size` bytes starting at `offset` of a record
+ BufferValue(const qpid::broker::Persistable& p); // buffer pre-filled with the encoded form of p
+ virtual ~BufferValue(); // frees the buffer
+};
+
+}}
+
+#endif // ifndef QPID_LEGACYSTORE_BUFFERVALUE_H
Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/BufferValue.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/BufferValue.h
------------------------------------------------------------------------------
svn:keywords = Author Date Id Rev URL
Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/Cursor.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/Cursor.h?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/Cursor.h (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/Cursor.h Wed Jul 10 18:20:19 2013
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_CURSOR_H
+#define QPID_LEGACYSTORE_CURSOR_H
+
+#include <boost/shared_ptr.hpp>
+#include "db-inc.h"
+
+namespace mrg{
+namespace msgstore{
+
+class Cursor // Thin owner of a Berkeley DB cursor handle (Dbc*); the handle is closed by close() or on destruction
+{
+ Dbc* cursor; // owned handle; 0 whenever no cursor is open
+public:
+ typedef boost::shared_ptr<Db> db_ptr;
+
+ Cursor() : cursor(0) {}
+ virtual ~Cursor() { close(); } // same release path as close(), so the handle is never closed twice
+
+ void open(db_ptr db, DbTxn* txn, u_int32_t flags = 0) { close(); db->cursor(txn, &cursor, flags); } // close() first so that re-opening does not leak the previously opened handle
+ void close() { Dbc* c = cursor; cursor = 0; if(c) c->close(); } // forget the handle before closing it: it must not be touched again even if Dbc::close() fails or throws
+ Dbc* get() { return cursor; } // raw handle, or 0 when not open; ownership stays with this object
+ Dbc* operator->() { return cursor; } // direct access to Dbc members; the caller must ensure the cursor is open
+ bool next(Dbt& key, Dbt& value) { return cursor != 0 && cursor->get(&key, &value, DB_NEXT) == 0; } // fetch with DB_NEXT; false when not open or when Dbc::get returns non-zero
+ bool current(Dbt& key, Dbt& value) { return cursor != 0 && cursor->get(&key, &value, DB_CURRENT) == 0; } // fetch with DB_CURRENT; false when not open or when Dbc::get returns non-zero
+};
+
+}}
+
+#endif // ifndef QPID_LEGACYSTORE_CURSOR_H
Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/Cursor.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/Cursor.h
------------------------------------------------------------------------------
svn:keywords = Author Date Id Rev URL
Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/DataTokenImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/DataTokenImpl.cpp?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/DataTokenImpl.cpp (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/DataTokenImpl.cpp Wed Jul 10 18:20:19 2013
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/legacystore/DataTokenImpl.h"
+
+using namespace mrg::msgstore;
+
+DataTokenImpl::DataTokenImpl():data_tok() {} // value-initialises the data_tok base; no other member is initialised explicitly
+
+DataTokenImpl::~DataTokenImpl() {} // empty body: nothing is released explicitly here
Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/DataTokenImpl.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/DataTokenImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/DataTokenImpl.h?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/DataTokenImpl.h (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/DataTokenImpl.h Wed Jul 10 18:20:19 2013
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_DATATOKENIMPL_H
+#define QPID_LEGACYSTORE_DATATOKENIMPL_H
+
+#include "qpid/legacystore/jrnl/data_tok.h"
+#include "qpid/broker/PersistableMessage.h"
+#include <boost/intrusive_ptr.hpp>
+
+namespace mrg {
+namespace msgstore {
+
+class DataTokenImpl : public journal::data_tok, public qpid::RefCounted // journal data token that additionally carries a pointer to its source message
+{
+ private:
+ boost::intrusive_ptr<qpid::broker::PersistableMessage> sourceMsg; // message associated with this token; unset until setSourceMessage() is called
+ public:
+ DataTokenImpl();
+ virtual ~DataTokenImpl();
+
+ inline boost::intrusive_ptr<qpid::broker::PersistableMessage>& getSourceMessage() { return sourceMsg; } // returns a reference to the stored pointer itself, not a copy
+ inline void setSourceMessage(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg) { sourceMsg = msg; } // replaces the stored pointer with msg
+};
+
+} // namespace msgstore
+} // namespace mrg
+
+#endif // ifndef QPID_LEGACYSTORE_DATATOKENIMPL_H
Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/DataTokenImpl.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/DataTokenImpl.h
------------------------------------------------------------------------------
svn:keywords = Author Date Id Rev URL
Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/IdDbt.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/IdDbt.cpp?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/IdDbt.cpp (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/IdDbt.cpp Wed Jul 10 18:20:19 2013
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/legacystore/IdDbt.h"
+
+using namespace mrg::msgstore;
+
+IdDbt::IdDbt() : id(0) // default: id starts at 0
+{
+ init(); // bind the Dbt to the id member
+}
+
+IdDbt::IdDbt(u_int64_t _id) : id(_id) // stores the given id value
+{
+ init(); // bind the Dbt to the id member
+}
+
+void IdDbt::init() // shared constructor helper: makes the Dbt refer to this object's own `id` member
+{
+ set_data(&id); // the Dbt data is the member itself, not a separate buffer
+ set_size(sizeof(u_int64_t));
+ set_ulen(sizeof(u_int64_t)); // user-buffer capacity: exactly one 64-bit id
+ set_flags(DB_DBT_USERMEM); // Berkeley DB uses the supplied memory rather than allocating its own
+}
Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/IdDbt.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/IdDbt.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/IdDbt.h?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/IdDbt.h (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/IdDbt.h Wed Jul 10 18:20:19 2013
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_IDDBT_H
+#define QPID_LEGACYSTORE_IDDBT_H
+
+#include "db-inc.h"
+
+namespace mrg{
+namespace msgstore{
+
+class IdDbt : public Dbt // Dbt specialisation whose payload is a single 64-bit id held in the object
+{
+ void init(); // private helper called by both constructors
+public:
+ u_int64_t id; // NOTE(review): init() stores &id in the Dbt base, so a compiler-generated copy presumably keeps pointing at the source object's id - confirm before copying instances
+
+ IdDbt(u_int64_t id); // construct with an explicit id
+ IdDbt(); // construct with id 0
+};
+
+}}
+
+#endif // ifndef QPID_LEGACYSTORE_IDDBT_H
Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/IdDbt.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/IdDbt.h
------------------------------------------------------------------------------
svn:keywords = Author Date Id Rev URL
Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/IdSequence.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/IdSequence.cpp?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/IdSequence.cpp (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/IdSequence.cpp Wed Jul 10 18:20:19 2013
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/legacystore/IdSequence.h"
+
+using namespace mrg::msgstore;
+using qpid::sys::Mutex;
+
+IdSequence::IdSequence() : id(1) {} // counter starts at 1, so the first next() returns 1
+
+uint64_t IdSequence::next() // returns the current id and advances the counter; return type spelled as in the class declaration
+{
+ Mutex::ScopedLock guard(lock); // held for the whole read-and-increment
+ if (!id) id++; // avoid 0 when folding around
+ return id++; // post-increment: the caller gets the value before the bump
+}
+
+void IdSequence::reset(uint64_t value) // overwrites the counter with value; does not take the mutex
+{
+ //deliberately not threadsafe, used only on recovery
+ id = value; // the next call to next() returns this value (or 1 if value is 0)
+}
Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/IdSequence.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/IdSequence.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/IdSequence.h?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/IdSequence.h (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/IdSequence.h Wed Jul 10 18:20:19 2013
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_IDSEQUENCE_H
+#define QPID_LEGACYSTORE_IDSEQUENCE_H
+
+#include "qpid/framing/amqp_types.h"
+#include "qpid/sys/Mutex.h"
+#include <sys/types.h>
+
+namespace mrg{
+namespace msgstore{
+
+class IdSequence // mutex-guarded 64-bit id counter
+{
+ qpid::sys::Mutex lock; // taken by next()
+ uint64_t id; // next value to hand out
+public:
+ IdSequence(); // starts the counter
+ uint64_t next(); // returns the current id and advances the counter
+ void reset(uint64_t value); // sets the counter to value
+};
+
+}}
+
+#endif // ifndef QPID_LEGACYSTORE_IDSEQUENCE_H
Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/IdSequence.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/IdSequence.h
------------------------------------------------------------------------------
svn:keywords = Author Date Id Rev URL
Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp Wed Jul 10 18:20:19 2013
@@ -0,0 +1,633 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/legacystore/JournalImpl.h"
+
+#include "qpid/legacystore/jrnl/jerrno.h"
+#include "qpid/legacystore/jrnl/jexception.h"
+#include "qpid/log/Statement.h"
+#include "qpid/management/ManagementAgent.h"
+#include "qmf/org/apache/qpid/legacystore/ArgsJournalExpand.h"
+#include "qmf/org/apache/qpid/legacystore/EventCreated.h"
+#include "qmf/org/apache/qpid/legacystore/EventEnqThresholdExceeded.h"
+#include "qmf/org/apache/qpid/legacystore/EventFull.h"
+#include "qmf/org/apache/qpid/legacystore/EventRecovered.h"
+#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Timer.h"
+#include "qpid/legacystore/StoreException.h"
+
+using namespace mrg::msgstore;
+using namespace mrg::journal;
+using qpid::management::ManagementAgent;
+namespace _qmf = qmf::org::apache::qpid::legacystore;
+
+InactivityFireEvent::InactivityFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
+ qpid::sys::TimerTask(timeout, "JournalInactive:"+p->id()), _parent(p) {} // task name is built from p->id(), so p is dereferenced here and must not be null
+
+void InactivityFireEvent::fire() { qpid::sys::Mutex::ScopedLock sl(_ife_lock); if (_parent) _parent->flushFire(); } // under _ife_lock: forwards to JournalImpl::flushFire() unless _parent is null
+
+GetEventsFireEvent::GetEventsFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
+ qpid::sys::TimerTask(timeout, "JournalGetEvents:"+p->id()), _parent(p) {} // task name is built from p->id(), so p is dereferenced here and must not be null
+
+void GetEventsFireEvent::fire() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); if (_parent) _parent->getEventsFire(); } // under _gefe_lock: forwards to JournalImpl::getEventsFire() unless _parent is null
+
+JournalImpl::JournalImpl(qpid::sys::Timer& timer_, // constructs the journal, schedules the inactivity task and registers the management object
+ const std::string& journalId,
+ const std::string& journalDirectory,
+ const std::string& journalBaseFilename,
+ const qpid::sys::Duration getEventsTimeout, // period passed to the GetEventsFireEvent task
+ const qpid::sys::Duration flushTimeout, // period passed to the InactivityFireEvent task
+ qpid::management::ManagementAgent* a, // may be null; management is then skipped (see initManagement)
+ DeleteCallback onDelete): // invoked from the destructor when set
+ jcntl(journalId, journalDirectory, journalBaseFilename),
+ timer(timer_),
+ getEventsTimerSetFlag(false),
+ lastReadRid(0),
+ writeActivityFlag(false),
+ flushTriggeredFlag(true),
+ _xidp(0),
+ _datap(0),
+ _dlen(0),
+ _dtok(),
+ _external(false),
+ deleteCallback(onDelete)
+{
+ getEventsFireEventsPtr = new GetEventsFireEvent(this, getEventsTimeout); // created here but not added to the timer in this constructor
+ inactivityFireEventPtr = new InactivityFireEvent(this, flushTimeout);
+ {
+ timer.start();
+ timer.add(inactivityFireEventPtr); // only the inactivity task is scheduled at construction
+ }
+
+ initManagement(a);
+
+ log(LOG_NOTICE, "Created");
+ std::ostringstream oss;
+ oss << "Journal directory = \"" << journalDirectory << "\"; Base file name = \"" << journalBaseFilename << "\"";
+ log(LOG_DEBUG, oss.str());
+}
+
+JournalImpl::~JournalImpl() // runs the delete callback, stops the journal if needed, detaches timer tasks and releases buffers and management state
+{
+ if (deleteCallback) deleteCallback(*this); // callback runs first, while the object is still fully intact
+ if (_init_flag && !_stop_flag){
+ try { stop(true); } // NOTE: This will *block* until all outstanding disk aio calls are complete!
+ catch (const jexception& e) { log(LOG_ERROR, e.what()); } // only jexception is caught and logged here
+ }
+ getEventsFireEventsPtr->cancel(); // detach both timer tasks from this journal
+ inactivityFireEventPtr->cancel();
+ free_read_buffers();
+
+ if (_mgmtObject.get() != 0) { // skipped when stop() above already reset the management object
+ _mgmtObject->resourceDestroy();
+ _mgmtObject.reset();
+ }
+
+ log(LOG_NOTICE, "Destroyed");
+}
+
+void
+JournalImpl::initManagement(qpid::management::ManagementAgent* a) // stores the agent and, when it is non-null, creates and registers the QMF Journal object
+{
+ _agent = a;
+ if (_agent != 0) // a null agent leaves _mgmtObject unset
+ {
+ _mgmtObject = _qmf::Journal::shared_ptr (
+ new _qmf::Journal(_agent, this));
+
+ _mgmtObject->set_name(_jid);
+ _mgmtObject->set_directory(_jdir.dirname());
+ _mgmtObject->set_baseFileName(_base_filename);
+ _mgmtObject->set_readPageSize(JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE); // product of the three journal size constants
+ _mgmtObject->set_readPages(JRNL_RMGR_PAGES);
+
+ // The following will be set on initialize(), but being properties, these must be set to 0 in the meantime
+ _mgmtObject->set_initialFileCount(0);
+ _mgmtObject->set_dataFileSize(0);
+ _mgmtObject->set_currentFileCount(0);
+ _mgmtObject->set_writePageSize(0);
+ _mgmtObject->set_writePages(0);
+
+ _agent->addObject(_mgmtObject, 0, true); // registered only after all properties have a value
+ }
+}
+
+
+void
+JournalImpl::initialize(const u_int16_t num_jfiles, // logs the parameters, delegates to jcntl::initialize(), then publishes the resulting geometry
+ const bool auto_expand,
+ const u_int16_t ae_max_jfiles,
+ const u_int32_t jfsize_sblks,
+ const u_int16_t wcache_num_pages,
+ const u_int32_t wcache_pgsize_sblks,
+ mrg::journal::aio_callback* const cbp)
+{
+ std::ostringstream oss;
+ oss << "Initialize; num_jfiles=" << num_jfiles << " jfsize_sblks=" << jfsize_sblks;
+ oss << " wcache_pgsize_sblks=" << wcache_pgsize_sblks;
+ oss << " wcache_num_pages=" << wcache_num_pages;
+ log(LOG_DEBUG, oss.str());
+ jcntl::initialize(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks, cbp);
+ log(LOG_DEBUG, "Initialization complete");
+
+ if (_mgmtObject.get() != 0) // management properties are read back from _lpmgr after jcntl::initialize() has run
+ {
+ _mgmtObject->set_initialFileCount(_lpmgr.num_jfiles());
+ _mgmtObject->set_autoExpand(_lpmgr.is_ae());
+ _mgmtObject->set_currentFileCount(_lpmgr.num_jfiles());
+ _mgmtObject->set_maxFileCount(_lpmgr.ae_max_jfiles());
+ _mgmtObject->set_dataFileSize(_jfsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
+ _mgmtObject->set_writePageSize(wcache_pgsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
+ _mgmtObject->set_writePages(wcache_num_pages);
+ }
+ if (_agent != 0) // the "created" event is raised whenever an agent is present
+ _agent->raiseEvent(qmf::org::apache::qpid::legacystore::EventCreated(_jid, _jfsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE, _lpmgr.num_jfiles()),
+ qpid::management::ManagementAgent::SEV_NOTE);
+}
+
+void
+JournalImpl::recover(const u_int16_t num_jfiles, // delegates to jcntl::recover() and fills the prepared-transaction enqueue/dequeue lists from _tmap
+ const bool auto_expand,
+ const u_int16_t ae_max_jfiles,
+ const u_int32_t jfsize_sblks,
+ const u_int16_t wcache_num_pages,
+ const u_int32_t wcache_pgsize_sblks,
+ mrg::journal::aio_callback* const cbp,
+ boost::ptr_list<msgstore::PreparedTransaction>* prep_tx_list_ptr, // may be null; then no xid list is passed on and no lists are populated
+ u_int64_t& highest_rid, // passed by reference to jcntl::recover(); logged afterwards as the highest rid found
+ u_int64_t queue_id) // id recorded with every enqueue/dequeue added to a prepared transaction
+{
+ std::ostringstream oss1;
+ oss1 << "Recover; num_jfiles=" << num_jfiles << " jfsize_sblks=" << jfsize_sblks;
+ oss1 << " queue_id = 0x" << std::hex << queue_id << std::dec;
+ oss1 << " wcache_pgsize_sblks=" << wcache_pgsize_sblks;
+ oss1 << " wcache_num_pages=" << wcache_num_pages;
+ log(LOG_DEBUG, oss1.str());
+
+ if (_mgmtObject.get() != 0) // NOTE(review): _lpmgr/_jfsize_sblks are read here before jcntl::recover() runs, whereas initialize() reads them after jcntl::initialize() - confirm the values are already valid
+ {
+ _mgmtObject->set_initialFileCount(_lpmgr.num_jfiles());
+ _mgmtObject->set_autoExpand(_lpmgr.is_ae());
+ _mgmtObject->set_currentFileCount(_lpmgr.num_jfiles());
+ _mgmtObject->set_maxFileCount(_lpmgr.ae_max_jfiles());
+ _mgmtObject->set_dataFileSize(_jfsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
+ _mgmtObject->set_writePageSize(wcache_pgsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
+ _mgmtObject->set_writePages(wcache_num_pages);
+ }
+
+ if (prep_tx_list_ptr) {
+ // Create list of prepared xids
+ std::vector<std::string> prep_xid_list;
+ for (msgstore::PreparedTransaction::list::iterator i = prep_tx_list_ptr->begin(); i != prep_tx_list_ptr->end(); i++) {
+ prep_xid_list.push_back(i->xid);
+ }
+
+ jcntl::recover(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks,
+ cbp, &prep_xid_list, highest_rid);
+ } else {
+ jcntl::recover(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks,
+ cbp, 0, highest_rid); // no prepared xid list available: pass a null pointer
+ }
+
+ // Populate PreparedTransaction lists from _tmap
+ if (prep_tx_list_ptr)
+ {
+ for (msgstore::PreparedTransaction::list::iterator i = prep_tx_list_ptr->begin(); i != prep_tx_list_ptr->end(); i++) {
+ txn_data_list tdl = _tmap.get_tdata_list(i->xid); // tdl will be empty if xid not found
+ for (tdl_itr tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++) { // note: the loop variable reuses the name of its own type (tdl_itr)
+ if (tdl_itr->_enq_flag) { // enqueue op
+ i->enqueues->add(queue_id, tdl_itr->_rid);
+ } else { // dequeue op
+ i->dequeues->add(queue_id, tdl_itr->_drid);
+ }
+ }
+ }
+ }
+ std::ostringstream oss2;
+ oss2 << "Recover phase 1 complete; highest rid found = 0x" << std::hex << highest_rid;
+ oss2 << std::dec << "; emap.size=" << _emap.size() << "; tmap.size=" << _tmap.size();
+ oss2 << "; journal now read-only.";
+ log(LOG_DEBUG, oss2.str());
+
+ if (_mgmtObject.get() != 0) // seed the statistics counters from the recovered enqueue and transaction maps
+ {
+ _mgmtObject->inc_recordDepth(_emap.size());
+ _mgmtObject->inc_enqueues(_emap.size());
+ _mgmtObject->inc_txn(_tmap.size());
+ _mgmtObject->inc_txnEnqueues(_tmap.enq_cnt());
+ _mgmtObject->inc_txnDequeues(_tmap.deq_cnt());
+ }
+}
+
+void
+JournalImpl::recover_complete() // finishes recovery via jcntl::recover_complete() and raises the "recovered" management event
+{
+ jcntl::recover_complete();
+ log(LOG_DEBUG, "Recover phase 2 complete; journal now writable.");
+ if (_agent != 0) // event is raised only when a management agent is present
+ _agent->raiseEvent(qmf::org::apache::qpid::legacystore::EventRecovered(_jid, _jfsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE, _lpmgr.num_jfiles(),
+ _emap.size(), _tmap.size(), _tmap.enq_cnt(), _tmap.deq_cnt()), qpid::management::ManagementAgent::SEV_NOTE);
+}
+
+//#define MAX_AIO_SLEEPS 1000000 // tot: ~10 sec
+//#define AIO_SLEEP_TIME_US 10 // 0.01 ms
+// Return true if content is recovered from store; false if content is external and must be recovered from an external store.
+// Throw exception for all errors.
+bool
+JournalImpl::loadMsgContent(u_int64_t rid, std::string& data, size_t length, size_t offset) // appends up to length content bytes of record rid, starting at offset, to data
+{
+ qpid::sys::Mutex::ScopedLock sl(_read_lock); // the whole read, including the cached buffers, is serialized
+ if (_dtok.rid() != rid) // the cached record (if any) is reused when it already holds this rid
+ {
+ // Free any previous msg
+ free_read_buffers();
+
+ // Last read encountered out-of-order rids, check if this rid is in that list
+ bool oooFlag = false;
+ for (std::vector<u_int64_t>::const_iterator i=oooRidList.begin(); i!=oooRidList.end() && !oooFlag; i++) {
+ if (*i == rid) {
+ oooFlag = true;
+ }
+ }
+
+ // TODO: This is a brutal approach - very inefficient and slow. Rather introduce a system of remembering
+ // jumpover points and allow the read to jump back to the first known jumpover point - but this needs
+ // a mechanism in rrfc to accomplish it. Also helpful is a struct containing a journal address - a
+ // combination of lid/offset.
+ // NOTE: The second part of the if stmt (rid < lastReadRid) is required to handle browsing.
+ if (oooFlag || rid < lastReadRid) {
+ _rmgr.invalidate();
+ oooRidList.clear();
+ }
+ _dlen = 0;
+ _dtok.reset();
+ _dtok.set_wstate(DataTokenImpl::ENQ);
+ _dtok.set_rid(0);
+ _external = false;
+ size_t xlen = 0;
+ bool transient = false;
+ bool done = false;
+ bool rid_found = false;
+ while (!done) { // keep reading records until the requested rid is read or an exception is thrown
+ iores res = read_data_record(&_datap, _dlen, &_xidp, xlen, transient, _external, &_dtok);
+ switch (res) {
+ case mrg::journal::RHM_IORES_SUCCESS:
+ if (_dtok.rid() != rid) {
+ // Check if this is an out-of-order rid that may impact next read
+ if (_dtok.rid() > rid)
+ oooRidList.push_back(_dtok.rid());
+ free_read_buffers();
+ // Reset data token for next read
+ _dlen = 0;
+ _dtok.reset();
+ _dtok.set_wstate(DataTokenImpl::ENQ);
+ _dtok.set_rid(0);
+ } else {
+ rid_found = _dtok.rid() == rid;
+ lastReadRid = rid;
+ done = true;
+ }
+ break;
+ case mrg::journal::RHM_IORES_PAGE_AIOWAIT:
+ if (get_wr_events(&_aio_cmpl_timeout) == journal::jerrno::AIO_TIMEOUT) {
+ std::stringstream ss;
+ ss << "read_data_record() returned " << mrg::journal::iores_str(res);
+ ss << "; timed out waiting for page to be processed.";
+ throw jexception(mrg::journal::jerrno::JERR__TIMEOUT, ss.str().c_str(), "JournalImpl",
+ "loadMsgContent");
+ }
+ break;
+ default:
+ std::stringstream ss;
+ ss << "read_data_record() returned " << mrg::journal::iores_str(res);
+ throw jexception(mrg::journal::jerrno::JERR__UNEXPRESPONSE, ss.str().c_str(), "JournalImpl",
+ "loadMsgContent");
+ }
+ }
+ if (!rid_found) {
+ std::stringstream ss;
+ ss << "read_data_record() was unable to find rid 0x" << std::hex << rid << std::dec;
+ ss << " (" << rid << "); last rid found was 0x" << std::hex << _dtok.rid() << std::dec;
+ ss << " (" << _dtok.rid() << ")";
+ throw jexception(mrg::journal::jerrno::JERR__RECNFOUND, ss.str().c_str(), "JournalImpl", "loadMsgContent");
+ }
+ }
+
+ if (_external) return false; // content is held externally: nothing is appended to data
+
+ const size_t hdr_offs = qpid::framing::Buffer(static_cast<char*>(_datap), sizeof(u_int32_t)).getLong() + sizeof(u_int32_t); // 32-bit length prefix value plus the prefix itself
+ const size_t start = hdr_offs + offset; // first requested byte, relative to _datap
+ if (start < _dlen) { // a request starting at or past the end of the record appends nothing (previously _dlen - start could underflow)
+ const size_t avail = _dlen - start; // bytes of the record remaining from start
+ data.append(static_cast<const char*>(_datap) + start, length < avail ? length : avail); // clamp without computing start + length, which could wrap
+ }
+ return true;
+}
+
+void
+JournalImpl::enqueue_data_record(const void* const data_buff, const size_t tot_data_len, // enqueues via jcntl and updates the enqueue statistics
+ const size_t this_data_len, data_tok* dtokp, const bool transient)
+{
+ handleIoResult(jcntl::enqueue_data_record(data_buff, tot_data_len, this_data_len, dtokp, transient)); // throws on any non-success result, so the counters below are skipped
+
+ if (_mgmtObject.get() != 0)
+ {
+ _mgmtObject->inc_enqueues();
+ _mgmtObject->inc_recordDepth();
+ }
+}
+
+void
+JournalImpl::enqueue_extern_data_record(const size_t tot_data_len, data_tok* dtokp, // enqueues an external-content record via jcntl and updates the enqueue statistics
+ const bool transient)
+{
+ handleIoResult(jcntl::enqueue_extern_data_record(tot_data_len, dtokp, transient)); // throws on any non-success result, so the counters below are skipped
+
+ if (_mgmtObject.get() != 0)
+ {
+ _mgmtObject->inc_enqueues();
+ _mgmtObject->inc_recordDepth();
+ }
+}
+
+void
+JournalImpl::enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len, // transactional enqueue via jcntl plus statistics update
+ const size_t this_data_len, data_tok* dtokp, const std::string& xid, const bool transient)
+{
+ bool txn_incr = _mgmtObject.get() != 0 ? _tmap.in_map(xid) : false; // despite the name, true means the xid was already in _tmap before this call
+
+ handleIoResult(jcntl::enqueue_txn_data_record(data_buff, tot_data_len, this_data_len, dtokp, xid, transient)); // throws on any non-success result
+
+ if (_mgmtObject.get() != 0)
+ {
+ if (!txn_incr) // If this xid was not in _tmap, it will be now...
+ _mgmtObject->inc_txn();
+ _mgmtObject->inc_enqueues();
+ _mgmtObject->inc_txnEnqueues();
+ _mgmtObject->inc_recordDepth();
+ }
+}
+
+void
+JournalImpl::enqueue_extern_txn_data_record(const size_t tot_data_len, data_tok* dtokp, // transactional external-content enqueue via jcntl plus statistics update
+ const std::string& xid, const bool transient)
+{
+ bool txn_incr = _mgmtObject.get() != 0 ? _tmap.in_map(xid) : false; // despite the name, true means the xid was already in _tmap before this call
+
+ handleIoResult(jcntl::enqueue_extern_txn_data_record(tot_data_len, dtokp, xid, transient)); // throws on any non-success result
+
+ if (_mgmtObject.get() != 0)
+ {
+ if (!txn_incr) // If this xid was not in _tmap, it will be now...
+ _mgmtObject->inc_txn();
+ _mgmtObject->inc_enqueues();
+ _mgmtObject->inc_txnEnqueues();
+ _mgmtObject->inc_recordDepth();
+ }
+}
+
+void
+JournalImpl::dequeue_data_record(data_tok* const dtokp, const bool txn_coml_commit) // dequeues via jcntl and updates the dequeue statistics
+{
+ handleIoResult(jcntl::dequeue_data_record(dtokp, txn_coml_commit)); // throws on any non-success result, so the counters below are skipped
+
+ if (_mgmtObject.get() != 0)
+ {
+ _mgmtObject->inc_dequeues();
+ _mgmtObject->inc_txnDequeues(); // NOTE(review): this path takes no xid, yet the transactional dequeue counter is bumped (enqueue_data_record does not touch txnEnqueues) - confirm this is intended
+ _mgmtObject->dec_recordDepth();
+ }
+}
+
+void
+JournalImpl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit) // transactional dequeue via jcntl plus statistics update
+{
+ bool txn_incr = _mgmtObject.get() != 0 ? _tmap.in_map(xid) : false; // despite the name, true means the xid was already in _tmap before this call
+
+ handleIoResult(jcntl::dequeue_txn_data_record(dtokp, xid, txn_coml_commit)); // throws on any non-success result
+
+ if (_mgmtObject.get() != 0)
+ {
+ if (!txn_incr) // If this xid was not in _tmap, it will be now...
+ _mgmtObject->inc_txn();
+ _mgmtObject->inc_dequeues();
+ _mgmtObject->inc_txnDequeues();
+ _mgmtObject->dec_recordDepth();
+ }
+}
+
+void
+JournalImpl::txn_abort(data_tok* const dtokp, const std::string& xid) // aborts the transaction via jcntl and updates the transaction statistics
+{
+ handleIoResult(jcntl::txn_abort(dtokp, xid)); // throws on any non-success result, so the counters below are skipped
+
+ if (_mgmtObject.get() != 0)
+ {
+ _mgmtObject->dec_txn(); // one fewer open transaction
+ _mgmtObject->inc_txnAborts();
+ }
+}
+
+void
+JournalImpl::txn_commit(data_tok* const dtokp, const std::string& xid) // commits the transaction via jcntl and updates the transaction statistics
+{
+ handleIoResult(jcntl::txn_commit(dtokp, xid)); // throws on any non-success result, so the counters below are skipped
+
+ if (_mgmtObject.get() != 0)
+ {
+ _mgmtObject->dec_txn(); // one fewer open transaction
+ _mgmtObject->inc_txnCommits();
+ }
+}
+
+void
+JournalImpl::stop(bool block_till_aio_cmpl) // detaches the inactivity task, stops the underlying journal and tears down the management object
+{
+ InactivityFireEvent* ifep = dynamic_cast<InactivityFireEvent*>(inactivityFireEventPtr.get());
+ assert(ifep); // dynamic_cast can return null if the cast fails
+ ifep->cancel(); // clears the task's parent pointer so later fire() calls do nothing
+ jcntl::stop(block_till_aio_cmpl);
+
+ if (_mgmtObject.get() != 0) { // after this the management object is reset, so later statistics updates are skipped
+ _mgmtObject->resourceDestroy();
+ _mgmtObject.reset();
+ }
+}
+
+iores
+JournalImpl::flush(const bool block_till_aio_cmpl) // flushes via jcntl and arms the get-events timer when AIO events remain outstanding
+{
+ const iores res = jcntl::flush(block_till_aio_cmpl);
+ {
+ qpid::sys::Mutex::ScopedLock sl(_getf_lock); // guards getEventsTimerSetFlag together with getEventsFire()
+ if (_wmgr.get_aio_evt_rem() && !getEventsTimerSetFlag) { setGetEventTimer(); } // arm only if events remain and the timer is not already set
+ }
+ return res; // the jcntl::flush() result is returned unchanged
+}
+
+void
+JournalImpl::log(mrg::journal::log_level ll, const std::string& log_stmt) const // std::string convenience overload
+{
+ log(ll, log_stmt.c_str()); // forwards to the const char* overload
+}
+
+void
+JournalImpl::log(mrg::journal::log_level ll, const char* const log_stmt) const // maps the journal log level onto the matching QPID_LOG level, prefixing the journal id
+{
+ switch (ll) // no default case: a level not listed here logs nothing
+ {
+ case LOG_TRACE: QPID_LOG(trace, "Journal \"" << _jid << "\": " << log_stmt); break;
+ case LOG_DEBUG: QPID_LOG(debug, "Journal \"" << _jid << "\": " << log_stmt); break;
+ case LOG_INFO: QPID_LOG(info, "Journal \"" << _jid << "\": " << log_stmt); break;
+ case LOG_NOTICE: QPID_LOG(notice, "Journal \"" << _jid << "\": " << log_stmt); break;
+ case LOG_WARN: QPID_LOG(warning, "Journal \"" << _jid << "\": " << log_stmt); break;
+ case LOG_ERROR: QPID_LOG(error, "Journal \"" << _jid << "\": " << log_stmt); break;
+ case LOG_CRITICAL: QPID_LOG(critical, "Journal \"" << _jid << "\": " << log_stmt); break;
+ }
+}
+
+void
+JournalImpl::getEventsFire() // timer callback: collects completed write AIO events and re-arms the timer while any remain
+{
+ qpid::sys::Mutex::ScopedLock sl(_getf_lock); // same lock flush() takes around the timer flag
+ getEventsTimerSetFlag = false; // the timer has fired, so it is no longer pending
+ if (_wmgr.get_aio_evt_rem()) { jcntl::get_wr_events(0); } // called with a null timeout pointer
+ if (_wmgr.get_aio_evt_rem()) { setGetEventTimer(); } // events still outstanding: schedule another pass
+}
+
+void
+JournalImpl::flushFire() // inactivity timer callback: flushes once after a period with no write activity, then reschedules itself
+{
+ if (writeActivityFlag) { // handleIoResult() ran since the last firing: clear the flags and wait another period
+ writeActivityFlag = false;
+ flushTriggeredFlag = false;
+ } else {
+ if (!flushTriggeredFlag) { // idle period and no flush issued yet since the last activity
+ flush();
+ flushTriggeredFlag = true; // prevents repeated flushes while the journal stays idle
+ }
+ }
+ inactivityFireEventPtr->setupNextFire();
+ {
+ timer.add(inactivityFireEventPtr); // re-queue the task for its next period
+ }
+}
+
+void
+JournalImpl::wr_aio_cb(std::vector<data_tok*>& dtokl) // write-completion callback: signals enqueue completion on source messages and releases each token
+{
+ for (std::vector<data_tok*>::const_iterator i=dtokl.begin(); i!=dtokl.end(); i++)
+ {
+ DataTokenImpl* dtokp = static_cast<DataTokenImpl*>(*i); // unchecked downcast: every token in the list is treated as a DataTokenImpl
+ if (/*!is_stopped() &&*/ dtokp->getSourceMessage())
+ {
+ switch (dtokp->wstate())
+ {
+ case data_tok::ENQ:
+ dtokp->getSourceMessage()->enqueueComplete();
+ break;
+ case data_tok::DEQ:
+/* Don't need to signal until we have a way to ack completion of dequeue in AMQP
+ dtokp->getSourceMessage()->dequeueComplete();
+ if ( dtokp->getSourceMessage()->isDequeueComplete() ) // clear id after last dequeue
+ dtokp->getSourceMessage()->setPersistenceId(0);
+*/
+ break;
+ default: ; // other write states need no notification
+ }
+ }
+ dtokp->release(); // release() is called for every token, with or without a source message
+ }
+}
+
+void
+JournalImpl::rd_aio_cb(std::vector<u_int16_t>& /*pil*/) // read-completion callback: intentionally empty, the argument is ignored
+{}
+
+void
+JournalImpl::free_read_buffers() // releases the buffer left by the last read and nulls both pointers
+{
+ if (_xidp) { // when an xid pointer is set, only it is freed and _datap is merely cleared
+ ::free(_xidp);
+ _xidp = 0;
+ _datap = 0; // NOTE(review): presumably _datap points into the same allocation as _xidp - confirm against read_data_record()
+ } else if (_datap) { // no xid: the data pointer itself is freed
+ ::free(_datap);
+ _datap = 0;
+ }
+}
+
+void
+JournalImpl::handleIoResult(const iores r) // records write activity, returns on success and otherwise logs (plus raises an event for threshold/full) and throws
+{
+ writeActivityFlag = true; // set for every result, including failures; read by flushFire()
+ switch (r)
+ {
+ case mrg::journal::RHM_IORES_SUCCESS:
+ return;
+ case mrg::journal::RHM_IORES_ENQCAPTHRESH:
+ {
+ std::ostringstream oss;
+ oss << "Enqueue capacity threshold exceeded on queue \"" << _jid << "\".";
+ log(LOG_WARN, oss.str());
+ if (_agent != 0) // management event only when an agent is present
+ _agent->raiseEvent(qmf::org::apache::qpid::legacystore::EventEnqThresholdExceeded(_jid, "Journal enqueue capacity threshold exceeded"),
+ qpid::management::ManagementAgent::SEV_WARN);
+ THROW_STORE_FULL_EXCEPTION(oss.str());
+ }
+ case mrg::journal::RHM_IORES_FULL:
+ {
+ std::ostringstream oss;
+ oss << "Journal full on queue \"" << _jid << "\".";
+ log(LOG_CRITICAL, oss.str());
+ if (_agent != 0) // management event only when an agent is present
+ _agent->raiseEvent(qmf::org::apache::qpid::legacystore::EventFull(_jid, "Journal full"), qpid::management::ManagementAgent::SEV_ERROR);
+ THROW_STORE_FULL_EXCEPTION(oss.str());
+ }
+ default: // NOTE(review): unexpected results are also reported via the store-full exception macro - confirm callers rely on that type
+ {
+ std::ostringstream oss;
+ oss << "Unexpected I/O response (" << mrg::journal::iores_str(r) << ") on queue \"" << _jid << "\"."; // opening quote added so the id is quoted as in the messages above
+ log(LOG_ERROR, oss.str());
+ THROW_STORE_FULL_EXCEPTION(oss.str());
+ }
+ }
+}
+
+qpid::management::Manageable::status_t JournalImpl::ManagementMethod (uint32_t methodId, // management method dispatcher; no method is implemented yet
+ qpid::management::Args& /*args*/,
+ std::string& /*text*/)
+{
+ Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; // returned for any method id not handled below
+
+ switch (methodId)
+ {
+ case _qmf::Journal::METHOD_EXPAND :
+ //_qmf::ArgsJournalExpand& eArgs = (_qmf::ArgsJournalExpand&) args;
+
+ // Implement "expand" using eArgs.i_by (expand-by argument)
+
+ status = Manageable::STATUS_NOT_IMPLEMENTED; // the method id is recognised but has no implementation
+ break;
+ }
+
+ return status;
+}
Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.h?rev=1501895&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.h (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.h Wed Jul 10 18:20:19 2013
@@ -0,0 +1,265 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_JOURNALIMPL_H
+#define QPID_LEGACYSTORE_JOURNALIMPL_H
+
+#include <set>
+#include "qpid/legacystore/jrnl/enums.h"
+#include "qpid/legacystore/jrnl/jcntl.h"
+#include "qpid/legacystore/DataTokenImpl.h"
+#include "qpid/legacystore/PreparedTransaction.h"
+#include "qpid/broker/PersistableQueue.h"
+#include "qpid/sys/Timer.h"
+#include "qpid/sys/Time.h"
+#include <boost/ptr_container/ptr_list.hpp>
+#include <boost/intrusive_ptr.hpp>
+#include "qpid/management/Manageable.h"
+#include "qmf/org/apache/qpid/legacystore/Journal.h"
+
+namespace qpid { namespace sys {
+class Timer;
+}}
+
+namespace mrg {
+namespace msgstore {
+
+class JournalImpl;
+
+// Timer task that keeps a back-pointer to a JournalImpl. cancel() clears that pointer while holding _ife_lock.
+class InactivityFireEvent : public qpid::sys::TimerTask {
+    JournalImpl* _parent;        // journal back-pointer; reset to 0 by cancel()
+    qpid::sys::Mutex _ife_lock;  // taken by cancel() around the _parent update
+  public:
+    InactivityFireEvent(JournalImpl* p, const qpid::sys::Duration timeout);
+    virtual ~InactivityFireEvent() {}
+    void fire();  // defined out of line
+    // Detach from the journal: take the lock, then null the back-pointer.
+    void cancel() { qpid::sys::Mutex::ScopedLock guard(_ife_lock); _parent = 0; }
+};
+
+// Timer task that keeps a back-pointer to a JournalImpl. cancel() clears that pointer while holding _gefe_lock.
+class GetEventsFireEvent : public qpid::sys::TimerTask {
+    JournalImpl* _parent;         // journal back-pointer; reset to 0 by cancel()
+    qpid::sys::Mutex _gefe_lock;  // taken by cancel() around the _parent update
+  public:
+    GetEventsFireEvent(JournalImpl* p, const qpid::sys::Duration timeout);
+    virtual ~GetEventsFireEvent() {}
+    void fire();  // defined out of line
+    // Detach from the journal: take the lock, then null the back-pointer.
+    void cancel() { qpid::sys::Mutex::ScopedLock guard(_gefe_lock); _parent = 0; }
+};
+
+class JournalImpl : public qpid::broker::ExternalQueueStore, public mrg::journal::jcntl, public mrg::journal::aio_callback
+{ // Combines the broker ExternalQueueStore interface with the mrg::journal jcntl controller; the short initialize()/recover() overloads pass this object as the aio_callback.
+ public:
+ typedef boost::function<void (JournalImpl&)> DeleteCallback; // callable taking the journal by reference; an instance is stored in deleteCallback
+
+ private:
+// static qpid::sys::Mutex _static_lock;
+// static u_int32_t cnt;
+
+ qpid::sys::Timer& timer; // timer to which getEventsFireEventsPtr is added by setGetEventTimer()
+ bool getEventsTimerSetFlag; // set to true by setGetEventTimer()
+ boost::intrusive_ptr<qpid::sys::TimerTask> getEventsFireEventsPtr; // task re-armed and scheduled by setGetEventTimer()
+ qpid::sys::Mutex _getf_lock; // NOTE(review): presumably guards the get-events/flush timer state - confirm in JournalImpl.cpp
+ qpid::sys::Mutex _read_lock; // NOTE(review): presumably serialises reads (loadMsgContent) - confirm in JournalImpl.cpp
+
+ u_int64_t lastReadRid; // rid of last read msg for loadMsgContent() - detects out-of-order read requests
+ std::vector<u_int64_t> oooRidList; // list of out-of-order rids (greater than current rid) encountered during read sequence
+
+ bool writeActivityFlag;
+ bool flushTriggeredFlag;
+ boost::intrusive_ptr<qpid::sys::TimerTask> inactivityFireEventPtr; // NOTE(review): presumably an InactivityFireEvent driving flushFire() - confirm
+
+ // temp local vars for loadMsgContent below
+ void* _xidp;
+ void* _datap;
+ size_t _dlen;
+ mrg::journal::data_tok _dtok;
+ bool _external;
+
+ qpid::management::ManagementAgent* _agent; // may be 0: JournalImpl.cpp tests it against 0 before raising events
+ qmf::org::apache::qpid::legacystore::Journal::shared_ptr _mgmtObject; // QMF Journal object; tested for null before use in the instr_* callbacks below
+ DeleteCallback deleteCallback; // reset to an empty DeleteCallback by resetDeleteCallback()
+
+ public:
+ // Constructor: deleteCallback is optional and defaults to an empty DeleteCallback.
+ JournalImpl(qpid::sys::Timer& timer,
+ const std::string& journalId,
+ const std::string& journalDirectory,
+ const std::string& journalBaseFilename,
+ const qpid::sys::Duration getEventsTimeout,
+ const qpid::sys::Duration flushTimeout,
+ qpid::management::ManagementAgent* agent,
+ DeleteCallback deleteCallback=DeleteCallback() );
+
+ virtual ~JournalImpl();
+
+ void initManagement(qpid::management::ManagementAgent* agent);
+ // initialize() overload taking an explicit aio_callback pointer (cbp); defined out of line.
+ void initialize(const u_int16_t num_jfiles,
+ const bool auto_expand,
+ const u_int16_t ae_max_jfiles,
+ const u_int32_t jfsize_sblks,
+ const u_int16_t wcache_num_pages,
+ const u_int32_t wcache_pgsize_sblks,
+ mrg::journal::aio_callback* const cbp);
+ // Convenience overload: forwards to the overload above, passing this object as the aio_callback.
+ inline void initialize(const u_int16_t num_jfiles,
+ const bool auto_expand,
+ const u_int16_t ae_max_jfiles,
+ const u_int32_t jfsize_sblks,
+ const u_int16_t wcache_num_pages,
+ const u_int32_t wcache_pgsize_sblks) {
+ initialize(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks,
+ this);
+ }
+ // recover() overload taking an explicit aio_callback pointer (cbp); highest_rid is a non-const reference (NOTE(review): presumably an output - confirm).
+ void recover(const u_int16_t num_jfiles,
+ const bool auto_expand,
+ const u_int16_t ae_max_jfiles,
+ const u_int32_t jfsize_sblks,
+ const u_int16_t wcache_num_pages,
+ const u_int32_t wcache_pgsize_sblks,
+ mrg::journal::aio_callback* const cbp,
+ boost::ptr_list<msgstore::PreparedTransaction>* prep_tx_list_ptr,
+ u_int64_t& highest_rid,
+ u_int64_t queue_id);
+ // Convenience overload: forwards to recover() above, passing this object as the aio_callback.
+ inline void recover(const u_int16_t num_jfiles,
+ const bool auto_expand,
+ const u_int16_t ae_max_jfiles,
+ const u_int32_t jfsize_sblks,
+ const u_int16_t wcache_num_pages,
+ const u_int32_t wcache_pgsize_sblks,
+ boost::ptr_list<msgstore::PreparedTransaction>* prep_tx_list_ptr,
+ u_int64_t& highest_rid,
+ u_int64_t queue_id) {
+ recover(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks,
+ this, prep_tx_list_ptr, highest_rid, queue_id);
+ }
+
+ void recover_complete();
+
+ // Temporary fn to read and save last msg read from journal so it can be assigned
+ // in chunks. To be replaced when coding to do this direct from the journal is ready.
+ // Returns true if the record is extern, false if local.
+ bool loadMsgContent(u_int64_t rid, std::string& data, size_t length, size_t offset = 0);
+
+ // Overrides for write inactivity timer
+ void enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
+ const size_t this_data_len, mrg::journal::data_tok* dtokp,
+ const bool transient = false);
+
+ void enqueue_extern_data_record(const size_t tot_data_len, mrg::journal::data_tok* dtokp,
+ const bool transient = false);
+
+ void enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len,
+ const size_t this_data_len, mrg::journal::data_tok* dtokp, const std::string& xid,
+ const bool transient = false);
+
+ void enqueue_extern_txn_data_record(const size_t tot_data_len, mrg::journal::data_tok* dtokp,
+ const std::string& xid, const bool transient = false);
+
+ void dequeue_data_record(mrg::journal::data_tok* const dtokp, const bool txn_coml_commit = false);
+
+ void dequeue_txn_data_record(mrg::journal::data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit = false);
+
+ void txn_abort(mrg::journal::data_tok* const dtokp, const std::string& xid);
+
+ void txn_commit(mrg::journal::data_tok* const dtokp, const std::string& xid);
+
+ void stop(bool block_till_aio_cmpl = false);
+
+ // Logging
+ void log(mrg::journal::log_level level, const std::string& log_stmt) const;
+ void log(mrg::journal::log_level level, const char* const log_stmt) const;
+
+ // Overrides for get_events timer
+ mrg::journal::iores flush(const bool block_till_aio_cmpl = false);
+
+ // TimerTask callback
+ void getEventsFire();
+ void flushFire();
+
+ // AIO callbacks
+ virtual void wr_aio_cb(std::vector<mrg::journal::data_tok*>& dtokl);
+ virtual void rd_aio_cb(std::vector<u_int16_t>& pil);
+
+ qpid::management::ManagementObject::shared_ptr GetManagementObject (void) const
+ { return _mgmtObject; } // returns the stored QMF Journal object
+
+ qpid::management::Manageable::status_t ManagementMethod (uint32_t,
+ qpid::management::Args&,
+ std::string&); // in JournalImpl.cpp only METHOD_EXPAND is recognised, and it reports STATUS_NOT_IMPLEMENTED
+
+ void resetDeleteCallback() { deleteCallback = DeleteCallback(); } // replaces deleteCallback with an empty DeleteCallback
+
+ private:
+ void free_read_buffers();
+ // Re-arm the get-events task (setupNextFire), add it to the timer, then record that the timer is set.
+ inline void setGetEventTimer()
+ {
+ getEventsFireEventsPtr->setupNextFire();
+ timer.add(getEventsFireEventsPtr);
+ getEventsTimerSetFlag = true;
+ }
+ void handleIoResult(const mrg::journal::iores r); // defined in JournalImpl.cpp; its error branches log the result and throw
+
+ // Management instrumentation callbacks overridden from jcntl
+ inline void instr_incr_outstanding_aio_cnt() {
+ if (_mgmtObject.get() != 0) _mgmtObject->inc_outstandingAIOs();
+ }
+ inline void instr_decr_outstanding_aio_cnt() {
+ if (_mgmtObject.get() != 0) _mgmtObject->dec_outstandingAIOs();
+ }
+
+}; // class JournalImpl
+
+// JournalImpl variant used when reading the TPL. Adds a read_data_record() that ignores
+// transactions and a read_reset() helper; no DeleteCallback is passed to the base class.
+class TplJournalImpl : public JournalImpl {
+  public:
+    TplJournalImpl(qpid::sys::Timer& timer,
+                   const std::string& journalId,
+                   const std::string& journalDirectory,
+                   const std::string& journalBaseFilename,
+                   const qpid::sys::Duration getEventsTimeout,
+                   const qpid::sys::Duration flushTimeout,
+                   qpid::management::ManagementAgent* agent)
+        : JournalImpl(timer, journalId, journalDirectory, journalBaseFilename, getEventsTimeout, flushTimeout, agent) {}
+
+    virtual ~TplJournalImpl() {}
+
+    // Special read that ignores transactions (needed when reading the TPL): forwards to the base read_data_record() with true as the final argument.
+    mrg::journal::iores read_data_record(void** const datapp, std::size_t& dsize,
+            void** const xidpp, std::size_t& xidsize, bool& transient, bool& external,
+            mrg::journal::data_tok* const dtokp)
+    { return JournalImpl::read_data_record(datapp, dsize, xidpp, xidsize, transient, external, dtokp, true); }
+    // Calls invalidate() on the _rmgr member (declared outside this header).
+    void read_reset() { _rmgr.invalidate(); }
+}; // class TplJournalImpl
+
+} // namespace msgstore
+} // namespace mrg
+
+#endif // ifndef QPID_LEGACYSTORE_JOURNALIMPL_H
Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/JournalImpl.h
------------------------------------------------------------------------------
svn:keywords = Author Date Id Rev URL
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org