You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@marmotta.apache.org by ss...@apache.org on 2016/08/25 11:04:27 UTC
[6/8] marmotta git commit: Ostrich: - separate RocksDB and LevelDB
support - refactor the whole persistence architecture to be more object
oriented and easier to understand
Ostrich:
- separate RocksDB and LevelDB support
- refactor the whole persistence architecture to be more object oriented and easier to understand
Project: http://git-wip-us.apache.org/repos/asf/marmotta/repo
Commit: http://git-wip-us.apache.org/repos/asf/marmotta/commit/b8d122a1
Tree: http://git-wip-us.apache.org/repos/asf/marmotta/tree/b8d122a1
Diff: http://git-wip-us.apache.org/repos/asf/marmotta/diff/b8d122a1
Branch: refs/heads/develop
Commit: b8d122a19406008ef5a6c1c7ce119f4563a8be35
Parents: d811ef3
Author: Sebastian Schaffert <ss...@apache.org>
Authored: Thu Aug 25 10:58:19 2016 +0200
Committer: Sebastian Schaffert <ss...@apache.org>
Committed: Thu Aug 25 10:58:19 2016 +0200
----------------------------------------------------------------------
libraries/ostrich/backend/CMakeLists.txt | 19 +-
.../ostrich/backend/persistence/CMakeLists.txt | 24 +-
.../backend/persistence/base_persistence.cc | 193 ++++++++
.../backend/persistence/base_persistence.h | 217 ++++++++
.../backend/persistence/leveldb_persistence.cc | 435 +++++------------
.../backend/persistence/leveldb_persistence.h | 75 +--
.../backend/persistence/leveldb_server.cc | 1 +
.../backend/persistence/leveldb_service.cc | 22 +-
.../backend/persistence/leveldb_service.h | 10 +-
.../backend/persistence/leveldb_sparql.h | 6 +-
.../backend/persistence/rocksdb_persistence.cc | 489 +++++++++++++++++++
.../backend/persistence/rocksdb_persistence.h | 163 +++++++
.../backend/persistence/rocksdb_server.cc | 75 +++
libraries/ostrich/backend/test/CMakeLists.txt | 4 +
libraries/ostrich/backend/test/LevelDBTest.cc | 268 ++++++++++
.../ostrich/backend/test/PersistenceTest.cc | 277 ++---------
16 files changed, 1627 insertions(+), 651 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/marmotta/blob/b8d122a1/libraries/ostrich/backend/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/CMakeLists.txt b/libraries/ostrich/backend/CMakeLists.txt
index 87dfc7f..7c64230 100644
--- a/libraries/ostrich/backend/CMakeLists.txt
+++ b/libraries/ostrich/backend/CMakeLists.txt
@@ -12,10 +12,10 @@ find_package (Rasqal REQUIRED)
find_package (GFlags REQUIRED)
find_package (Protobuf REQUIRED)
find_package (GRPC REQUIRED)
-find_package (LevelDB REQUIRED)
+find_package (LevelDB)
+find_package (RocksDB)
find_package (GLog REQUIRED)
find_package (Boost 1.54.0 COMPONENTS iostreams filesystem system)
-find_package (RocksDB)
find_package (Tcmalloc)
add_definitions(-DNDEBUG)
@@ -26,14 +26,17 @@ if (Boost_IOSTREAMS_FOUND)
endif (Boost_IOSTREAMS_FOUND)
if (RocksDB_FOUND)
- message(STATUS "Enabling RocksDB support (RocksDB found)")
- add_definitions(-DHAVE_ROCKSDB)
- set(PERSISTENCE_LIBRARY ${RocksDB_LIBRARY})
-else (RocksDB_FOUND)
- message(STATUS "Using standard LevelDB (RocksDB not found)")
- set(PERSISTENCE_LIBRARY ${LevelDB_LIBRARY})
+ message(STATUS "Enabling RocksDB support")
endif (RocksDB_FOUND)
+if (LevelDB_FOUND)
+    message(STATUS "Enabling LevelDB support")
+endif ()
+
+# Abort configuration unless at least one persistence library was found;
+# both find_package() calls above are optional on their own.
+if ((NOT LevelDB_FOUND) AND (NOT RocksDB_FOUND))
+    message(FATAL_ERROR "Could not find any persistence library (RocksDB or LevelDB)")
+endif ()
+
if (Tcmalloc_FOUND)
message(STATUS "Enabling profiling support (Tcmalloc found)")
endif (Tcmalloc_FOUND)
http://git-wip-us.apache.org/repos/asf/marmotta/blob/b8d122a1/libraries/ostrich/backend/persistence/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/persistence/CMakeLists.txt b/libraries/ostrich/backend/persistence/CMakeLists.txt
index b6ec1b5..83bb144 100644
--- a/libraries/ostrich/backend/persistence/CMakeLists.txt
+++ b/libraries/ostrich/backend/persistence/CMakeLists.txt
@@ -2,17 +2,31 @@ include_directories(.. ${CMAKE_CURRENT_BINARY_DIR}/.. ${CMAKE_CURRENT_BINARY_DIR
# Shared Marmotta Ostrich persistence implementation
+# LevelDB backend: only built when find_package(LevelDB) succeeded, so that a
+# RocksDB-only system does not try to compile leveldb_persistence.cc or link a
+# NOTFOUND LevelDB_LIBRARY.
+# NOTE(review): if marmotta_updatedb (below) links marmotta_leveldb, it needs
+# the same guard -- confirm.
+if (LevelDB_FOUND)
add_library(marmotta_leveldb
- leveldb_persistence.cc leveldb_persistence.h leveldb_sparql.cc leveldb_sparql.h)
+ leveldb_persistence.cc leveldb_persistence.h leveldb_sparql.cc leveldb_sparql.h base_persistence.cc base_persistence.h)
target_link_libraries(marmotta_leveldb
marmotta_model marmotta_util marmotta_sparql marmotta_service
- ${PERSISTENCE_LIBRARY} ${GLOG_LIBRARY} ${PROTOBUF_LIBRARIES})
+ ${LevelDB_LIBRARY} ${GLOG_LIBRARY} ${PROTOBUF_LIBRARIES})
# Server binary
-add_executable(marmotta_persistence
+add_executable(leveldb_persistence
leveldb_service.cc leveldb_service.h leveldb_server.cc )
-target_link_libraries(marmotta_persistence marmotta_service marmotta_leveldb
+target_link_libraries(leveldb_persistence marmotta_service marmotta_leveldb
${GFLAGS_LIBRARY} ${CMAKE_THREAD_LIBS_INIT} ${GRPC_LIBRARIES} ${Tcmalloc_LIBRARIES})
-install(TARGETS marmotta_persistence DESTINATION bin)
+install(TARGETS leveldb_persistence DESTINATION bin)
+endif ()
+
+# RocksDB backend: only built when find_package(RocksDB) succeeded, so that a
+# LevelDB-only system does not try to compile rocksdb_persistence.cc or link a
+# NOTFOUND RocksDB_LIBRARY.
+if (RocksDB_FOUND)
+    add_library(marmotta_rocksdb
+            rocksdb_persistence.cc rocksdb_persistence.h leveldb_sparql.cc leveldb_sparql.h base_persistence.cc base_persistence.h)
+    target_link_libraries(marmotta_rocksdb
+            marmotta_model marmotta_util marmotta_sparql marmotta_service
+            ${RocksDB_LIBRARY} ${GLOG_LIBRARY} ${PROTOBUF_LIBRARIES})
+
+    # Server binary
+    add_executable(rocksdb_persistence
+            leveldb_service.cc leveldb_service.h rocksdb_server.cc )
+    target_link_libraries(rocksdb_persistence marmotta_service marmotta_rocksdb
+            ${GFLAGS_LIBRARY} ${CMAKE_THREAD_LIBS_INIT} ${GRPC_LIBRARIES} ${Tcmalloc_LIBRARIES})
+    install(TARGETS rocksdb_persistence DESTINATION bin)
+endif ()
+
+
# Command line admin tool
add_executable(marmotta_updatedb marmotta_updatedb.cc)
http://git-wip-us.apache.org/repos/asf/marmotta/blob/b8d122a1/libraries/ostrich/backend/persistence/base_persistence.cc
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/persistence/base_persistence.cc b/libraries/ostrich/backend/persistence/base_persistence.cc
new file mode 100644
index 0000000..6635647
--- /dev/null
+++ b/libraries/ostrich/backend/persistence/base_persistence.cc
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "persistence/base_persistence.h"
+
+#include <cstring>
+
+#include "model/rdf_operators.h"
+#include "util/murmur3.h"
+
+using marmotta::rdf::proto::Statement;
+
+namespace marmotta {
+namespace persistence {
+namespace {
+
+// Hashes the bytes of `s` into a 128-bit Murmur3 digest written to `result`
+// (kKeyLength bytes, seed 13). A null `s` is an unset component: nothing is
+// written and false is returned.
+inline bool computeKey(const std::string* s, char* result) {
+    if (s == nullptr) {
+        return false;
+    }
+#ifdef __x86_64__
+    MurmurHash3_x64_128(s->data(), s->size(), 13, result);
+#else
+    MurmurHash3_x86_128(s->data(), s->size(), 13, result);
+#endif
+    return true;
+}
+
+// Serializes `msg` and hashes the serialized bytes into `result`, but only
+// when `enabled` is true; otherwise leaves `result` untouched and returns false.
+inline bool computeKey(
+        const google::protobuf::Message& msg, bool enabled, char* result) {
+    if (!enabled) {
+        return false;
+    }
+    std::string serialized;
+    msg.SerializeToString(&serialized);
+    return computeKey(&serialized, result);
+}
+
+// Writes one key component into `dest`: the kKeyLength-byte hash when the
+// component is set, or kKeyLength copies of the fill byte `base` otherwise.
+inline void copyKey(const char* hash, bool enabled, int base, char* dest) {
+    if (!enabled) {
+        memset(dest, base, kKeyLength);
+        return;
+    }
+    memcpy(dest, hash, kKeyLength);
+}
+
+}  // namespace
+
+// Hashes each non-null component string; a null pointer leaves the component
+// unset so that it acts as a wildcard in Create().
+Key::Key(const std::string* s, const std::string* p,
+         const std::string* o, const std::string* c) {
+    sEnabled = computeKey(s, sHash);
+    pEnabled = computeKey(p, pHash);
+    oEnabled = computeKey(o, oHash);
+    cEnabled = computeKey(c, cHash);
+}
+
+// Hashes the serialized subject/predicate/object/context of `stmt`; any field
+// that is not set in the statement becomes a wildcard.
+Key::Key(const rdf::proto::Statement& stmt) {
+    sEnabled = computeKey(stmt.subject(), stmt.has_subject(), sHash);
+    pEnabled = computeKey(stmt.predicate(), stmt.has_predicate(), pHash);
+    oEnabled = computeKey(stmt.object(), stmt.has_object(), oHash);
+    cEnabled = computeKey(stmt.context(), stmt.has_context(), cHash);
+}
+
+// Builds the 4 * kKeyLength byte index key for `type`: the four component
+// hashes concatenated in the order the index type names (e.g. CSPO = context,
+// subject, predicate, object). Unset (wildcard) components are filled with
+// 0x00 for LOWER and 0xFF for UPPER, so -- assuming keys compare byte-wise --
+// the LOWER and UPPER keys bracket every key agreeing on the set components.
+// The caller owns the returned buffer and must release it with delete[].
+char* Key::Create(IndexTypes type, BoundTypes bound) const {
+    const int base = (bound == UPPER) ? 0xFF : 0x00;
+
+    char* result = new char[4 * kKeyLength];
+    // Pre-fill the whole buffer (not just the first component) so that no
+    // byte is ever returned uninitialised, even for an unhandled `type`.
+    memset(result, base, 4 * kKeyLength);
+
+    // Writes one component into slot `pos` (0..3) of the key.
+    auto put = [&](int pos, const char* hash, bool enabled) {
+        copyKey(hash, enabled, base, &result[pos * kKeyLength]);
+    };
+
+    switch (type) {
+        case SPOC:
+            put(0, sHash, sEnabled);
+            put(1, pHash, pEnabled);
+            put(2, oHash, oEnabled);
+            put(3, cHash, cEnabled);
+            break;
+        case CSPO:
+            put(0, cHash, cEnabled);
+            put(1, sHash, sEnabled);
+            put(2, pHash, pEnabled);
+            put(3, oHash, oEnabled);
+            break;
+        case OPSC:
+            put(0, oHash, oEnabled);
+            put(1, pHash, pEnabled);
+            put(2, sHash, sEnabled);
+            put(3, cHash, cEnabled);
+            break;
+        case PCOS:
+            put(0, pHash, pEnabled);
+            put(1, cHash, cEnabled);
+            put(2, oHash, oEnabled);
+            put(3, sHash, sEnabled);
+            break;
+    }
+    return result;
+}
+
+
+
+// Picks the index whose key prefix is best covered by the bound components of
+// `pattern`, and records whether a range scan on that index can return
+// non-matching statements (which then need a Matches() filter).
+Pattern::Pattern(const Statement& pattern)
+        : key_(pattern), needsFilter_(false) {
+    const bool hasS = pattern.has_subject();
+    const bool hasP = pattern.has_predicate();
+    const bool hasO = pattern.has_object();
+    const bool hasC = pattern.has_context();
+
+    if (hasS) {
+        // Subjects are usually the most selective component, so prefer a
+        // subject-led index; CSPO additionally binds the context prefix.
+        type_ = hasC ? CSPO : SPOC;
+        // An object without a predicate sits behind a wildcard slot.
+        needsFilter_ = hasO && !hasP;
+        return;
+    }
+
+    if (hasO) {
+        // Object-led index is the next best choice. The context slot comes
+        // after the unbound subject slot, so a bound context must be filtered.
+        type_ = OPSC;
+        needsFilter_ = hasC;
+        return;
+    }
+
+    if (hasP) {
+        // Predicate-led index; subject and object are unbound here, so the
+        // bound prefix is exact and no filter is required.
+        type_ = PCOS;
+        return;
+    }
+
+    // Only a context (CSPO prefix) or nothing at all (full SPOC scan).
+    type_ = hasC ? CSPO : SPOC;
+}
+
+/**
+ * Return the lower key for querying the index. The query range
+ * [MinKey, MaxKey] is inclusive at both ends: when every component is bound
+ * the two keys are identical. The caller owns the returned buffer
+ * (4 * kKeyLength bytes) and must release it with delete[].
+ */
+char* Pattern::MinKey() const {
+    return key_.Create(Type(), LOWER);
+}
+
+/**
+ * Return the upper key for querying the index. The query range
+ * [MinKey, MaxKey] is inclusive at both ends. The caller owns the returned
+ * buffer (4 * kKeyLength bytes) and must release it with delete[].
+ */
+char* Pattern::MaxKey() const {
+    return key_.Create(Type(), UPPER);
+}
+
+
+// Returns true when `stmt` agrees with `pattern` on every component the
+// pattern sets; components left unset in the pattern act as wildcards.
+bool Matches(const Statement& pattern, const Statement& stmt) {
+    // Component comparison uses the operator!= overloads presumably provided
+    // by model/rdf_operators.h. Checks short-circuit in C, S, P, O order.
+    const bool mismatch =
+            (pattern.has_context() && stmt.context() != pattern.context()) ||
+            (pattern.has_subject() && stmt.subject() != pattern.subject()) ||
+            (pattern.has_predicate() && stmt.predicate() != pattern.predicate()) ||
+            (pattern.has_object() && stmt.object() != pattern.object());
+    return !mismatch;
+}
+
+} // namespace persistence
+} // namespace marmotta
+
http://git-wip-us.apache.org/repos/asf/marmotta/blob/b8d122a1/libraries/ostrich/backend/persistence/base_persistence.h
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/persistence/base_persistence.h b/libraries/ostrich/backend/persistence/base_persistence.h
new file mode 100644
index 0000000..89a5822
--- /dev/null
+++ b/libraries/ostrich/backend/persistence/base_persistence.h
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef MARMOTTA_BASE_PERSISTENCE_H
+#define MARMOTTA_BASE_PERSISTENCE_H
+
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <string>
+
+#include "model/rdf_model.h"
+#include "service/sail.pb.h"
+#include "util/iterator.h"
+
+namespace marmotta {
+namespace persistence {
+
+// Size in bytes of one hashed key component (a 128-bit Murmur3 digest).
+constexpr int kKeyLength = 16;
+
+// Component order of the four index layouts
+// (S = subject, P = predicate, O = object, C = context).
+enum IndexTypes { SPOC, CSPO, OPSC, PCOS };
+
+// End of a key range to build: unset components are filled with 0x00 for
+// LOWER and 0xFF for UPPER.
+enum BoundTypes { LOWER, UPPER };
+
+/**
+ * Hashed index key for a statement or a string quadruple. Each of the four
+ * components (subject, predicate, object, context) is hashed into a
+ * kKeyLength-byte Murmur3 digest; unset components act as wildcards.
+ */
+class Key {
+ public:
+    // Create key for the given string quadruple using a Murmur3 hash for each
+    // component. A null pointer marks the component as unset (wildcard).
+    Key(const std::string* s, const std::string* p,
+        const std::string* o, const std::string* c);
+
+    // Create key for the given statement. Fields that are not set in the
+    // statement become wildcards.
+    Key(const rdf::proto::Statement& stmt);
+
+    // Create the key for the given index type: the four component hashes
+    // concatenated in the order named by `type` (4 * kKeyLength bytes), with
+    // wildcards filled by 0x00 (LOWER) or 0xFF (UPPER). Returns an array
+    // allocated with new[]; the caller must release it with delete[] (not
+    // free()).
+    char* Create(IndexTypes type, BoundTypes bound = LOWER) const;
+
+ private:
+    // Whether each component is set; the matching hash array is only
+    // meaningful when its flag is true.
+    bool sEnabled, pEnabled, oEnabled, cEnabled;
+    char sHash[kKeyLength], pHash[kKeyLength], oHash[kKeyLength], cHash[kKeyLength];
+};
+
+
+/**
+ * A pattern for querying the index of a key-value store supporting range queries
+ * like LevelDB or RocksDB.
+ */
+/**
+ * A pattern for querying the index of a key-value store supporting range
+ * queries, like LevelDB or RocksDB. The constructor picks the index (Type())
+ * whose key prefix is best covered by the bound components of the pattern.
+ */
+class Pattern {
+ public:
+    Pattern(const rdf::proto::Statement& pattern);
+
+    /**
+     * Return the lower key for querying the index. The query range
+     * [MinKey, MaxKey] is inclusive at both ends (when every component is
+     * bound the two keys are identical). The returned buffer is allocated
+     * with new[] and must be released by the caller with delete[].
+     */
+    char* MinKey() const;
+
+    /**
+     * Return the upper key for querying the index. The query range
+     * [MinKey, MaxKey] is inclusive at both ends. The returned buffer is
+     * allocated with new[] and must be released by the caller with delete[].
+     */
+    char* MaxKey() const;
+
+    // Index layout the range keys are built for.
+    IndexTypes Type() const {
+        return type_;
+    }
+
+    // Overrides the chosen index layout; returns *this for chaining.
+    Pattern& Type(IndexTypes t) {
+        type_ = t;
+        return *this;
+    }
+
+    // Returns true in case this query pattern cannot be answered by the index
+    // alone, i.e. results of the range scan must be checked with Matches().
+    bool NeedsFilter() const {
+        return needsFilter_;
+    }
+
+ private:
+    Key key_;
+    IndexTypes type_;
+    bool needsFilter_;
+};
+
+/**
+ * Abstract interface of a statement/namespace store.
+ */
+class Persistence {
+ public:
+    typedef util::CloseableIterator<rdf::proto::Statement> StatementIterator;
+    typedef util::CloseableIterator<rdf::proto::Namespace> NamespaceIterator;
+    typedef util::CloseableIterator<service::proto::UpdateRequest> UpdateIterator;
+
+    typedef std::function<bool(const rdf::proto::Statement&)> StatementHandler;
+    typedef std::function<bool(const rdf::proto::Namespace&)> NamespaceHandler;
+
+    // Virtual so that destroying an implementation through a Persistence
+    // pointer is well-defined (this class is a polymorphic interface).
+    virtual ~Persistence() = default;
+
+    /**
+     * Add the namespaces in the iterator to the database.
+     */
+    virtual service::proto::UpdateResponse AddNamespaces(
+            NamespaceIterator& it) = 0;
+
+    /**
+     * Add the statements in the iterator to the database.
+     */
+    virtual service::proto::UpdateResponse AddStatements(
+            StatementIterator& it) = 0;
+
+    /**
+     * Get all statements matching the pattern (which may have some fields
+     * unset to indicate wildcards). Call the callback function for each
+     * result.
+     */
+    virtual void GetStatements(
+            const rdf::proto::Statement& pattern, StatementHandler callback) = 0;
+
+    /**
+     * Return an iterator over all statements matching the pattern (which may
+     * have some fields unset to indicate wildcards).
+     */
+    virtual std::unique_ptr<StatementIterator> GetStatements(
+            const rdf::proto::Statement& pattern) = 0;
+
+    /**
+     * Get all namespaces matching the pattern (which may have some or all
+     * fields unset to indicate wildcards). Call the callback function for
+     * each result.
+     */
+    virtual void GetNamespaces(
+            const rdf::proto::Namespace &pattern, NamespaceHandler callback) = 0;
+
+    /**
+     * Return an iterator over all namespaces matching the pattern (which may
+     * have some or all fields unset to indicate wildcards).
+     */
+    virtual std::unique_ptr<NamespaceIterator> GetNamespaces(
+            const rdf::proto::Namespace &pattern) = 0;
+
+    /**
+     * Remove all statements matching the pattern (which may have some fields
+     * unset to indicate wildcards).
+     */
+    virtual service::proto::UpdateResponse RemoveStatements(
+            const rdf::proto::Statement& pattern) = 0;
+
+    /**
+     * Apply a batch of updates (mixed statement/namespace adds and removes).
+     * How the updates are batched and written (e.g. into backend write
+     * batches flushed when iteration ends) is up to the implementation.
+     */
+    virtual service::proto::UpdateResponse Update(
+            UpdateIterator& it) = 0;
+
+    /**
+     * Return the size of this database.
+     */
+    virtual int64_t Size() = 0;
+};
+
+
+// Base iterator for wrapping a LevelDB-style database iterator (any type with
+// SeekToFirst/Valid/Next/value). Each value is parsed into a protobuf message
+// of type T. The wrapper owns the wrapped iterator and deletes it on
+// destruction.
+template<typename T, typename Iterator>
+class DBIterator : public util::CloseableIterator<T> {
+ public:
+
+    DBIterator(Iterator *it)
+            : it(it) {
+        it->SeekToFirst();
+    }
+
+    // The wrapped iterator is owned through a raw pointer; a copy would
+    // delete it twice, so copying is disabled.
+    DBIterator(const DBIterator&) = delete;
+    DBIterator& operator=(const DBIterator&) = delete;
+
+    virtual ~DBIterator() override {
+        delete it;
+    }
+
+    const T& next() override {
+        // Parse current position, then iterate to next position for next call.
+        proto.ParseFromString(it->value().ToString());
+        it->Next();
+        return proto;
+    }
+
+    const T& current() const override {
+        return proto;
+    }
+
+    virtual bool hasNext() override {
+        return it->Valid();
+    }
+
+ protected:
+    Iterator* it;
+    T proto;
+};
+
+
+// Returns true when `stmt` agrees with `pattern` on every component the
+// pattern sets; unset pattern components act as wildcards.
+bool Matches(const rdf::proto::Statement& pattern, const rdf::proto::Statement& stmt);
+
+} // namespace persistence
+} // namespace marmotta
+
+#endif //MARMOTTA_BASE_PERSISTENCE_H
http://git-wip-us.apache.org/repos/asf/marmotta/blob/b8d122a1/libraries/ostrich/backend/persistence/leveldb_persistence.cc
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/persistence/leveldb_persistence.cc b/libraries/ostrich/backend/persistence/leveldb_persistence.cc
index efc1dc6..23e2df4 100644
--- a/libraries/ostrich/backend/persistence/leveldb_persistence.cc
+++ b/libraries/ostrich/backend/persistence/leveldb_persistence.cc
@@ -30,7 +30,6 @@
#include "leveldb_persistence.h"
#include "model/rdf_operators.h"
-#include "util/murmur3.h"
#include "util/unique.h"
#define CHECK_STATUS(s) CHECK(s.ok()) << "Writing to database failed: " << s.ToString()
@@ -38,8 +37,8 @@
DEFINE_int64(write_batch_size, 1000000,
"Maximum number of statements to write in a single batch to the database");
-using dbimpl::WriteBatch;
-using dbimpl::Slice;
+using leveldb::WriteBatch;
+using leveldb::Slice;
using marmotta::rdf::proto::Statement;
using marmotta::rdf::proto::Namespace;
using marmotta::rdf::proto::Resource;
@@ -48,218 +47,26 @@ namespace marmotta {
namespace persistence {
namespace {
-
-// Creates an index key based on hashing values of the 4 messages in proper order.
-inline void computeKey(const std::string* a, const std::string* b, const std::string* c, const std::string* d, char* result) {
- // 128bit keys, use murmur
- int offset = 0;
- for (auto m : {a, b, c, d}) {
- if (m != nullptr) {
-#ifdef __x86_64__
- MurmurHash3_x64_128(m->data(), m->size(), 13, &result[offset]);
-#else
- MurmurHash3_x86_128(m->data(), m->size(), 13, &result[offset]);
-#endif
- } else {
- return;
- }
- offset += KEY_LENGTH;
- }
-}
-
-enum Position {
- S = 0, P = 1, O = 2, C = 3
-};
-
-// Reorder a hash key from the generated SPOC key without requiring to recompute murmur.
-inline void orderKey(char* dest, const char* src, Position a, Position b, Position c, Position d) {
- int offset = 0;
- for (int m : {a, b, c, d}) {
- memcpy(&dest[offset], &src[m * KEY_LENGTH], KEY_LENGTH * sizeof(char));
- offset += KEY_LENGTH;
- }
-}
-
-/**
- * Helper class to define proper cache keys and identify the index to use based on
- * fields available in the pattern.
- */
-class PatternQuery {
- public:
- enum IndexType {
- SPOC, CSPO, OPSC, PCOS
- };
-
- PatternQuery(const Statement& pattern) : pattern(pattern), needsFilter(true) {
- if (pattern.has_subject()) {
- s.reset(new std::string());
- pattern.subject().SerializeToString(s.get());
- }
- if (pattern.has_predicate()) {
- p.reset(new std::string());
- pattern.predicate().SerializeToString(p.get());
- }
- if (pattern.has_object()) {
- o.reset(new std::string());
- pattern.object().SerializeToString(o.get());
- }
- if (pattern.has_context()) {
- c.reset(new std::string());
- pattern.context().SerializeToString(c.get());
- }
-
- if (pattern.has_subject()) {
- // Subject is usually most selective, so if it is present use the
- // subject-based databases first.
- if (pattern.has_context()) {
- type_ = CSPO;
- } else {
- type_ = SPOC;
- }
-
- // Filter needed if there is no predicate but an object.
- needsFilter = !(pattern.has_predicate()) && pattern.has_object();
- } else if (pattern.has_object()) {
- // Second-best option is object.
- type_ = OPSC;
-
- // Filter needed if there is a context (subject already checked, predicate irrelevant).
- needsFilter = pattern.has_context();
- } else if (pattern.has_predicate()) {
- // Predicate is usually least selective.
- type_ = PCOS;
-
- // No filter needed, object and subject are not set.
- needsFilter = false;
- } else if (pattern.has_context()) {
- type_ = CSPO;
-
- // No filter needed, subject, predicate object are not set.
- needsFilter = false;
- } else {
- // Fall back to SPOC.
- type_ = SPOC;
-
- // No filter needed, we just scan from the beginning.
- needsFilter = false;
- }
- }
-
- /**
- * Return the lower key for querying the index (range [MinKey,MaxKey) ).
- */
- char* MinKey() const {
- char* result = (char*)calloc(4 * KEY_LENGTH, sizeof(char));
- compute(result);
- return result;
- }
-
- /**
- * Return the upper key for querying the index (range [MinKey,MaxKey) ).
- */
- char* MaxKey() const {
- char* result = (char*)malloc(4 * KEY_LENGTH * sizeof(char));
- for (int i=0; i < 4 * KEY_LENGTH; i++) {
- result[i] = (char)0xFF;
- }
-
- compute(result);
- return result;
- }
-
- IndexType Type() const {
- return type_;
- }
-
- PatternQuery& Type(IndexType t) {
- type_ = t;
- return *this;
- }
-
- // Returns true in case this query pattern cannot be answered by the index alone.
- bool NeedsFilter() const {
- return needsFilter;
- }
-
- private:
- const Statement& pattern;
- std::unique_ptr<std::string> s, p, o, c;
-
- // Creates a cache key based on hashing values of the 4 messages in proper order.
- void compute(char* result) const {
- switch(Type()) {
- case SPOC:
- computeKey(s.get(), p.get(), o.get(), c.get(), result);
- break;
- case CSPO:
- computeKey(c.get(), s.get(), p.get(), o.get(), result);
- break;
- case OPSC:
- computeKey(o.get(), p.get(), s.get(), c.get(), result);
- break;
- case PCOS:
- computeKey(p.get(), c.get(), o.get(), s.get(), result);
- break;
- }
- }
-
- IndexType type_;
- bool needsFilter = true;
-};
-
-
-// Base tterator for wrapping a LevelDB iterators.
template<typename T>
-class LevelDBIterator : public util::CloseableIterator<T> {
- public:
-
- LevelDBIterator(dbimpl::Iterator *it)
- : it(it) {
- it->SeekToFirst();
- }
-
- virtual ~LevelDBIterator() override {
- delete it;
- };
-
- const T& next() override {
- // Parse current position, then iterate to next position for next call.
- proto.ParseFromString(it->value().ToString());
- it->Next();
- return proto;
- };
-
- const T& current() const override {
- return proto;
- };
-
- virtual bool hasNext() override {
- return it->Valid();
- }
-
- protected:
- dbimpl::Iterator* it;
- T proto;
-};
-
+using LevelDBIterator = DBIterator<T, leveldb::Iterator>;
// Iterator wrapping a LevelDB Statement iterator over a given key range.
class StatementRangeIterator : public LevelDBIterator<Statement> {
public:
- StatementRangeIterator(dbimpl::Iterator *it, char *loKey, char *hiKey)
- : LevelDBIterator(it), loKey(loKey), hiKey(hiKey) {
- it->Seek(dbimpl::Slice(loKey, 4 * KEY_LENGTH));
+ StatementRangeIterator(leveldb::Iterator *it, char *loKey, char *hiKey)
+ : DBIterator(it), loKey(loKey), hiKey(hiKey) {
+ it->Seek(leveldb::Slice(loKey, 4 * KEY_LENGTH));
}
~StatementRangeIterator() override {
- free(loKey);
- free(hiKey);
+ delete[] loKey;
+ delete[] hiKey;
};
bool hasNext() override {
- return it->Valid() && it->key().compare(dbimpl::Slice(hiKey, 4 * KEY_LENGTH)) <= 0;
+ return it->Valid() && it->key().compare(leveldb::Slice(hiKey, 4 * KEY_LENGTH)) <= 0;
}
private:
@@ -267,44 +74,23 @@ class StatementRangeIterator : public LevelDBIterator<Statement> {
char *hiKey;
};
-// Return true if the statement matches the pattern. Wildcards (empty fields)
-// in the pattern are ignored.
-bool Matches(const Statement& pattern, const Statement& stmt) {
- // equality operators defined in rdf_model.h
- if (pattern.has_context() && stmt.context() != pattern.context()) {
- return false;
- }
- if (pattern.has_subject() && stmt.subject() != pattern.subject()) {
- return false;
- }
- if (pattern.has_predicate() && stmt.predicate() != pattern.predicate()) {
- return false;
- }
- return !(pattern.has_object() && stmt.object() != pattern.object());
-}
-
-
} // namespace
/**
* Build database with default options.
*/
-dbimpl::DB* buildDB(const std::string& path, const std::string& suffix, const dbimpl::Options& options) {
- dbimpl::DB* db;
- dbimpl::Status status = dbimpl::DB::Open(options, path + "/" + suffix + ".db", &db);
+leveldb::DB* buildDB(const std::string& path, const std::string& suffix, const leveldb::Options& options) {
+ leveldb::DB* db;
+ leveldb::Status status = leveldb::DB::Open(options, path + "/" + suffix + ".db", &db);
CHECK_STATUS(status);
return db;
}
-dbimpl::Options* buildOptions(KeyComparator* cmp, dbimpl::Cache* cache) {
- dbimpl::Options *options = new dbimpl::Options();
+leveldb::Options* buildOptions(KeyComparator* cmp, leveldb::Cache* cache) {
+ leveldb::Options *options = new leveldb::Options();
options->create_if_missing = true;
-#ifdef HAVE_ROCKSDB
- options->IncreaseParallelism();
- options->OptimizeLevelStyleCompaction();
-#else
// Custom comparator for our keys.
options->comparator = cmp;
@@ -315,20 +101,20 @@ dbimpl::Options* buildOptions(KeyComparator* cmp, dbimpl::Cache* cache) {
options->write_buffer_size = 16384 * 1024;
// Set a bloom filter of 10 bits.
- options->filter_policy = dbimpl::NewBloomFilterPolicy(10);
-#endif
+ options->filter_policy = leveldb::NewBloomFilterPolicy(10);
+
return options;
}
-dbimpl::Options buildNsOptions() {
- dbimpl::Options options;
+leveldb::Options buildNsOptions() {
+ leveldb::Options options;
options.create_if_missing = true;
return options;
}
LevelDBPersistence::LevelDBPersistence(const std::string &path, int64_t cacheSize)
: workers(8), comparator(new KeyComparator())
- , cache(dbimpl::NewLRUCache(cacheSize))
+ , cache(leveldb::NewLRUCache(cacheSize))
, options(buildOptions(comparator.get(), cache.get()))
, db_ns_prefix(buildDB(path, "ns_prefix", buildNsOptions()))
, db_ns_url(buildDB(path, "ns_url", buildNsOptions()))
@@ -350,6 +136,7 @@ LevelDBPersistence::LevelDBPersistence(const std::string &path, int64_t cacheSiz
}));
+ LOG(INFO) << "Opening LevelDB database ...";
for (auto& t : openers) {
t.wait();
}
@@ -363,22 +150,24 @@ LevelDBPersistence::LevelDBPersistence(const std::string &path, int64_t cacheSiz
}
-int64_t LevelDBPersistence::AddNamespaces(NamespaceIterator& it) {
+service::proto::UpdateResponse LevelDBPersistence::AddNamespaces(NamespaceIterator& it) {
DLOG(INFO) << "Starting batch namespace import operation.";
int64_t count = 0;
- dbimpl::WriteBatch batch_prefix, batch_url;
+ leveldb::WriteBatch batch_prefix, batch_url;
while (it.hasNext()) {
AddNamespace(it.next(), batch_prefix, batch_url);
count++;
}
- CHECK_STATUS(db_ns_prefix->Write(dbimpl::WriteOptions(), &batch_prefix));
- CHECK_STATUS(db_ns_url->Write(dbimpl::WriteOptions(), &batch_url));
+ CHECK_STATUS(db_ns_prefix->Write(leveldb::WriteOptions(), &batch_prefix));
+ CHECK_STATUS(db_ns_url->Write(leveldb::WriteOptions(), &batch_url));
DLOG(INFO) << "Imported " << count << " namespaces";
- return count;
+ service::proto::UpdateResponse result;
+ result.set_added_namespaces(count);
+ return result;
}
std::unique_ptr<LevelDBPersistence::NamespaceIterator> LevelDBPersistence::GetNamespaces(
@@ -387,7 +176,7 @@ std::unique_ptr<LevelDBPersistence::NamespaceIterator> LevelDBPersistence::GetNa
Namespace ns;
- dbimpl::DB *db = nullptr;
+ leveldb::DB *db = nullptr;
std::string key, value;
if (pattern.prefix() != "") {
key = pattern.prefix();
@@ -398,7 +187,7 @@ std::unique_ptr<LevelDBPersistence::NamespaceIterator> LevelDBPersistence::GetNa
}
if (db != nullptr) {
// Either prefix or uri given, report the correct namespace value.
- dbimpl::Status s = db->Get(dbimpl::ReadOptions(), key, &value);
+ leveldb::Status s = db->Get(leveldb::ReadOptions(), key, &value);
if (s.ok()) {
ns.ParseFromString(value);
return util::make_unique<util::SingletonIterator<Namespace>>(std::move(ns));
@@ -408,7 +197,7 @@ std::unique_ptr<LevelDBPersistence::NamespaceIterator> LevelDBPersistence::GetNa
} else {
// Pattern was empty, iterate over all namespaces and report them.
return util::make_unique<LevelDBIterator<Namespace>>(
- db_ns_prefix->NewIterator(dbimpl::ReadOptions()));
+ db_ns_prefix->NewIterator(leveldb::ReadOptions()));
}
}
@@ -427,28 +216,28 @@ void LevelDBPersistence::GetNamespaces(
}
-int64_t LevelDBPersistence::AddStatements(StatementIterator& it) {
+service::proto::UpdateResponse LevelDBPersistence::AddStatements(StatementIterator& it) {
auto start = std::chrono::steady_clock::now();
LOG(INFO) << "Starting batch statement import operation.";
int64_t count = 0;
- dbimpl::WriteBatch batch_spoc, batch_cspo, batch_opsc, batch_pcos;
+ leveldb::WriteBatch batch_spoc, batch_cspo, batch_opsc, batch_pcos;
auto writeBatches = [&]{
std::vector<std::future<void>> writers;
writers.push_back(workers.push([&](int id) {
- CHECK_STATUS(db_pcos->Write(dbimpl::WriteOptions(), &batch_pcos));
+ CHECK_STATUS(db_pcos->Write(leveldb::WriteOptions(), &batch_pcos));
batch_pcos.Clear();
}));
writers.push_back(workers.push([&](int id) {
- CHECK_STATUS(db_opsc->Write(dbimpl::WriteOptions(), &batch_opsc));
+ CHECK_STATUS(db_opsc->Write(leveldb::WriteOptions(), &batch_opsc));
batch_opsc.Clear();
}));
writers.push_back(workers.push([&](int id) {
- CHECK_STATUS(db_cspo->Write(dbimpl::WriteOptions(), &batch_cspo));
+ CHECK_STATUS(db_cspo->Write(leveldb::WriteOptions(), &batch_cspo));
batch_cspo.Clear();
}));
writers.push_back(workers.push([&](int id) {
- CHECK_STATUS(db_spoc->Write(dbimpl::WriteOptions(), &batch_spoc));
+ CHECK_STATUS(db_spoc->Write(leveldb::WriteOptions(), &batch_spoc));
batch_spoc.Clear();
}));
@@ -473,7 +262,9 @@ int64_t LevelDBPersistence::AddStatements(StatementIterator& it) {
std::chrono::steady_clock::now() - start).count()
<< "ms).";
- return count;
+ service::proto::UpdateResponse result;
+ result.set_added_statements(count);
+ return result;
}
@@ -481,23 +272,23 @@ std::unique_ptr<LevelDBPersistence::StatementIterator> LevelDBPersistence::GetSt
const rdf::proto::Statement &pattern) {
DLOG(INFO) << "Get statements matching pattern " << pattern.DebugString();
- PatternQuery query(pattern);
+ Pattern query(pattern);
- dbimpl::DB* db;
+ leveldb::DB* db;
switch (query.Type()) {
- case PatternQuery::SPOC:
+ case IndexTypes::SPOC:
db = db_spoc.get();
DLOG(INFO) << "Query: Using index type SPOC";
break;
- case PatternQuery::CSPO:
+ case IndexTypes::CSPO:
db = db_cspo.get();
DLOG(INFO) << "Query: Using index type CSPO";
break;
- case PatternQuery::OPSC:
+ case IndexTypes::OPSC:
db = db_opsc.get();
DLOG(INFO) << "Query: Using index type OPSC";
break;
- case PatternQuery::PCOS:
+ case IndexTypes::PCOS:
db = db_pcos.get();
DLOG(INFO) << "Query: Using index type PCOS";
break;
@@ -507,12 +298,12 @@ std::unique_ptr<LevelDBPersistence::StatementIterator> LevelDBPersistence::GetSt
DLOG(INFO) << "Retrieving statements with filter.";
return util::make_unique<util::FilteringIterator<Statement>>(
new StatementRangeIterator(
- db->NewIterator(dbimpl::ReadOptions()), query.MinKey(), query.MaxKey()),
+ db->NewIterator(leveldb::ReadOptions()), query.MinKey(), query.MaxKey()),
[&pattern](const Statement& stmt) -> bool { return Matches(pattern, stmt); });
} else {
DLOG(INFO) << "Retrieving statements without filter.";
return util::make_unique<StatementRangeIterator>(
- db->NewIterator(dbimpl::ReadOptions()), query.MinKey(), query.MaxKey());
+ db->NewIterator(leveldb::ReadOptions()), query.MinKey(), query.MaxKey());
}
}
@@ -535,29 +326,28 @@ void LevelDBPersistence::GetStatements(
}
-int64_t LevelDBPersistence::RemoveStatements(const rdf::proto::Statement& pattern) {
+service::proto::UpdateResponse LevelDBPersistence::RemoveStatements(
+ const rdf::proto::Statement& pattern) {
auto start = std::chrono::steady_clock::now();
DLOG(INFO) << "Remove statements matching pattern " << pattern.DebugString();
- int64_t count = 0;
+ leveldb::WriteBatch batch_spoc, batch_cspo, batch_opsc, batch_pcos;
- Statement stmt;
- dbimpl::WriteBatch batch_spoc, batch_cspo, batch_opsc, batch_pcos;
-
- count = RemoveStatements(pattern, batch_spoc, batch_cspo, batch_opsc, batch_pcos);
+ int64_t count =
+ RemoveStatements(pattern, batch_spoc, batch_cspo, batch_opsc, batch_pcos);
std::vector<std::future<void>> writers;
writers.push_back(workers.push([&](int id) {
- CHECK_STATUS(db_pcos->Write(dbimpl::WriteOptions(), &batch_pcos));
+ CHECK_STATUS(db_pcos->Write(leveldb::WriteOptions(), &batch_pcos));
}));
writers.push_back(workers.push([&](int id) {
- CHECK_STATUS(db_opsc->Write(dbimpl::WriteOptions(), &batch_opsc));
+ CHECK_STATUS(db_opsc->Write(leveldb::WriteOptions(), &batch_opsc));
}));
writers.push_back(workers.push([&](int id) {
- CHECK_STATUS(db_cspo->Write(dbimpl::WriteOptions(), &batch_cspo));
+ CHECK_STATUS(db_cspo->Write(leveldb::WriteOptions(), &batch_cspo));
}));
writers.push_back(workers.push([&](int id) {
- CHECK_STATUS(db_spoc->Write(dbimpl::WriteOptions(), &batch_spoc));
+ CHECK_STATUS(db_spoc->Write(leveldb::WriteOptions(), &batch_spoc));
}));
for (auto& t : writers) {
@@ -569,39 +359,41 @@ int64_t LevelDBPersistence::RemoveStatements(const rdf::proto::Statement& patter
std::chrono::steady_clock::now() - start).count()
<< "ms).";
- return count;
+ service::proto::UpdateResponse result;
+ result.set_removed_statements(count);
+ return result;
}
-UpdateStatistics LevelDBPersistence::Update(LevelDBPersistence::UpdateIterator &it) {
+service::proto::UpdateResponse LevelDBPersistence::Update(LevelDBPersistence::UpdateIterator &it) {
auto start = std::chrono::steady_clock::now();
LOG(INFO) << "Starting batch update operation.";
- UpdateStatistics stats;
+ int64_t added_stmts = 0, removed_stmts = 0, added_ns = 0, removed_ns = 0;
WriteBatch b_spoc, b_cspo, b_opsc, b_pcos, b_prefix, b_url;
auto writeBatches = [&]{
std::vector<std::future<void>> writers;
writers.push_back(workers.push([&](int id) {
- CHECK_STATUS(db_pcos->Write(dbimpl::WriteOptions(), &b_pcos));
+ CHECK_STATUS(db_pcos->Write(leveldb::WriteOptions(), &b_pcos));
b_pcos.Clear();
}));
writers.push_back(workers.push([&](int id) {
- CHECK_STATUS(db_opsc->Write(dbimpl::WriteOptions(), &b_opsc));
+ CHECK_STATUS(db_opsc->Write(leveldb::WriteOptions(), &b_opsc));
b_opsc.Clear();
}));
writers.push_back(workers.push([&](int id) {
- CHECK_STATUS(db_cspo->Write(dbimpl::WriteOptions(), &b_cspo));
+ CHECK_STATUS(db_cspo->Write(leveldb::WriteOptions(), &b_cspo));
b_cspo.Clear();
}));
writers.push_back(workers.push([&](int id) {
- CHECK_STATUS(db_spoc->Write(dbimpl::WriteOptions(), &b_spoc));
+ CHECK_STATUS(db_spoc->Write(leveldb::WriteOptions(), &b_spoc));
b_spoc.Clear();
}));
writers.push_back(workers.push([&](int id) {
- CHECK_STATUS(db_ns_prefix->Write(dbimpl::WriteOptions(), &b_prefix));
+ CHECK_STATUS(db_ns_prefix->Write(leveldb::WriteOptions(), &b_prefix));
b_prefix.Clear();
}));
writers.push_back(workers.push([&](int id) {
- CHECK_STATUS(db_ns_url->Write(dbimpl::WriteOptions(), &b_url));
+ CHECK_STATUS(db_ns_url->Write(leveldb::WriteOptions(), &b_url));
b_url.Clear();
}));
@@ -615,15 +407,16 @@ UpdateStatistics LevelDBPersistence::Update(LevelDBPersistence::UpdateIterator &
auto next = it.next();
if (next.has_stmt_added()) {
AddStatement(next.stmt_added(), b_spoc, b_cspo, b_opsc, b_pcos);
- stats.added_stmts++;
+ added_stmts++;
} else if (next.has_stmt_removed()) {
- stats.removed_stmts +=
+ removed_stmts +=
RemoveStatements(next.stmt_removed(), b_spoc, b_cspo, b_opsc, b_pcos);
} else if(next.has_ns_added()) {
AddNamespace(next.ns_added(), b_prefix, b_url);
- stats.added_ns++;
+ added_ns++;
} else if(next.has_ns_removed()) {
RemoveNamespace(next.ns_removed(), b_prefix, b_url);
+ removed_ns++;
}
count++;
@@ -634,13 +427,18 @@ UpdateStatistics LevelDBPersistence::Update(LevelDBPersistence::UpdateIterator &
writeBatches();
- LOG(INFO) << "Batch update complete. (statements added: " << stats.added_stmts
- << ", statements removed: " << stats.removed_stmts
- << ", namespaces added: " << stats.added_ns
- << ", namespaces removed: " << stats.removed_ns
+ LOG(INFO) << "Batch update complete. (statements added: " << added_stmts
+ << ", statements removed: " << removed_stmts
+ << ", namespaces added: " << added_ns
+ << ", namespaces removed: " << removed_ns
<< ", time=" << std::chrono::duration <double, std::milli> (
std::chrono::steady_clock::now() - start).count() << "ms).";
+ service::proto::UpdateResponse stats;
+ stats.set_added_statements(added_stmts);
+ stats.set_removed_statements(removed_stmts);
+ stats.set_added_namespaces(added_ns);
+ stats.set_removed_namespaces(removed_ns);
return stats;
}
@@ -671,35 +469,27 @@ void LevelDBPersistence::AddStatement(
WriteBatch &spoc, WriteBatch &cspo, WriteBatch &opsc, WriteBatch &pcos) {
DLOG(INFO) << "Adding statement " << stmt.DebugString();
- std::string buffer, bufs, bufp, bufo, bufc;
+ Key key(stmt);
+ std::string buffer;
stmt.SerializeToString(&buffer);
- stmt.subject().SerializeToString(&bufs);
- stmt.predicate().SerializeToString(&bufp);
- stmt.object().SerializeToString(&bufo);
- stmt.context().SerializeToString(&bufc);
-
- char *k_spoc = (char *) calloc(4 * KEY_LENGTH, sizeof(char));
- computeKey(&bufs, &bufp, &bufo, &bufc, k_spoc);
- spoc.Put(dbimpl::Slice(k_spoc, 4 * KEY_LENGTH), buffer);
+ char *k_spoc = key.Create(IndexTypes::SPOC);
+ spoc.Put(leveldb::Slice(k_spoc, 4 * KEY_LENGTH), buffer);
- char *k_cspo = (char *) calloc(4 * KEY_LENGTH, sizeof(char));
- orderKey(k_cspo, k_spoc, C, S, P, O);
- cspo.Put(dbimpl::Slice(k_cspo, 4 * KEY_LENGTH), buffer);
+ char *k_cspo = key.Create(IndexTypes::CSPO);
+ cspo.Put(leveldb::Slice(k_cspo, 4 * KEY_LENGTH), buffer);
- char *k_opsc = (char *) calloc(4 * KEY_LENGTH, sizeof(char));
- orderKey(k_opsc, k_spoc, O, P, S, C);
- opsc.Put(dbimpl::Slice(k_opsc, 4 * KEY_LENGTH), buffer);
+ char *k_opsc = key.Create(IndexTypes::OPSC);
+ opsc.Put(leveldb::Slice(k_opsc, 4 * KEY_LENGTH), buffer);
- char *k_pcos = (char *) calloc(4 * KEY_LENGTH, sizeof(char));
- orderKey(k_pcos, k_spoc, P, C, O, S);
- pcos.Put(dbimpl::Slice(k_pcos, 4 * KEY_LENGTH), buffer);
+ char *k_pcos = key.Create(IndexTypes::PCOS);
+ pcos.Put(leveldb::Slice(k_pcos, 4 * KEY_LENGTH), buffer);
- free(k_spoc);
- free(k_cspo);
- free(k_opsc);
- free(k_pcos);
+ delete[] k_spoc;
+ delete[] k_cspo;
+ delete[] k_opsc;
+ delete[] k_pcos;
}
@@ -712,31 +502,24 @@ int64_t LevelDBPersistence::RemoveStatements(
std::string bufs, bufp, bufo, bufc;
GetStatements(pattern, [&](const Statement stmt) -> bool {
- stmt.subject().SerializeToString(&bufs);
- stmt.predicate().SerializeToString(&bufp);
- stmt.object().SerializeToString(&bufo);
- stmt.context().SerializeToString(&bufc);
+ Key key(stmt);
- char* k_spoc = (char*)calloc(4 * KEY_LENGTH, sizeof(char));
- computeKey(&bufs, &bufp, &bufo, &bufc, k_spoc);
- spoc.Delete(dbimpl::Slice(k_spoc, 4 * KEY_LENGTH));
+ char* k_spoc = key.Create(IndexTypes::SPOC);
+ spoc.Delete(leveldb::Slice(k_spoc, 4 * KEY_LENGTH));
- char* k_cspo = (char*)calloc(4 * KEY_LENGTH, sizeof(char));
- orderKey(k_cspo, k_spoc, C, S, P, O);
- cspo.Delete(dbimpl::Slice(k_cspo, 4 * KEY_LENGTH));
+ char* k_cspo = key.Create(IndexTypes::CSPO);
+ cspo.Delete(leveldb::Slice(k_cspo, 4 * KEY_LENGTH));
- char* k_opsc = (char*)calloc(4 * KEY_LENGTH, sizeof(char));
- orderKey(k_opsc, k_spoc, O, P, S, C);
- opsc.Delete(dbimpl::Slice(k_opsc, 4 * KEY_LENGTH));
+ char* k_opsc = key.Create(IndexTypes::OPSC);
+ opsc.Delete(leveldb::Slice(k_opsc, 4 * KEY_LENGTH));
- char* k_pcos = (char*)calloc(4 * KEY_LENGTH, sizeof(char));
- orderKey(k_pcos, k_spoc, P, C, O, S);
- pcos.Delete(dbimpl::Slice(k_pcos, 4 * KEY_LENGTH));
+ char* k_pcos = key.Create(IndexTypes::PCOS);
+ pcos.Delete(leveldb::Slice(k_pcos, 4 * KEY_LENGTH));
- free(k_spoc);
- free(k_cspo);
- free(k_opsc);
- free(k_pcos);
+ delete[] k_spoc;
+ delete[] k_cspo;
+ delete[] k_opsc;
+ delete[] k_pcos;
count++;
@@ -746,14 +529,14 @@ int64_t LevelDBPersistence::RemoveStatements(
return count;
}
-int KeyComparator::Compare(const dbimpl::Slice& a, const dbimpl::Slice& b) const {
+int KeyComparator::Compare(const leveldb::Slice& a, const leveldb::Slice& b) const {
return memcmp(a.data(), b.data(), 4 * KEY_LENGTH);
}
int64_t LevelDBPersistence::Size() {
int64_t count = 0;
- dbimpl::Iterator* it = db_cspo->NewIterator(dbimpl::ReadOptions());
+ leveldb::Iterator* it = db_cspo->NewIterator(leveldb::ReadOptions());
for (it->SeekToFirst(); it->Valid(); it->Next()) {
count++;
}
http://git-wip-us.apache.org/repos/asf/marmotta/blob/b8d122a1/libraries/ostrich/backend/persistence/leveldb_persistence.h
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/persistence/leveldb_persistence.h b/libraries/ostrich/backend/persistence/leveldb_persistence.h
index eee80e4..fe72e9f 100644
--- a/libraries/ostrich/backend/persistence/leveldb_persistence.h
+++ b/libraries/ostrich/backend/persistence/leveldb_persistence.h
@@ -22,21 +22,12 @@
#include <string>
#include <functional>
-#ifdef HAVE_ROCKSDB
-#include <rocksdb/db.h>
-#include <rocksdb/cache.h>
-#include <rocksdb/comparator.h>
-
-namespace dbimpl = rocksdb;
-#else
#include <leveldb/db.h>
#include <leveldb/cache.h>
#include <leveldb/comparator.h>
-namespace dbimpl = leveldb;
-#endif
-
#include "model/rdf_model.h"
+#include "persistence/base_persistence.h"
#include "service/sail.pb.h"
#include "util/iterator.h"
#include "util/threadpool.h"
@@ -47,51 +38,34 @@ namespace persistence {
/**
* A custom comparator treating the bytes in the key as unsigned char.
*/
-class KeyComparator : public dbimpl::Comparator {
+class KeyComparator : public leveldb::Comparator {
public:
- int Compare(const dbimpl::Slice& a, const dbimpl::Slice& b) const override ;
+ int Compare(const leveldb::Slice& a, const leveldb::Slice& b) const override ;
const char* Name() const override { return "KeyComparator"; }
- void FindShortestSeparator(std::string*, const dbimpl::Slice&) const override { }
+ void FindShortestSeparator(std::string*, const leveldb::Slice&) const override { }
void FindShortSuccessor(std::string*) const override { }
};
-
-// Statistical data about updates.
-struct UpdateStatistics {
- UpdateStatistics()
- : added_stmts(0), removed_stmts(0), added_ns(0), removed_ns(0) {}
-
- int64_t added_stmts, removed_stmts, added_ns, removed_ns;
-};
-
/**
* Persistence implementation based on the LevelDB high performance database.
*/
-class LevelDBPersistence {
+class LevelDBPersistence : public Persistence {
public:
- typedef util::CloseableIterator<rdf::proto::Statement> StatementIterator;
- typedef util::CloseableIterator<rdf::proto::Namespace> NamespaceIterator;
- typedef util::CloseableIterator<service::proto::UpdateRequest> UpdateIterator;
-
- typedef std::function<bool(const rdf::proto::Statement&)> StatementHandler;
- typedef std::function<bool(const rdf::proto::Namespace&)> NamespaceHandler;
-
-
/**
* Initialise a new LevelDB database using the given path and cache size (bytes).
*/
LevelDBPersistence(const std::string& path, int64_t cacheSize);
/**
- * Add the namespaces in the iterator to the database.
- */
- int64_t AddNamespaces(NamespaceIterator& it);
+ * Add the namespaces in the iterator to the database.
+ */
+ service::proto::UpdateResponse AddNamespaces(NamespaceIterator& it) override;
/**
* Add the statements in the iterator to the database.
*/
- int64_t AddStatements(StatementIterator& it);
+ service::proto::UpdateResponse AddStatements(StatementIterator& it) override;
/**
* Get all statements matching the pattern (which may have some fields
@@ -107,7 +81,7 @@ class LevelDBPersistence {
* result.
*/
std::unique_ptr<StatementIterator>
- GetStatements(const rdf::proto::Statement& pattern);
+ GetStatements(const rdf::proto::Statement& pattern);
/**
* Get all namespaces matching the pattern (which may have some of all
@@ -123,59 +97,60 @@ class LevelDBPersistence {
* each result.
*/
std::unique_ptr<NamespaceIterator>
- GetNamespaces(const rdf::proto::Namespace &pattern);
+ GetNamespaces(const rdf::proto::Namespace &pattern);
/**
* Remove all statements matching the pattern (which may have some fields
* unset to indicate wildcards).
*/
- int64_t RemoveStatements(const rdf::proto::Statement& pattern);
+ service::proto::UpdateResponse RemoveStatements(
+ const rdf::proto::Statement& pattern) override;
/**
* Apply a batch of updates (mixed statement/namespace adds and removes).
* The updates are collected in LevelDB batches and written atomically to
* the database when iteration ends.
*/
- UpdateStatistics Update(UpdateIterator& it);
+ service::proto::UpdateResponse Update(
+ UpdateIterator& it) override;
/**
* Return the size of this database.
*/
- int64_t Size();
+ int64_t Size() override;
private:
ctpl::thread_pool workers;
std::unique_ptr<KeyComparator> comparator;
- std::shared_ptr<dbimpl::Cache> cache;
- std::unique_ptr<dbimpl::Options> options;
+ std::shared_ptr<leveldb::Cache> cache;
+ std::unique_ptr<leveldb::Options> options;
// We currently support efficient lookups by subject, context and object.
- std::unique_ptr<dbimpl::DB>
+ std::unique_ptr<leveldb::DB>
// Statement databases, indexed for query performance
db_spoc, db_cspo, db_opsc, db_pcos,
// Namespace databases
db_ns_prefix, db_ns_url,
// Triple store metadata.
db_meta;
-
/**
* Add the namespace to the given database batch operations.
*/
void AddNamespace(const rdf::proto::Namespace& ns,
- dbimpl::WriteBatch& ns_prefix, dbimpl::WriteBatch& ns_url);
+ leveldb::WriteBatch& ns_prefix, leveldb::WriteBatch& ns_url);
/**
* Add the namespace to the given database batch operations.
*/
void RemoveNamespace(const rdf::proto::Namespace& ns,
- dbimpl::WriteBatch& ns_prefix, dbimpl::WriteBatch& ns_url);
+ leveldb::WriteBatch& ns_prefix, leveldb::WriteBatch& ns_url);
/**
* Add the statement to the given database batch operations.
*/
void AddStatement(const rdf::proto::Statement& stmt,
- dbimpl::WriteBatch& spoc, dbimpl::WriteBatch& cspo,
- dbimpl::WriteBatch& opsc, dbimpl::WriteBatch&pcos);
+ leveldb::WriteBatch& spoc, leveldb::WriteBatch& cspo,
+ leveldb::WriteBatch& opsc, leveldb::WriteBatch&pcos);
/**
@@ -183,8 +158,8 @@ class LevelDBPersistence {
* unset to indicate wildcards) from the given database batch operations.
*/
int64_t RemoveStatements(const rdf::proto::Statement& pattern,
- dbimpl::WriteBatch& spoc, dbimpl::WriteBatch& cspo,
- dbimpl::WriteBatch& opsc, dbimpl::WriteBatch&pcos);
+ leveldb::WriteBatch& spoc, leveldb::WriteBatch& cspo,
+ leveldb::WriteBatch& opsc, leveldb::WriteBatch&pcos);
};
http://git-wip-us.apache.org/repos/asf/marmotta/blob/b8d122a1/libraries/ostrich/backend/persistence/leveldb_server.cc
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/persistence/leveldb_server.cc b/libraries/ostrich/backend/persistence/leveldb_server.cc
index 737a38c..d1d9a95 100644
--- a/libraries/ostrich/backend/persistence/leveldb_server.cc
+++ b/libraries/ostrich/backend/persistence/leveldb_server.cc
@@ -22,6 +22,7 @@
#include <sys/stat.h>
#include <signal.h>
+#include "leveldb_persistence.h"
#include "leveldb_service.h"
using grpc::Status;
http://git-wip-us.apache.org/repos/asf/marmotta/blob/b8d122a1/libraries/ostrich/backend/persistence/leveldb_service.cc
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/persistence/leveldb_service.cc b/libraries/ostrich/backend/persistence/leveldb_service.cc
index f2637bd..9f02607 100644
--- a/libraries/ostrich/backend/persistence/leveldb_service.cc
+++ b/libraries/ostrich/backend/persistence/leveldb_service.cc
@@ -38,6 +38,7 @@ using marmotta::rdf::proto::Statement;
using marmotta::rdf::proto::Namespace;
using marmotta::rdf::proto::Resource;
using marmotta::service::proto::ContextRequest;
+using marmotta::service::proto::UpdateResponse;
using marmotta::persistence::sparql::LevelDBTripleSource;
using marmotta::sparql::SparqlService;
using marmotta::sparql::TripleSource;
@@ -87,8 +88,8 @@ Status LevelDBService::AddNamespaces(
ServerContext* context, ServerReader<Namespace>* reader, Int64Value* result) {
auto it = NamespaceIterator(reader);
- int64_t count = persistence->AddNamespaces(it);
- result->set_value(count);
+ UpdateResponse stats = persistence->AddNamespaces(it);
+ result->set_value(stats.added_namespaces());
return Status::OK;
}
@@ -123,8 +124,8 @@ Status LevelDBService::AddStatements(
util::TimeLogger timeLogger("Adding statements");
auto it = StatementIterator(reader);
- int64_t count = persistence->AddStatements(it);
- result->set_value(count);
+ UpdateResponse stats = persistence->AddStatements(it);
+ result->set_value(stats.added_statements());
return Status::OK;
}
@@ -145,7 +146,7 @@ Status LevelDBService::RemoveStatements(
ServerContext* context, const Statement* pattern, Int64Value* result) {
util::TimeLogger timeLogger("Removing statements");
- int64_t count = persistence->RemoveStatements(*pattern);
+ int64_t count = persistence->RemoveStatements(*pattern).removed_statements();
result->set_value(count);
return Status::OK;
@@ -161,10 +162,10 @@ Status LevelDBService::Clear(
if (contexts->context_size() > 0) {
for (const Resource &r : contexts->context()) {
pattern.mutable_context()->CopyFrom(r);
- count += persistence->RemoveStatements(pattern);
+ count += persistence->RemoveStatements(pattern).removed_statements();
}
} else {
- count += persistence->RemoveStatements(pattern);
+ count += persistence->RemoveStatements(pattern).removed_statements();
}
result->set_value(count);
@@ -224,12 +225,7 @@ grpc::Status LevelDBService::Update(grpc::ServerContext *context,
util::TimeLogger timeLogger("Updating database");
auto it = UpdateIterator(reader);
- persistence::UpdateStatistics stats = persistence->Update(it);
-
- result->set_added_namespaces(stats.added_ns);
- result->set_removed_namespaces(stats.removed_ns);
- result->set_added_statements(stats.added_stmts);
- result->set_removed_statements(stats.removed_stmts);
+ *result = persistence->Update(it);
return Status::OK;
}
http://git-wip-us.apache.org/repos/asf/marmotta/blob/b8d122a1/libraries/ostrich/backend/persistence/leveldb_service.h
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/persistence/leveldb_service.h b/libraries/ostrich/backend/persistence/leveldb_service.h
index 87b5b12..dc139e8 100644
--- a/libraries/ostrich/backend/persistence/leveldb_service.h
+++ b/libraries/ostrich/backend/persistence/leveldb_service.h
@@ -18,7 +18,7 @@
#ifndef MARMOTTA_SERVICE_H
#define MARMOTTA_SERVICE_H
-#include "leveldb_persistence.h"
+#include "base_persistence.h"
#include <grpc/grpc.h>
#include <grpc++/server.h>
@@ -49,7 +49,7 @@ class LevelDBService : public svc::SailService::Service {
* Construct a new SailService wrapper around the LevelDB persistence passed
* as argument. The service will not take ownership of the pointer.
*/
- LevelDBService(persistence::LevelDBPersistence* persistance) : persistence(persistance) { };
+ LevelDBService(persistence::Persistence* persistance) : persistence(persistance) { };
grpc::Status AddNamespaces(grpc::ServerContext* context,
grpc::ServerReader<rdf::proto::Namespace>* reader,
@@ -92,7 +92,7 @@ class LevelDBService : public svc::SailService::Service {
google::protobuf::Int64Value* result) override;
private:
- persistence::LevelDBPersistence* persistence;
+ persistence::Persistence* persistence;
};
@@ -105,7 +105,7 @@ class LevelDBSparqlService : public spq::SparqlService::Service {
* Construct a new SparqlService wrapper around the LevelDB persistence passed
* as argument. The service will not take ownership of the pointer.
*/
- LevelDBSparqlService(persistence::LevelDBPersistence* persistence) : persistence(persistence) { };
+ LevelDBSparqlService(persistence::Persistence* persistence) : persistence(persistence) { };
grpc::Status TupleQuery(grpc::ServerContext* context,
const spq::SparqlRequest* pattern,
@@ -119,7 +119,7 @@ class LevelDBSparqlService : public spq::SparqlService::Service {
const spq::SparqlRequest* pattern,
google::protobuf::BoolValue* result) override;
private:
- persistence::LevelDBPersistence* persistence;
+ persistence::Persistence* persistence;
};
}
http://git-wip-us.apache.org/repos/asf/marmotta/blob/b8d122a1/libraries/ostrich/backend/persistence/leveldb_sparql.h
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/persistence/leveldb_sparql.h b/libraries/ostrich/backend/persistence/leveldb_sparql.h
index 7b103a4..0b2f3bc 100644
--- a/libraries/ostrich/backend/persistence/leveldb_sparql.h
+++ b/libraries/ostrich/backend/persistence/leveldb_sparql.h
@@ -19,7 +19,7 @@
#define MARMOTTA_SPARQL_H
#include "sparql/rasqal_adapter.h"
-#include "leveldb_persistence.h"
+#include "base_persistence.h"
namespace marmotta {
namespace persistence {
@@ -33,7 +33,7 @@ using std::experimental::optional;
class LevelDBTripleSource : public ::marmotta::sparql::TripleSource {
public:
- LevelDBTripleSource(LevelDBPersistence *persistence) : persistence(persistence) { }
+ LevelDBTripleSource(Persistence *persistence) : persistence(persistence) { }
bool HasStatement(
@@ -47,7 +47,7 @@ class LevelDBTripleSource : public ::marmotta::sparql::TripleSource {
private:
// A pointer to the persistence instance wrapped by this triple source.
- LevelDBPersistence* persistence;
+ Persistence* persistence;
};
http://git-wip-us.apache.org/repos/asf/marmotta/blob/b8d122a1/libraries/ostrich/backend/persistence/rocksdb_persistence.cc
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/persistence/rocksdb_persistence.cc b/libraries/ostrich/backend/persistence/rocksdb_persistence.cc
new file mode 100644
index 0000000..4b5879d
--- /dev/null
+++ b/libraries/ostrich/backend/persistence/rocksdb_persistence.cc
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#define KEY_LENGTH 16
+
+#include <chrono>
+#include <stdlib.h>
+#include <malloc.h>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <rocksdb/filter_policy.h>
+#include <rocksdb/write_batch.h>
+#include <google/protobuf/wrappers.pb.h>
+#include <thread>
+#include <algorithm>
+
+#include "rocksdb_persistence.h"
+#include "model/rdf_operators.h"
+#include "util/murmur3.h"
+#include "util/unique.h"
+
+#define CHECK_STATUS(s) CHECK(s.ok()) << "Writing to database failed: " << s.ToString()
+
+DEFINE_int64(write_batch_size, 1000000,
+ "Maximum number of statements to write in a single batch to the database");
+
+
+constexpr char kSPOC[] = "spoc";
+constexpr char kCSPO[] = "cspo";
+constexpr char kOPSC[] = "opsc";
+constexpr char kPCOS[] = "pcos";
+constexpr char kNSPREFIX[] = "nsprefix";
+constexpr char kNSURI[] = "nsuri";
+constexpr char kMETA[] = "meta";
+
+using rocksdb::ColumnFamilyDescriptor;
+using rocksdb::ColumnFamilyHandle;
+using rocksdb::ColumnFamilyOptions;
+using rocksdb::WriteBatch;
+using rocksdb::Slice;
+using marmotta::rdf::proto::Statement;
+using marmotta::rdf::proto::Namespace;
+using marmotta::rdf::proto::Resource;
+
+namespace marmotta {
+namespace persistence {
+namespace {
+
+// Base iterator for wrapping RocksDB iterators. The wrapped rocksdb::Iterator
+// is passed to DBIterator (declared elsewhere; presumably it owns and deletes
+// it — verify in base_persistence.h).
+template<typename T>
+using RocksDBIterator = DBIterator<T, rocksdb::Iterator>;
+
+/**
+ * Iterator over the statements of one RocksDB column family whose keys lie in
+ * the inclusive range [loKey, hiKey]; both keys are 4 * KEY_LENGTH bytes.
+ *
+ * Takes ownership of loKey and hiKey, which must have been allocated with
+ * new[] (they are released with delete[]). Because of that ownership the
+ * iterator is non-copyable: a copy would delete[] the same buffers twice.
+ */
+class StatementRangeIterator : public RocksDBIterator<Statement> {
+ public:
+  StatementRangeIterator(rocksdb::Iterator *it, char *loKey, char *hiKey)
+      : DBIterator(it), loKey(loKey), hiKey(hiKey) {
+    // Position the underlying iterator on the first key >= loKey.
+    it->Seek(rocksdb::Slice(loKey, 4 * KEY_LENGTH));
+  }
+
+  // Owning raw pointers: forbid copies to avoid a double delete[].
+  StatementRangeIterator(const StatementRangeIterator&) = delete;
+  StatementRangeIterator& operator=(const StatementRangeIterator&) = delete;
+
+  ~StatementRangeIterator() override {
+    delete[] loKey;
+    delete[] hiKey;
+  }
+
+  // More results exist while the iterator is valid and not past hiKey.
+  bool hasNext() override {
+    return it->Valid() && it->key().compare(rocksdb::Slice(hiKey, 4 * KEY_LENGTH)) <= 0;
+  }
+
+ private:
+  char *loKey;
+  char *hiKey;
+};
+
+
+} // namespace
+
+
+/**
+ * Open the RocksDB database stored at "<path>/<suffix>.db" using the given
+ * options and return the raw handle; the caller owns the returned DB.
+ * Aborts the process via CHECK_STATUS if the database cannot be opened.
+ */
+rocksdb::DB* buildDB(const std::string& path, const std::string& suffix, const rocksdb::Options& options) {
+  const std::string location = path + "/" + suffix + ".db";
+  rocksdb::DB* handle = nullptr;
+  rocksdb::Status opened = rocksdb::DB::Open(options, location, &handle);
+  CHECK_STATUS(opened);
+  return handle;
+}
+
+/**
+ * Allocate RocksDB options tuned for bulk loading: create the database and
+ * any missing column families on demand, raise background parallelism, use
+ * level-style compaction, install the given key comparator and use a 16MB
+ * write buffer. The caller takes ownership of the returned object.
+ */
+rocksdb::Options* buildOptions(KeyComparator* cmp) {
+  auto* opts = new rocksdb::Options();
+  opts->create_if_missing = true;
+  opts->create_missing_column_families = true;
+
+  opts->IncreaseParallelism();
+  opts->OptimizeLevelStyleCompaction();
+
+  opts->comparator = cmp;  // custom ordering for our keys
+
+  // Must come after OptimizeLevelStyleCompaction(), which also sets it.
+  opts->write_buffer_size = 16 * 1024 * 1024;  // 16MB (fast bulk imports)
+  return opts;
+}
+
+/**
+ * Open (creating it if necessary) the RocksDB database at "<path>/data.db"
+ * with one column family per statement index (spoc, cspo, opsc, pcos), one
+ * per namespace lookup direction (prefix, uri), one for metadata, plus
+ * "default". The opened handles are stored in handles_ in descriptor order;
+ * NOTE(review): that order presumably must match the Handles enum used to
+ * index handles_ — confirm against rocksdb_persistence.h.
+ * Aborts via CHECK_STATUS if the database cannot be opened.
+ *
+ * NOTE(review): cacheSize is currently not used.
+ */
+RocksDBPersistence::RocksDBPersistence(const std::string &path, int64_t cacheSize)
+ : workers_(8) {
+ rocksdb::Options options;
+ options.create_if_missing = true;
+ options.create_missing_column_families = true;
+
+ options.IncreaseParallelism();
+ options.OptimizeLevelStyleCompaction();
+
+ // Custom comparator for our keys.
+ // NOTE(review): DB::Open with column families takes a DBOptions, so the
+ // column-family-level settings made on `options` (comparator, compaction
+ // tuning, write_buffer_size) do not reach the column families below, which
+ // all use cfOptions with the default bytewise comparator. Do not simply copy
+ // the comparator into cfOptions: if KeyComparator memcmp()s a fixed
+ // 4 * KEY_LENGTH bytes like the LevelDB one, it would over-read the
+ // variable-length namespace keys; restricting it to the statement column
+ // families changes the persisted comparator name of existing databases.
+ options.comparator = &comparator_;
+
+ // Write buffer size 16MB (fast bulk imports)
+ // NOTE(review): see above — not applied to the column families.
+ options.write_buffer_size = 16384 * 1024;
+
+ // Options shared by every column family opened below.
+ ColumnFamilyOptions cfOptions;
+ cfOptions.OptimizeLevelStyleCompaction();
+
+ // Initialise column families.
+ std::vector<ColumnFamilyDescriptor> columnFamilies = {
+ ColumnFamilyDescriptor(kSPOC, cfOptions),
+ ColumnFamilyDescriptor(kCSPO, cfOptions),
+ ColumnFamilyDescriptor(kOPSC, cfOptions),
+ ColumnFamilyDescriptor(kPCOS, cfOptions),
+ ColumnFamilyDescriptor(kNSPREFIX, cfOptions),
+ ColumnFamilyDescriptor(kNSURI, cfOptions),
+ ColumnFamilyDescriptor(kMETA, cfOptions),
+ ColumnFamilyDescriptor("default", cfOptions),
+ };
+
+ // Open fills handles_ with one handle per descriptor, in the same order.
+ rocksdb::DB* db;
+ rocksdb::Status status = rocksdb::DB::Open(options, path + "/data.db", columnFamilies, &handles_, &db);
+ CHECK_STATUS(status);
+ database_.reset(db);
+
+ LOG(INFO) << "RocksDB Database initialised.";
+}
+
+/**
+ * Release every column family handle obtained in the constructor. database_
+ * (reset in the constructor, presumably a smart pointer) is destroyed after
+ * this body runs, i.e. after all handles have been freed.
+ */
+RocksDBPersistence::~RocksDBPersistence() {
+  for (ColumnFamilyHandle* handle : handles_) {
+    delete handle;
+  }
+}
+
+
+/**
+ * Add every namespace produced by the iterator using a single write batch.
+ * Returns an UpdateResponse whose added_namespaces field is the number of
+ * namespaces written. Aborts via CHECK_STATUS if the write fails.
+ */
+service::proto::UpdateResponse RocksDBPersistence::AddNamespaces(NamespaceIterator& it) {
+  DLOG(INFO) << "Starting batch namespace import operation.";
+
+  int64_t added = 0;
+  rocksdb::WriteBatch batch;
+  for (; it.hasNext(); ++added) {
+    AddNamespace(it.next(), batch);
+  }
+  CHECK_STATUS(database_->Write(rocksdb::WriteOptions(), &batch));
+
+  DLOG(INFO) << "Imported " << added << " namespaces";
+
+  service::proto::UpdateResponse response;
+  response.set_added_namespaces(added);
+  return response;
+}
+
+/**
+ * Return an iterator over the namespaces matching the pattern.
+ *
+ * - If pattern.prefix() is set, look that prefix up in the prefix column
+ *   family (the uri field is then ignored).
+ * - Otherwise, if pattern.uri() is set, look that URI up in the uri column
+ *   family.
+ * - If neither is set, iterate over all entries of the prefix column family.
+ *
+ * A lookup yields at most one namespace. A missing key yields an empty
+ * iterator. Other read errors and undecodable stored values are logged and
+ * also yield an empty iterator instead of being silently treated as absent.
+ */
+std::unique_ptr<RocksDBPersistence::NamespaceIterator> RocksDBPersistence::GetNamespaces(
+    const rdf::proto::Namespace &pattern) {
+  DLOG(INFO) << "Get namespaces matching pattern " << pattern.DebugString();
+
+  ColumnFamilyHandle *h = nullptr;
+  std::string key;
+  if (pattern.prefix() != "") {
+    key = pattern.prefix();
+    h = handles_[Handles::NSPREFIX];
+  } else if (pattern.uri() != "") {
+    key = pattern.uri();
+    h = handles_[Handles::NSURI];
+  }
+
+  if (h == nullptr) {
+    // Pattern was empty, iterate over all namespaces and report them.
+    return util::make_unique<RocksDBIterator<Namespace>>(
+        database_->NewIterator(rocksdb::ReadOptions(), handles_[Handles::NSPREFIX]));
+  }
+
+  // Either prefix or uri given, report the correct namespace value.
+  std::string value;
+  rocksdb::Status s = database_->Get(rocksdb::ReadOptions(), h, key, &value);
+  if (!s.ok()) {
+    // NotFound is the normal "no such namespace" case; anything else is a
+    // genuine read failure and must not disappear silently.
+    if (!s.IsNotFound()) {
+      LOG(ERROR) << "Reading namespace failed: " << s.ToString();
+    }
+    return util::make_unique<util::EmptyIterator<Namespace>>();
+  }
+
+  Namespace ns;
+  if (!ns.ParseFromString(value)) {
+    LOG(ERROR) << "Could not decode stored namespace for key " << key;
+    return util::make_unique<util::EmptyIterator<Namespace>>();
+  }
+  return util::make_unique<util::SingletonIterator<Namespace>>(std::move(ns));
+}
+
+
+/**
+ * Invoke callback for each namespace matching the pattern, stopping as soon
+ * as the callback returns false, and log how many callbacks were made.
+ */
+void RocksDBPersistence::GetNamespaces(
+    const Namespace &pattern, RocksDBPersistence::NamespaceHandler callback) {
+  int64_t count = 0;
+
+  {
+    // Scoped so the iterator is released before the summary is logged.
+    auto matches = GetNamespaces(pattern);
+    while (matches->hasNext()) {
+      ++count;
+      if (!callback(matches->next())) {
+        break;
+      }
+    }
+  }
+
+  DLOG(INFO) << "Get namespaces done (count=" << count <<")";
+}
+
+
+/**
+ * Add every statement produced by the iterator. Statements accumulate in a
+ * write batch that is written to the database each time
+ * FLAGS_write_batch_size statements have been collected (only when that flag
+ * is positive) and once more at the end for the remainder.
+ * Returns an UpdateResponse whose added_statements field is the number of
+ * statements written. Aborts via CHECK_STATUS if a write fails.
+ */
+service::proto::UpdateResponse RocksDBPersistence::AddStatements(StatementIterator& it) {
+  auto start = std::chrono::steady_clock::now();
+  LOG(INFO) << "Starting batch statement import operation.";
+  int64_t count = 0;
+
+  // A non-positive batch size disables intermediate flushes; using it as the
+  // modulus below would otherwise be a division by zero (undefined behavior).
+  const int64_t batch_size = FLAGS_write_batch_size;
+
+  rocksdb::WriteBatch batch;
+  while (it.hasNext()) {
+    AddStatement(it.next(), batch);
+    count++;
+
+    if (batch_size > 0 && count % batch_size == 0) {
+      CHECK_STATUS(database_->Write(rocksdb::WriteOptions(), &batch));
+      batch.Clear();
+    }
+  }
+
+  // Write whatever remains in the last, partially filled batch.
+  CHECK_STATUS(database_->Write(rocksdb::WriteOptions(), &batch));
+
+  LOG(INFO) << "Imported " << count << " statements (time="
+            << std::chrono::duration <double, std::milli> (
+                std::chrono::steady_clock::now() - start).count()
+            << "ms).";
+
+  service::proto::UpdateResponse result;
+  result.set_added_statements(count);
+  return result;
+}
+
+
+/**
+ * Return an iterator over all statements matching the pattern (unset fields
+ * act as wildcards). The column family (index ordering) is chosen from
+ * Pattern::Type(); when the key range alone cannot express the pattern the
+ * results are additionally filtered with Matches().
+ *
+ * The returned iterator may outlive this call (and the caller's pattern
+ * object), so the filter keeps its own copy of the pattern rather than a
+ * reference to the argument.
+ */
+std::unique_ptr<RocksDBPersistence::StatementIterator> RocksDBPersistence::GetStatements(
+    const rdf::proto::Statement &pattern) {
+  DLOG(INFO) << "Get statements matching pattern " << pattern.DebugString();
+
+  Pattern query(pattern);
+
+  ColumnFamilyHandle* h = nullptr;
+  switch (query.Type()) {
+    case IndexTypes::SPOC:
+      h = handles_[Handles::ISPOC];
+      DLOG(INFO) << "Query: Using index type SPOC";
+      break;
+    case IndexTypes::CSPO:
+      h = handles_[Handles::ICSPO];
+      DLOG(INFO) << "Query: Using index type CSPO";
+      break;
+    case IndexTypes::OPSC:
+      h = handles_[Handles::IOPSC];
+      DLOG(INFO) << "Query: Using index type OPSC";
+      break;
+    case IndexTypes::PCOS:
+      h = handles_[Handles::IPCOS];
+      DLOG(INFO) << "Query: Using index type PCOS";
+      break;
+  }
+  // Previously an unhandled index type left h uninitialized.
+  CHECK(h != nullptr) << "Unsupported index type for statement query";
+
+  if (query.NeedsFilter()) {
+    DLOG(INFO) << "Retrieving statements with filter.";
+    // Capture the pattern by value: the filter runs lazily, after return.
+    return util::make_unique<util::FilteringIterator<Statement>>(
+        new StatementRangeIterator(
+            database_->NewIterator(rocksdb::ReadOptions(), h), query.MinKey(), query.MaxKey()),
+        [pattern](const Statement& stmt) -> bool { return Matches(pattern, stmt); });
+  }
+
+  DLOG(INFO) << "Retrieving statements without filter.";
+  return util::make_unique<StatementRangeIterator>(
+      database_->NewIterator(rocksdb::ReadOptions(), h), query.MinKey(), query.MaxKey());
+}
+
+
+void RocksDBPersistence::GetStatements(
+    const Statement& pattern, std::function<bool(const Statement&)> callback) {
+  const auto started_at = std::chrono::steady_clock::now();
+  // Number of statements handed to the callback.
+  int64_t visited = 0;
+
+  {
+    // Stream matches into the callback; stop early once it returns false.
+    auto statements = GetStatements(pattern);
+    bool keep_going = true;
+    while (keep_going && statements->hasNext()) {
+      keep_going = callback(statements->next());
+      ++visited;
+    }
+  }
+
+  DLOG(INFO) << "Get statements done (count=" << visited << ", time="
+             << std::chrono::duration<double, std::milli>(
+                    std::chrono::steady_clock::now() - started_at).count()
+             << "ms).";
+}
+
+
+service::proto::UpdateResponse RocksDBPersistence::RemoveStatements(const rdf::proto::Statement& pattern) {
+  const auto started_at = std::chrono::steady_clock::now();
+  DLOG(INFO) << "Remove statements matching pattern " << pattern.DebugString();
+
+  // Gather all index deletions for the matching statements into a single
+  // batch and apply it with one write.
+  rocksdb::WriteBatch deletions;
+  const int64_t removed = RemoveStatements(pattern, deletions);
+  CHECK_STATUS(database_->Write(rocksdb::WriteOptions(), &deletions));
+
+  DLOG(INFO) << "Removed " << removed << " statements (time="
+             << std::chrono::duration<double, std::milli>(
+                    std::chrono::steady_clock::now() - started_at).count()
+             << "ms).";
+
+  service::proto::UpdateResponse response;
+  response.set_removed_statements(removed);
+  return response;
+}
+
+/**
+ * Apply a stream of mixed updates (statement/namespace additions and
+ * removals). Writes are buffered in a RocksDB write batch that is written to
+ * the database every FLAGS_write_batch_size operations, before every removal
+ * and at the end; the update as a whole is therefore not atomic.
+ *
+ * Returns the number of added/removed statements and namespaces.
+ */
+service::proto::UpdateResponse RocksDBPersistence::Update(RocksDBPersistence::UpdateIterator &it) {
+  auto start = std::chrono::steady_clock::now();
+  LOG(INFO) << "Starting batch update operation.";
+
+  WriteBatch batch;
+  int64_t added_stmts = 0, removed_stmts = 0, added_ns = 0, removed_ns = 0;
+
+  // Write all pending operations to the database (no-op for an empty batch).
+  auto flush = [&]() {
+    if (batch.Count() > 0) {
+      CHECK_STATUS(database_->Write(rocksdb::WriteOptions(), &batch));
+      batch.Clear();
+    }
+  };
+
+  int64_t count = 0;
+  while (it.hasNext()) {
+    auto next = it.next();
+    if (next.has_stmt_added()) {
+      AddStatement(next.stmt_added(), batch);
+      added_stmts++;
+    } else if (next.has_stmt_removed()) {
+      // Removals look up their targets by reading the database, which cannot
+      // see writes still pending in the batch. Flush first so a removal also
+      // affects statements added (or not yet removed) earlier in this update.
+      flush();
+      removed_stmts +=
+          RemoveStatements(next.stmt_removed(), batch);
+    } else if (next.has_ns_added()) {
+      AddNamespace(next.ns_added(), batch);
+      added_ns++;
+    } else if (next.has_ns_removed()) {
+      // Same read-your-writes requirement as for statement removal.
+      flush();
+      RemoveNamespace(next.ns_removed(), batch);
+      removed_ns++;
+    }
+
+    count++;
+    if (count % FLAGS_write_batch_size == 0) {
+      flush();
+    }
+  }
+
+  flush();
+
+  LOG(INFO) << "Batch update complete. (statements added: " << added_stmts
+            << ", statements removed: " << removed_stmts
+            << ", namespaces added: " << added_ns
+            << ", namespaces removed: " << removed_ns
+            << ", time=" << std::chrono::duration <double, std::milli> (
+                std::chrono::steady_clock::now() - start).count() << "ms).";
+
+  service::proto::UpdateResponse stats;
+  stats.set_added_statements(added_stmts);
+  stats.set_removed_statements(removed_stmts);
+  stats.set_added_namespaces(added_ns);
+  stats.set_removed_namespaces(removed_ns);
+  return stats;
+}
+
+void RocksDBPersistence::AddNamespace(
+    const Namespace &ns, WriteBatch &batch) {
+  DLOG(INFO) << "Adding namespace " << ns.DebugString();
+
+  std::string serialized;
+  ns.SerializeToString(&serialized);
+
+  // The same serialized record goes into two column families: one keyed by
+  // the namespace prefix, the other keyed by the namespace URI.
+  const auto& by_prefix = handles_[Handles::NSPREFIX];
+  const auto& by_uri = handles_[Handles::NSURI];
+  batch.Put(by_prefix, ns.prefix(), serialized);
+  batch.Put(by_uri, ns.uri(), serialized);
+}
+
+void RocksDBPersistence::RemoveNamespace(
+    const Namespace &pattern, WriteBatch &batch) {
+  DLOG(INFO) << "Removing namespaces matching pattern " << pattern.DebugString();
+
+  // Queue deletion of each matching namespace from both the prefix-keyed and
+  // the URI-keyed column family; always continue with the next match.
+  auto delete_entry = [this, &batch](const rdf::proto::Namespace& match) -> bool {
+    batch.Delete(handles_[Handles::NSPREFIX], match.prefix());
+    batch.Delete(handles_[Handles::NSURI], match.uri());
+    return true;
+  };
+  GetNamespaces(pattern, delete_entry);
+}
+
+
+/**
+ * Queue the statement for insertion into all four index column families
+ * (SPOC, CSPO, OPSC, PCOS). Each index stores the serialized statement under
+ * a 4 * KEY_LENGTH byte key ordered for that index.
+ */
+void RocksDBPersistence::AddStatement(
+    const Statement &stmt, WriteBatch &batch) {
+  DLOG(INFO) << "Adding statement " << stmt.DebugString();
+
+  Key key(stmt);
+
+  std::string buffer;
+  stmt.SerializeToString(&buffer);
+
+  // The caller owns the arrays returned by Key::Create; unique_ptr<char[]>
+  // releases them with delete[] on every path (including exceptions).
+  // WriteBatch::Put copies the key bytes, so freeing them afterwards is safe.
+  std::unique_ptr<char[]> k_spoc(key.Create(IndexTypes::SPOC));
+  batch.Put(handles_[Handles::ISPOC], rocksdb::Slice(k_spoc.get(), 4 * KEY_LENGTH), buffer);
+
+  std::unique_ptr<char[]> k_cspo(key.Create(IndexTypes::CSPO));
+  batch.Put(handles_[Handles::ICSPO], rocksdb::Slice(k_cspo.get(), 4 * KEY_LENGTH), buffer);
+
+  std::unique_ptr<char[]> k_opsc(key.Create(IndexTypes::OPSC));
+  batch.Put(handles_[Handles::IOPSC], rocksdb::Slice(k_opsc.get(), 4 * KEY_LENGTH), buffer);
+
+  std::unique_ptr<char[]> k_pcos(key.Create(IndexTypes::PCOS));
+  batch.Put(handles_[Handles::IPCOS], rocksdb::Slice(k_pcos.get(), 4 * KEY_LENGTH), buffer);
+}
+
+
+/**
+ * Queue deletion of every statement matching the pattern (unset fields act as
+ * wildcards) from all four index column families.
+ *
+ * Matches are found by reading the database, so operations still pending in
+ * `batch` are not taken into account.
+ *
+ * Returns the number of matching statements.
+ */
+int64_t RocksDBPersistence::RemoveStatements(
+    const Statement& pattern, WriteBatch& batch) {
+  DLOG(INFO) << "Removing statements matching " << pattern.DebugString();
+
+  int64_t count = 0;
+
+  // The callback receives a const Statement&; take it by reference instead
+  // of copying every matching statement.
+  GetStatements(pattern, [&](const Statement& stmt) -> bool {
+    Key key(stmt);
+
+    // The caller owns the arrays returned by Key::Create; unique_ptr<char[]>
+    // releases them with delete[] on every path.
+    std::unique_ptr<char[]> k_spoc(key.Create(IndexTypes::SPOC));
+    batch.Delete(handles_[Handles::ISPOC], rocksdb::Slice(k_spoc.get(), 4 * KEY_LENGTH));
+
+    std::unique_ptr<char[]> k_cspo(key.Create(IndexTypes::CSPO));
+    batch.Delete(handles_[Handles::ICSPO], rocksdb::Slice(k_cspo.get(), 4 * KEY_LENGTH));
+
+    std::unique_ptr<char[]> k_opsc(key.Create(IndexTypes::OPSC));
+    batch.Delete(handles_[Handles::IOPSC], rocksdb::Slice(k_opsc.get(), 4 * KEY_LENGTH));
+
+    std::unique_ptr<char[]> k_pcos(key.Create(IndexTypes::PCOS));
+    batch.Delete(handles_[Handles::IPCOS], rocksdb::Slice(k_pcos.get(), 4 * KEY_LENGTH));
+
+    count++;
+
+    return true;
+  });
+
+  return count;
+}
+
+/**
+ * Order keys by their first 4 * KEY_LENGTH bytes, compared as unsigned bytes
+ * (memcmp semantics). Bytes beyond that length are ignored.
+ *
+ * The compared length is clamped to the actual slice sizes, so shorter slices
+ * are never read out of bounds. When one such shorter key is a prefix of the
+ * other, the shorter key sorts first. For full-length keys the result is
+ * exactly memcmp over 4 * KEY_LENGTH bytes.
+ */
+int KeyComparator::Compare(const rocksdb::Slice& a, const rocksdb::Slice& b) const {
+  const size_t key_size = static_cast<size_t>(4 * KEY_LENGTH);
+  const size_t len_a = a.size() < key_size ? a.size() : key_size;
+  const size_t len_b = b.size() < key_size ? b.size() : key_size;
+  const size_t common = len_a < len_b ? len_a : len_b;
+
+  const int cmp = memcmp(a.data(), b.data(), common);
+  if (cmp != 0) {
+    return cmp;
+  }
+  if (len_a != len_b) {
+    return len_a < len_b ? -1 : 1;
+  }
+  return 0;
+}
+
+
+/**
+ * Return the number of statements, counted by a full scan of the SPOC index
+ * column family (each statement is stored there under one key). This is
+ * linear in the size of the database.
+ */
+int64_t RocksDBPersistence::Size() {
+  std::unique_ptr<rocksdb::Iterator> it(
+      database_->NewIterator(rocksdb::ReadOptions(), handles_[Handles::ISPOC]));
+
+  int64_t count = 0;
+  for (it->SeekToFirst(); it->Valid(); it->Next()) {
+    count++;
+  }
+
+  // Valid() also becomes false when the scan fails; report the error instead
+  // of silently returning a partial count.
+  if (!it->status().ok()) {
+    LOG(ERROR) << "Error while counting statements: " << it->status().ToString();
+  }
+  return count;
+}
+
+} // namespace persistence
+} // namespace marmotta
+
+
http://git-wip-us.apache.org/repos/asf/marmotta/blob/b8d122a1/libraries/ostrich/backend/persistence/rocksdb_persistence.h
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/persistence/rocksdb_persistence.h b/libraries/ostrich/backend/persistence/rocksdb_persistence.h
new file mode 100644
index 0000000..a04169b
--- /dev/null
+++ b/libraries/ostrich/backend/persistence/rocksdb_persistence.h
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// Include guard specific to this header; the generic MARMOTTA_PERSISTENCE_H
+// name could silently suppress (or be suppressed by) another persistence header.
+#ifndef MARMOTTA_ROCKSDB_PERSISTENCE_H
+#define MARMOTTA_ROCKSDB_PERSISTENCE_H
+
+#include <memory>
+#include <string>
+#include <functional>
+#include <vector>
+
+#include <rocksdb/db.h>
+#include <rocksdb/cache.h>
+#include <rocksdb/comparator.h>
+
+#include "persistence/base_persistence.h"
+#include "util/threadpool.h"
+
+namespace marmotta {
+namespace persistence {
+
+/**
+ * A custom comparator treating the bytes in the key as unsigned char.
+ *
+ * FindShortestSeparator() and FindShortSuccessor() are no-ops, so RocksDB's
+ * optional key-shortening optimisation is not used.
+ */
+class KeyComparator : public rocksdb::Comparator {
+ public:
+  int Compare(const rocksdb::Slice& a, const rocksdb::Slice& b) const override;
+
+  const char* Name() const override { return "KeyComparator"; }
+  void FindShortestSeparator(std::string*, const rocksdb::Slice&) const override { }
+  void FindShortSuccessor(std::string*) const override { }
+};
+
+
+// Symbolic indices into RocksDBPersistence::handles_ (one column family each).
+enum Handles {
+  ISPOC = 0, ICSPO = 1, IOPSC = 2, IPCOS = 3, NSPREFIX = 4, NSURI = 5, META = 6
+};
+
+/**
+ * Persistence implementation based on the RocksDB high performance database.
+ *
+ * Statements are kept in four index column families (SPOC, CSPO, OPSC, PCOS);
+ * namespaces are kept in two column families keyed by prefix and by URI.
+ */
+class RocksDBPersistence : public Persistence {
+ public:
+  /**
+   * Initialise a new RocksDB database using the given path and cache size (bytes).
+   */
+  RocksDBPersistence(const std::string& path, int64_t cacheSize);
+
+  ~RocksDBPersistence();
+
+  /**
+   * Add the namespaces in the iterator to the database.
+   */
+  service::proto::UpdateResponse AddNamespaces(NamespaceIterator& it) override;
+
+  /**
+   * Add the statements in the iterator to the database.
+   */
+  service::proto::UpdateResponse AddStatements(StatementIterator& it) override;
+
+  /**
+   * Get all statements matching the pattern (which may have some fields
+   * unset to indicate wildcards). Call the callback function for each
+   * result; iteration stops early when the callback returns false.
+   */
+  void GetStatements(const rdf::proto::Statement& pattern,
+                     StatementHandler callback);
+
+  /**
+   * Get all statements matching the pattern (which may have some fields
+   * unset to indicate wildcards) and return an iterator over the results.
+   */
+  std::unique_ptr<StatementIterator>
+  GetStatements(const rdf::proto::Statement& pattern);
+
+  /**
+   * Get all namespaces matching the pattern (which may have some or all
+   * fields unset to indicate wildcards). Call the callback function for
+   * each result; iteration stops early when the callback returns false.
+   */
+  void GetNamespaces(const rdf::proto::Namespace &pattern,
+                     NamespaceHandler callback);
+
+  /**
+   * Get all namespaces matching the pattern (which may have some or all
+   * fields unset to indicate wildcards) and return an iterator over the
+   * results.
+   */
+  std::unique_ptr<NamespaceIterator>
+  GetNamespaces(const rdf::proto::Namespace &pattern);
+
+  /**
+   * Remove all statements matching the pattern (which may have some fields
+   * unset to indicate wildcards).
+   */
+  service::proto::UpdateResponse RemoveStatements(
+      const rdf::proto::Statement& pattern) override;
+
+  /**
+   * Apply a batch of updates (mixed statement/namespace adds and removes).
+   * The updates are collected in RocksDB write batches that are written to
+   * the database in chunks, so the update as a whole is not atomic.
+   */
+  service::proto::UpdateResponse Update(
+      UpdateIterator& it) override;
+
+  /**
+   * Return the number of statements stored in this database.
+   */
+  int64_t Size() override;
+ private:
+  ctpl::thread_pool workers_;
+
+  KeyComparator comparator_;
+  std::unique_ptr<rocksdb::DB> database_;
+
+  // Column family handles, indexed by the Handles enum.
+  std::vector<rocksdb::ColumnFamilyHandle*> handles_;
+
+  /**
+   * Add the namespace to the given database batch operations.
+   */
+  void AddNamespace(const rdf::proto::Namespace& ns, rocksdb::WriteBatch& batch);
+
+  /**
+   * Remove all namespaces matching the given pattern via the given database
+   * batch operations.
+   */
+  void RemoveNamespace(const rdf::proto::Namespace& ns, rocksdb::WriteBatch& batch);
+
+  /**
+   * Add the statement to the given database batch operations.
+   */
+  void AddStatement(const rdf::proto::Statement& stmt, rocksdb::WriteBatch& batch);
+
+
+  /**
+   * Remove all statements matching the pattern (which may have some fields
+   * unset to indicate wildcards) from the given database batch operations.
+   * Returns the number of matching statements.
+   */
+  int64_t RemoveStatements(const rdf::proto::Statement& pattern, rocksdb::WriteBatch& batch);
+};
+
+
+
+}  // namespace persistence
+}  // namespace marmotta
+
+#endif  // MARMOTTA_ROCKSDB_PERSISTENCE_H
http://git-wip-us.apache.org/repos/asf/marmotta/blob/b8d122a1/libraries/ostrich/backend/persistence/rocksdb_server.cc
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/persistence/rocksdb_server.cc b/libraries/ostrich/backend/persistence/rocksdb_server.cc
new file mode 100644
index 0000000..837242a
--- /dev/null
+++ b/libraries/ostrich/backend/persistence/rocksdb_server.cc
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Binary to start a persistence server implementing the sail.proto API.
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <sys/stat.h>
+#include <signal.h>
+
+#include "rocksdb_persistence.h"
+#include "leveldb_service.h"
+
+using grpc::Status;
+using grpc::Server;
+using grpc::ServerBuilder;
+
+
+// Command line flags, parsed in main() via gflags. host and port together form
+// the address the gRPC server listens on; the db directory is created with
+// mkdir() in main() before the database is opened.
+DEFINE_string(host, "0.0.0.0", "Address/name of server to access.");
+DEFINE_string(port, "10000", "Port of server to access.");
+DEFINE_string(db, "/tmp/testdb", "Path to database. Will be created if non-existant.");
+DEFINE_int64(cache_size, 100 * 1048576, "Cache size used by the database (in bytes).");
+
+// The running gRPC server. It is global so that the signal handler
+// stopServer() can reach it.
+std::unique_ptr<Server> server;
+
+// Handler installed for SIGINT and SIGTERM: shuts the gRPC server down so that
+// main() can return from server->Wait(). Does nothing before the server exists.
+// NOTE(review): logging and Server::Shutdown() are not async-signal-safe;
+// consider handling the signal on a dedicated thread (e.g. via sigwait).
+void stopServer(int signal) {
+  if (!server) {
+    return;
+  }
+  LOG(INFO) << "RocksDB Persistence Server shutting down";
+  server->Shutdown();
+}
+
+// Entry point: opens the RocksDB database at --db, exposes it through the sail
+// and SPARQL gRPC services on --host:--port, and serves until SIGINT/SIGTERM.
+// Returns 1 if the server cannot be started.
+int main(int argc, char** argv) {
+  // Initialize Google's logging library.
+  google::InitGoogleLogging(argv[0]);
+  google::ParseCommandLineFlags(&argc, &argv, true);
+
+  // Create the database directory. The result is not checked (the directory
+  // may already exist); other failures presumably surface when the database
+  // is opened.
+  mkdir(FLAGS_db.c_str(), 0700);
+  marmotta::persistence::RocksDBPersistence persistence(FLAGS_db, FLAGS_cache_size);
+
+  // The service implementations are shared with the LevelDB backend.
+  marmotta::service::LevelDBService sailService(&persistence);
+  marmotta::service::LevelDBSparqlService sparqlService(&persistence);
+
+  ServerBuilder builder;
+  builder.AddListeningPort(FLAGS_host + ":" + FLAGS_port, grpc::InsecureServerCredentials());
+  builder.RegisterService(&sailService);
+  builder.RegisterService(&sparqlService);
+  builder.SetMaxMessageSize(INT_MAX);
+
+  server = builder.BuildAndStart();
+  if (!server) {
+    // BuildAndStart() returns nullptr when the server cannot be started,
+    // e.g. when the listening address cannot be bound.
+    LOG(ERROR) << "RocksDB Persistence Server failed to start on "
+               << FLAGS_host << ":" << FLAGS_port;
+    return 1;
+  }
+  std::cout << "RocksDB Persistence Server listening on " << FLAGS_host << ":" << FLAGS_port << std::endl;
+
+  LOG(INFO) << "RocksDB Persistence Server listening on " << FLAGS_host << ":" << FLAGS_port;
+
+  signal(SIGINT, stopServer);
+  signal(SIGTERM, stopServer);
+
+  server->Wait();
+
+  return 0;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/marmotta/blob/b8d122a1/libraries/ostrich/backend/test/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/test/CMakeLists.txt b/libraries/ostrich/backend/test/CMakeLists.txt
index 29491ba..c3beef9 100644
--- a/libraries/ostrich/backend/test/CMakeLists.txt
+++ b/libraries/ostrich/backend/test/CMakeLists.txt
@@ -12,9 +12,13 @@ target_link_libraries(model_tests gtest marmotta_model ${GLOG_LIBRARY})
add_executable(sparql_tests SparqlTest.cc main.cc)
target_link_libraries(sparql_tests gtest marmotta_model marmotta_sparql ${GLOG_LIBRARY})
+# Tests for the LevelDB persistence backend (LevelDBTest.cc).
+add_executable(leveldb_tests main.cc LevelDBTest.cc)
+target_link_libraries(leveldb_tests gtest marmotta_leveldb ${GLOG_LIBRARY} ${Boost_LIBRARIES})
+
+# NOTE(review): persistence_tests links exactly the same libraries as
+# leveldb_tests and no RocksDB library; confirm whether a RocksDB test target
+# is still missing.
add_executable(persistence_tests main.cc PersistenceTest.cc)
target_link_libraries(persistence_tests gtest marmotta_leveldb ${GLOG_LIBRARY} ${Boost_LIBRARIES})
+# Register each test executable with CTest.
add_test(NAME ModelTest COMMAND model_tests)
add_test(NAME SparqlTest COMMAND sparql_tests)
+add_test(NAME LevelDBTest COMMAND leveldb_tests)
add_test(NAME PersistenceTest COMMAND persistence_tests)
\ No newline at end of file