Posted to commits@marmotta.apache.org by ss...@apache.org on 2016/08/25 11:04:22 UTC

[1/8] marmotta git commit: support for RocksDB as alternative to LevelDB

Repository: marmotta
Updated Branches:
  refs/heads/develop bc09d2747 -> ed49359d8


support for RocksDB as alternative to LevelDB


Project: http://git-wip-us.apache.org/repos/asf/marmotta/repo
Commit: http://git-wip-us.apache.org/repos/asf/marmotta/commit/9aa5c2aa
Tree: http://git-wip-us.apache.org/repos/asf/marmotta/tree/9aa5c2aa
Diff: http://git-wip-us.apache.org/repos/asf/marmotta/diff/9aa5c2aa

Branch: refs/heads/develop
Commit: 9aa5c2aa560eae77b59bbdbfab3bc657bc0f79b4
Parents: 0fe6531
Author: Sebastian Schaffert <ss...@apache.org>
Authored: Mon Aug 22 01:05:57 2016 +0200
Committer: Sebastian Schaffert <ss...@apache.org>
Committed: Mon Aug 22 01:05:57 2016 +0200

----------------------------------------------------------------------
 libraries/ostrich/backend/CMakeLists.txt        |  10 ++
 .../ostrich/backend/cmake/FindRocksDB.cmake     |  18 +++
 .../ostrich/backend/persistence/CMakeLists.txt  |   2 +-
 .../backend/persistence/leveldb_persistence.cc  | 111 ++++++++++---------
 .../backend/persistence/leveldb_persistence.h   |  38 ++++---
 libraries/ostrich/backend/util/time_logger.cc   |   4 +-
 6 files changed, 114 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
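The core of this change is a compile-time backend switch: when CMake defines HAVE_ROCKSDB, a dbimpl namespace alias points at rocksdb, otherwise at leveldb, and the persistence code only ever refers to dbimpl::DB, dbimpl::Options, dbimpl::Slice and friends. A minimal sketch of that pattern, condensed from the leveldb_persistence.h diff below (the openIndex helper is hypothetical and only for illustration, not part of the commit):

    #include <memory>
    #include <string>

    #ifdef HAVE_ROCKSDB
    #include <rocksdb/db.h>
    namespace dbimpl = rocksdb;       // RocksDB found by CMake
    #else
    #include <leveldb/db.h>
    namespace dbimpl = leveldb;       // fallback to plain LevelDB
    #endif

    // Hypothetical helper: both libraries expose the same
    // DB::Open/Options/Status surface, so the calling code is identical.
    std::unique_ptr<dbimpl::DB> openIndex(const std::string& path) {
        dbimpl::Options options;
        options.create_if_missing = true;

        dbimpl::DB* db = nullptr;
        dbimpl::Status status = dbimpl::DB::Open(options, path, &db);
        if (!status.ok()) {
            return nullptr;           // the commit asserts status.ok() instead
        }
        return std::unique_ptr<dbimpl::DB>(db);
    }

The CMakeLists.txt and FindRocksDB.cmake hunks below wire up the HAVE_ROCKSDB definition and select the matching link library via PERSISTENCE_LIBRARY.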


http://git-wip-us.apache.org/repos/asf/marmotta/blob/9aa5c2aa/libraries/ostrich/backend/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/CMakeLists.txt b/libraries/ostrich/backend/CMakeLists.txt
index 61156a5..50efd24 100644
--- a/libraries/ostrich/backend/CMakeLists.txt
+++ b/libraries/ostrich/backend/CMakeLists.txt
@@ -15,6 +15,7 @@ find_package (GRPC REQUIRED)
 find_package (LevelDB REQUIRED)
 find_package (GLog REQUIRED)
 find_package (Boost 1.54.0 COMPONENTS iostreams filesystem system)
+find_package (RocksDB)
 find_package (Tcmalloc)
 
 add_definitions(-DNDEBUG)
@@ -24,6 +25,15 @@ if (Boost_IOSTREAMS_FOUND)
     add_definitions(-DHAVE_IOSTREAMS)
 endif (Boost_IOSTREAMS_FOUND)
 
+if (RocksDB_FOUND)
+    message(STATUS "Enabling RocksDB support (RocksDB found)")
+    add_definitions(-DHAVE_ROCKSDB)
+    set(PERSISTENCE_LIBRARY ${RocksDB_LIBRARY})
+else (RocksDB_FOUND)
+    message(STATUS "Using standard LevelDB (RocksDB not found)")
+    set(PERSISTENCE_LIBRARY ${LevelDB_LIBRARY})
+endif (RocksDB_FOUND)
+
 if (Tcmalloc_FOUND)
     message(STATUS "Enabling profiling support (Tcmalloc found)")
 endif (Tcmalloc_FOUND)

http://git-wip-us.apache.org/repos/asf/marmotta/blob/9aa5c2aa/libraries/ostrich/backend/cmake/FindRocksDB.cmake
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/cmake/FindRocksDB.cmake b/libraries/ostrich/backend/cmake/FindRocksDB.cmake
new file mode 100644
index 0000000..63eb049
--- /dev/null
+++ b/libraries/ostrich/backend/cmake/FindRocksDB.cmake
@@ -0,0 +1,18 @@
+# Find librocksdb.a - key/value storage system
+
+find_path(RocksDB_INCLUDE_PATH NAMES rocksdb/db.h)
+find_library(RocksDB_LIBRARY NAMES rocksdb)
+
+if(RocksDB_INCLUDE_PATH AND RocksDB_LIBRARY)
+    set(RocksDB_FOUND TRUE)
+endif(RocksDB_INCLUDE_PATH AND RocksDB_LIBRARY)
+
+if(RocksDB_FOUND)
+    if(NOT RocksDB_FIND_QUIETLY)
+        message(STATUS "Found RocksDB: ${RocksDB_LIBRARY}; includes - ${RocksDB_INCLUDE_PATH}")
+    endif(NOT RocksDB_FIND_QUIETLY)
+else(RocksDB_FOUND)
+    if(RocksDB_FIND_REQUIRED)
+        message(FATAL_ERROR "Could not find rocksdb library.")
+    endif(RocksDB_FIND_REQUIRED)
+endif(RocksDB_FOUND)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/marmotta/blob/9aa5c2aa/libraries/ostrich/backend/persistence/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/persistence/CMakeLists.txt b/libraries/ostrich/backend/persistence/CMakeLists.txt
index 8392c61..b6ec1b5 100644
--- a/libraries/ostrich/backend/persistence/CMakeLists.txt
+++ b/libraries/ostrich/backend/persistence/CMakeLists.txt
@@ -5,7 +5,7 @@ add_library(marmotta_leveldb
         leveldb_persistence.cc leveldb_persistence.h leveldb_sparql.cc leveldb_sparql.h)
 target_link_libraries(marmotta_leveldb
         marmotta_model marmotta_util marmotta_sparql marmotta_service
-        ${LevelDB_LIBRARY} ${GLOG_LIBRARY} ${PROTOBUF_LIBRARIES})
+        ${PERSISTENCE_LIBRARY} ${GLOG_LIBRARY} ${PROTOBUF_LIBRARIES})
 
 # Server binary
 add_executable(marmotta_persistence

http://git-wip-us.apache.org/repos/asf/marmotta/blob/9aa5c2aa/libraries/ostrich/backend/persistence/leveldb_persistence.cc
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/persistence/leveldb_persistence.cc b/libraries/ostrich/backend/persistence/leveldb_persistence.cc
index 0a29841..fbb1cb2 100644
--- a/libraries/ostrich/backend/persistence/leveldb_persistence.cc
+++ b/libraries/ostrich/backend/persistence/leveldb_persistence.cc
@@ -18,6 +18,8 @@
 #define KEY_LENGTH 16
 
 #include <chrono>
+#include <stdlib.h>
+#include <malloc.h>
 
 #include <gflags/gflags.h>
 #include <glog/logging.h>
@@ -33,11 +35,11 @@
 
 #define CHECK_STATUS(s) CHECK(s.ok()) << "Writing to database failed: " << s.ToString()
 
-DEFINE_int64(write_batch_size, 10000,
+DEFINE_int64(write_batch_size, 1000000,
              "Maximum number of statements to write in a single batch to the database");
 
-using leveldb::WriteBatch;
-using leveldb::Slice;
+using dbimpl::WriteBatch;
+using dbimpl::Slice;
 using marmotta::rdf::proto::Statement;
 using marmotta::rdf::proto::Namespace;
 using marmotta::rdf::proto::Resource;
@@ -211,7 +213,7 @@ template<typename T>
 class LevelDBIterator : public util::CloseableIterator<T> {
  public:
 
-    LevelDBIterator(leveldb::Iterator *it)
+    LevelDBIterator(dbimpl::Iterator *it)
         : it(it) {
         it->SeekToFirst();
     }
@@ -236,7 +238,7 @@ class LevelDBIterator : public util::CloseableIterator<T> {
     }
 
  protected:
-    leveldb::Iterator* it;
+    dbimpl::Iterator* it;
     T proto;
 };
 
@@ -246,9 +248,9 @@ class LevelDBIterator : public util::CloseableIterator<T> {
 class StatementRangeIterator : public LevelDBIterator<Statement> {
  public:
 
-    StatementRangeIterator(leveldb::Iterator *it, char *loKey, char *hiKey)
+    StatementRangeIterator(dbimpl::Iterator *it, char *loKey, char *hiKey)
             : LevelDBIterator(it), loKey(loKey), hiKey(hiKey) {
-        it->Seek(leveldb::Slice(loKey, 4 * KEY_LENGTH));
+        it->Seek(dbimpl::Slice(loKey, 4 * KEY_LENGTH));
     }
 
     ~StatementRangeIterator() override {
@@ -257,7 +259,7 @@ class StatementRangeIterator : public LevelDBIterator<Statement> {
     };
 
     bool hasNext() override {
-        return it->Valid() && it->key().compare(leveldb::Slice(hiKey, 4 * KEY_LENGTH)) <= 0;
+        return it->Valid() && it->key().compare(dbimpl::Slice(hiKey, 4 * KEY_LENGTH)) <= 0;
     }
 
  private:
@@ -288,17 +290,21 @@ bool Matches(const Statement& pattern, const Statement& stmt) {
 /**
  * Build database with default options.
  */
-leveldb::DB* buildDB(const std::string& path, const std::string& suffix, const leveldb::Options& options) {
-    leveldb::DB* db;
-    leveldb::Status status = leveldb::DB::Open(options, path + "/" + suffix + ".db", &db);
+dbimpl::DB* buildDB(const std::string& path, const std::string& suffix, const dbimpl::Options& options) {
+    dbimpl::DB* db;
+    dbimpl::Status status = dbimpl::DB::Open(options, path + "/" + suffix + ".db", &db);
     assert(status.ok());
     return db;
 }
 
-leveldb::Options* buildOptions(KeyComparator* cmp, leveldb::Cache* cache) {
-    leveldb::Options *options = new leveldb::Options();
+dbimpl::Options* buildOptions(KeyComparator* cmp, dbimpl::Cache* cache) {
+    dbimpl::Options *options = new dbimpl::Options();
     options->create_if_missing = true;
 
+#ifdef HAVE_ROCKSDB
+    options->IncreaseParallelism();
+    options->OptimizeLevelStyleCompaction();
+#else
     // Custom comparator for our keys.
     options->comparator = cmp;
 
@@ -309,19 +315,20 @@ leveldb::Options* buildOptions(KeyComparator* cmp, leveldb::Cache* cache) {
     options->write_buffer_size = 16384 * 1024;
 
     // Set a bloom filter of 10 bits.
-    options->filter_policy = leveldb::NewBloomFilterPolicy(10);
+    options->filter_policy = dbimpl::NewBloomFilterPolicy(10);
+#endif
     return options;
 }
 
-leveldb::Options buildNsOptions() {
-    leveldb::Options options;
+dbimpl::Options buildNsOptions() {
+    dbimpl::Options options;
     options.create_if_missing = true;
     return options;
 }
 
 LevelDBPersistence::LevelDBPersistence(const std::string &path, int64_t cacheSize)
         : comparator(new KeyComparator())
-        , cache(leveldb::NewLRUCache(cacheSize))
+        , cache(dbimpl::NewLRUCache(cacheSize))
         , options(buildOptions(comparator.get(), cache.get()))
         , db_ns_prefix(buildDB(path, "ns_prefix", buildNsOptions()))
         , db_ns_url(buildDB(path, "ns_url", buildNsOptions()))
@@ -360,14 +367,14 @@ int64_t LevelDBPersistence::AddNamespaces(NamespaceIterator& it) {
     DLOG(INFO) << "Starting batch namespace import operation.";
     int64_t count = 0;
 
-    leveldb::WriteBatch batch_prefix, batch_url;
+    dbimpl::WriteBatch batch_prefix, batch_url;
 
     while (it.hasNext()) {
         AddNamespace(it.next(), batch_prefix, batch_url);
         count++;
     }
-    CHECK_STATUS(db_ns_prefix->Write(leveldb::WriteOptions(), &batch_prefix));
-    CHECK_STATUS(db_ns_url->Write(leveldb::WriteOptions(), &batch_url));
+    CHECK_STATUS(db_ns_prefix->Write(dbimpl::WriteOptions(), &batch_prefix));
+    CHECK_STATUS(db_ns_url->Write(dbimpl::WriteOptions(), &batch_url));
 
     DLOG(INFO) << "Imported " << count << " namespaces";
 
@@ -380,7 +387,7 @@ std::unique_ptr<LevelDBPersistence::NamespaceIterator> LevelDBPersistence::GetNa
 
     Namespace ns;
 
-    leveldb::DB *db = nullptr;
+    dbimpl::DB *db = nullptr;
     std::string key, value;
     if (pattern.prefix() != "") {
         key = pattern.prefix();
@@ -391,7 +398,7 @@ std::unique_ptr<LevelDBPersistence::NamespaceIterator> LevelDBPersistence::GetNa
     }
     if (db != nullptr) {
         // Either prefix or uri given, report the correct namespace value.
-        leveldb::Status s = db->Get(leveldb::ReadOptions(), key, &value);
+        dbimpl::Status s = db->Get(dbimpl::ReadOptions(), key, &value);
         if (s.ok()) {
             ns.ParseFromString(value);
             return util::make_unique<util::SingletonIterator<Namespace>>(std::move(ns));
@@ -401,7 +408,7 @@ std::unique_ptr<LevelDBPersistence::NamespaceIterator> LevelDBPersistence::GetNa
     } else {
         // Pattern was empty, iterate over all namespaces and report them.
         return util::make_unique<LevelDBIterator<Namespace>>(
-                db_ns_prefix->NewIterator(leveldb::ReadOptions()));
+                db_ns_prefix->NewIterator(dbimpl::ReadOptions()));
     }
 }
 
@@ -425,23 +432,23 @@ int64_t LevelDBPersistence::AddStatements(StatementIterator& it) {
     LOG(INFO) << "Starting batch statement import operation.";
     int64_t count = 0;
 
-    leveldb::WriteBatch batch_spoc, batch_cspo, batch_opsc, batch_pcos;
+    dbimpl::WriteBatch batch_spoc, batch_cspo, batch_opsc, batch_pcos;
     auto writeBatches = [&]{
         std::vector<std::thread> writers;
         writers.push_back(std::thread([&]() {
-            CHECK_STATUS(db_pcos->Write(leveldb::WriteOptions(), &batch_pcos));
+            CHECK_STATUS(db_pcos->Write(dbimpl::WriteOptions(), &batch_pcos));
             batch_pcos.Clear();
         }));
         writers.push_back(std::thread([&]() {
-            CHECK_STATUS(db_opsc->Write(leveldb::WriteOptions(), &batch_opsc));
+            CHECK_STATUS(db_opsc->Write(dbimpl::WriteOptions(), &batch_opsc));
             batch_opsc.Clear();
         }));
         writers.push_back(std::thread([&]() {
-            CHECK_STATUS(db_cspo->Write(leveldb::WriteOptions(), &batch_cspo));
+            CHECK_STATUS(db_cspo->Write(dbimpl::WriteOptions(), &batch_cspo));
             batch_cspo.Clear();
         }));
         writers.push_back(std::thread([&]() {
-            CHECK_STATUS(db_spoc->Write(leveldb::WriteOptions(), &batch_spoc));
+            CHECK_STATUS(db_spoc->Write(dbimpl::WriteOptions(), &batch_spoc));
             batch_spoc.Clear();
         }));
 
@@ -476,7 +483,7 @@ std::unique_ptr<LevelDBPersistence::StatementIterator> LevelDBPersistence::GetSt
 
     PatternQuery query(pattern);
 
-    leveldb::DB* db;
+    dbimpl::DB* db;
     switch (query.Type()) {
         case PatternQuery::SPOC:
             db = db_spoc.get();
@@ -500,12 +507,12 @@ std::unique_ptr<LevelDBPersistence::StatementIterator> LevelDBPersistence::GetSt
         DLOG(INFO) << "Retrieving statements with filter.";
         return util::make_unique<util::FilteringIterator<Statement>>(
                 new StatementRangeIterator(
-                        db->NewIterator(leveldb::ReadOptions()), query.MinKey(), query.MaxKey()),
+                        db->NewIterator(dbimpl::ReadOptions()), query.MinKey(), query.MaxKey()),
                 [&pattern](const Statement& stmt) -> bool { return Matches(pattern, stmt); });
     } else {
         DLOG(INFO) << "Retrieving statements without filter.";
         return util::make_unique<StatementRangeIterator>(
-                db->NewIterator(leveldb::ReadOptions()), query.MinKey(), query.MaxKey());
+                db->NewIterator(dbimpl::ReadOptions()), query.MinKey(), query.MaxKey());
     }
 }
 
@@ -535,22 +542,22 @@ int64_t LevelDBPersistence::RemoveStatements(const rdf::proto::Statement& patter
     int64_t count = 0;
 
     Statement stmt;
-    leveldb::WriteBatch batch_spoc, batch_cspo, batch_opsc, batch_pcos;
+    dbimpl::WriteBatch batch_spoc, batch_cspo, batch_opsc, batch_pcos;
 
     count = RemoveStatements(pattern, batch_spoc, batch_cspo, batch_opsc, batch_pcos);
 
     std::vector<std::thread> writers;
     writers.push_back(std::thread([&]() {
-        CHECK_STATUS(db_pcos->Write(leveldb::WriteOptions(), &batch_pcos));
+        CHECK_STATUS(db_pcos->Write(dbimpl::WriteOptions(), &batch_pcos));
     }));
     writers.push_back(std::thread([&]() {
-        CHECK_STATUS(db_opsc->Write(leveldb::WriteOptions(), &batch_opsc));
+        CHECK_STATUS(db_opsc->Write(dbimpl::WriteOptions(), &batch_opsc));
     }));
     writers.push_back(std::thread([&]() {
-        CHECK_STATUS(db_cspo->Write(leveldb::WriteOptions(), &batch_cspo));
+        CHECK_STATUS(db_cspo->Write(dbimpl::WriteOptions(), &batch_cspo));
     }));
     writers.push_back(std::thread([&]() {
-        CHECK_STATUS(db_spoc->Write(leveldb::WriteOptions(), &batch_spoc));
+        CHECK_STATUS(db_spoc->Write(dbimpl::WriteOptions(), &batch_spoc));
     }));
 
     for (auto& t : writers) {
@@ -574,27 +581,27 @@ UpdateStatistics LevelDBPersistence::Update(LevelDBPersistence::UpdateIterator &
     auto writeBatches = [&]{
         std::vector<std::thread> writers;
         writers.push_back(std::thread([&]() {
-            CHECK_STATUS(db_pcos->Write(leveldb::WriteOptions(), &b_pcos));
+            CHECK_STATUS(db_pcos->Write(dbimpl::WriteOptions(), &b_pcos));
             b_pcos.Clear();
         }));
         writers.push_back(std::thread([&]() {
-            CHECK_STATUS(db_opsc->Write(leveldb::WriteOptions(), &b_opsc));
+            CHECK_STATUS(db_opsc->Write(dbimpl::WriteOptions(), &b_opsc));
             b_opsc.Clear();
         }));
         writers.push_back(std::thread([&]() {
-            CHECK_STATUS(db_cspo->Write(leveldb::WriteOptions(), &b_cspo));
+            CHECK_STATUS(db_cspo->Write(dbimpl::WriteOptions(), &b_cspo));
             b_cspo.Clear();
         }));
         writers.push_back(std::thread([&]() {
-            CHECK_STATUS(db_spoc->Write(leveldb::WriteOptions(), &b_spoc));
+            CHECK_STATUS(db_spoc->Write(dbimpl::WriteOptions(), &b_spoc));
             b_spoc.Clear();
         }));
         writers.push_back(std::thread([&]() {
-            CHECK_STATUS(db_ns_prefix->Write(leveldb::WriteOptions(), &b_prefix));
+            CHECK_STATUS(db_ns_prefix->Write(dbimpl::WriteOptions(), &b_prefix));
             b_prefix.Clear();
         }));
         writers.push_back(std::thread([&]() {
-            CHECK_STATUS(db_ns_url->Write(leveldb::WriteOptions(), &b_url));
+            CHECK_STATUS(db_ns_url->Write(dbimpl::WriteOptions(), &b_url));
             b_url.Clear();
         }));
 
@@ -675,19 +682,19 @@ void LevelDBPersistence::AddStatement(
 
     char *k_spoc = (char *) calloc(4 * KEY_LENGTH, sizeof(char));
     computeKey(&bufs, &bufp, &bufo, &bufc, k_spoc);
-    spoc.Put(leveldb::Slice(k_spoc, 4 * KEY_LENGTH), buffer);
+    spoc.Put(dbimpl::Slice(k_spoc, 4 * KEY_LENGTH), buffer);
 
     char *k_cspo = (char *) calloc(4 * KEY_LENGTH, sizeof(char));
     orderKey(k_cspo, k_spoc, C, S, P, O);
-    cspo.Put(leveldb::Slice(k_cspo, 4 * KEY_LENGTH), buffer);
+    cspo.Put(dbimpl::Slice(k_cspo, 4 * KEY_LENGTH), buffer);
 
     char *k_opsc = (char *) calloc(4 * KEY_LENGTH, sizeof(char));
     orderKey(k_opsc, k_spoc, O, P, S, C);
-    opsc.Put(leveldb::Slice(k_opsc, 4 * KEY_LENGTH), buffer);
+    opsc.Put(dbimpl::Slice(k_opsc, 4 * KEY_LENGTH), buffer);
 
     char *k_pcos = (char *) calloc(4 * KEY_LENGTH, sizeof(char));
     orderKey(k_pcos, k_spoc, P, C, O, S);
-    pcos.Put(leveldb::Slice(k_pcos, 4 * KEY_LENGTH), buffer);
+    pcos.Put(dbimpl::Slice(k_pcos, 4 * KEY_LENGTH), buffer);
 
     free(k_spoc);
     free(k_cspo);
@@ -712,19 +719,19 @@ int64_t LevelDBPersistence::RemoveStatements(
 
         char* k_spoc = (char*)calloc(4 * KEY_LENGTH, sizeof(char));
         computeKey(&bufs, &bufp, &bufo, &bufc, k_spoc);
-        spoc.Delete(leveldb::Slice(k_spoc, 4 * KEY_LENGTH));
+        spoc.Delete(dbimpl::Slice(k_spoc, 4 * KEY_LENGTH));
 
         char* k_cspo = (char*)calloc(4 * KEY_LENGTH, sizeof(char));
         orderKey(k_cspo, k_spoc, C, S, P, O);
-        cspo.Delete(leveldb::Slice(k_cspo, 4 * KEY_LENGTH));
+        cspo.Delete(dbimpl::Slice(k_cspo, 4 * KEY_LENGTH));
 
         char* k_opsc = (char*)calloc(4 * KEY_LENGTH, sizeof(char));
         orderKey(k_opsc, k_spoc, O, P, S, C);
-        opsc.Delete(leveldb::Slice(k_opsc, 4 * KEY_LENGTH));
+        opsc.Delete(dbimpl::Slice(k_opsc, 4 * KEY_LENGTH));
 
         char* k_pcos = (char*)calloc(4 * KEY_LENGTH, sizeof(char));
         orderKey(k_pcos, k_spoc, P, C, O, S);
-        pcos.Delete(leveldb::Slice(k_pcos, 4 * KEY_LENGTH));
+        pcos.Delete(dbimpl::Slice(k_pcos, 4 * KEY_LENGTH));
 
         free(k_spoc);
         free(k_cspo);
@@ -739,14 +746,14 @@ int64_t LevelDBPersistence::RemoveStatements(
     return count;
 }
 
-int KeyComparator::Compare(const leveldb::Slice& a, const leveldb::Slice& b) const {
+int KeyComparator::Compare(const dbimpl::Slice& a, const dbimpl::Slice& b) const {
     return memcmp(a.data(), b.data(), 4 * KEY_LENGTH);
 }
 
 
 int64_t LevelDBPersistence::Size() {
     int64_t count = 0;
-    leveldb::Iterator* it = db_cspo->NewIterator(leveldb::ReadOptions());
+    dbimpl::Iterator* it = db_cspo->NewIterator(dbimpl::ReadOptions());
     for (it->SeekToFirst(); it->Valid(); it->Next()) {
         count++;
     }

http://git-wip-us.apache.org/repos/asf/marmotta/blob/9aa5c2aa/libraries/ostrich/backend/persistence/leveldb_persistence.h
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/persistence/leveldb_persistence.h b/libraries/ostrich/backend/persistence/leveldb_persistence.h
index 7a3da17..9fd1924 100644
--- a/libraries/ostrich/backend/persistence/leveldb_persistence.h
+++ b/libraries/ostrich/backend/persistence/leveldb_persistence.h
@@ -22,10 +22,20 @@
 #include <string>
 #include <functional>
 
+#ifdef HAVE_ROCKSDB
+#include <rocksdb/db.h>
+#include <rocksdb/cache.h>
+#include <rocksdb/comparator.h>
+
+namespace dbimpl = rocksdb;
+#else
 #include <leveldb/db.h>
 #include <leveldb/cache.h>
 #include <leveldb/comparator.h>
 
+namespace dbimpl = leveldb;
+#endif
+
 #include "model/rdf_model.h"
 #include "service/sail.pb.h"
 #include "util/iterator.h"
@@ -36,13 +46,13 @@ namespace persistence {
 /**
  * A custom comparator treating the bytes in the key as unsigned char.
  */
-class KeyComparator : public leveldb::Comparator {
+class KeyComparator : public dbimpl::Comparator {
  public:
-    int Compare(const leveldb::Slice& a, const leveldb::Slice& b) const;
+    int Compare(const dbimpl::Slice& a, const dbimpl::Slice& b) const override ;
 
-    const char* Name() const { return "KeyComparator"; }
-    void FindShortestSeparator(std::string*, const leveldb::Slice&) const { }
-    void FindShortSuccessor(std::string*) const { }
+    const char* Name() const override { return "KeyComparator"; }
+    void FindShortestSeparator(std::string*, const dbimpl::Slice&) const override { }
+    void FindShortSuccessor(std::string*) const override { }
 };
 
 
@@ -134,11 +144,11 @@ class LevelDBPersistence {
  private:
 
     std::unique_ptr<KeyComparator> comparator;
-    std::unique_ptr<leveldb::Cache> cache;
-    std::unique_ptr<leveldb::Options> options;
+    std::shared_ptr<dbimpl::Cache> cache;
+    std::unique_ptr<dbimpl::Options> options;
 
     // We currently support efficient lookups by subject, context and object.
-    std::unique_ptr<leveldb::DB>
+    std::unique_ptr<dbimpl::DB>
             // Statement databases, indexed for query performance
             db_spoc, db_cspo, db_opsc, db_pcos,
             // Namespace databases
@@ -150,20 +160,20 @@ class LevelDBPersistence {
      * Add the namespace to the given database batch operations.
      */
     void AddNamespace(const rdf::proto::Namespace& ns,
-                      leveldb::WriteBatch& ns_prefix, leveldb::WriteBatch& ns_url);
+                      dbimpl::WriteBatch& ns_prefix, dbimpl::WriteBatch& ns_url);
 
     /**
      * Add the namespace to the given database batch operations.
      */
     void RemoveNamespace(const rdf::proto::Namespace& ns,
-                         leveldb::WriteBatch& ns_prefix, leveldb::WriteBatch& ns_url);
+                         dbimpl::WriteBatch& ns_prefix, dbimpl::WriteBatch& ns_url);
 
     /**
      * Add the statement to the given database batch operations.
      */
     void AddStatement(const rdf::proto::Statement& stmt,
-                      leveldb::WriteBatch& spoc, leveldb::WriteBatch& cspo,
-                      leveldb::WriteBatch& opsc, leveldb::WriteBatch&pcos);
+                      dbimpl::WriteBatch& spoc, dbimpl::WriteBatch& cspo,
+                      dbimpl::WriteBatch& opsc, dbimpl::WriteBatch&pcos);
 
 
     /**
@@ -171,8 +181,8 @@ class LevelDBPersistence {
      * unset to indicate wildcards) from the given database batch operations.
      */
     int64_t RemoveStatements(const rdf::proto::Statement& pattern,
-                             leveldb::WriteBatch& spoc, leveldb::WriteBatch& cspo,
-                             leveldb::WriteBatch& opsc, leveldb::WriteBatch&pcos);
+                             dbimpl::WriteBatch& spoc, dbimpl::WriteBatch& cspo,
+                             dbimpl::WriteBatch& opsc, dbimpl::WriteBatch&pcos);
 
 
 };

http://git-wip-us.apache.org/repos/asf/marmotta/blob/9aa5c2aa/libraries/ostrich/backend/util/time_logger.cc
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/util/time_logger.cc b/libraries/ostrich/backend/util/time_logger.cc
index e836b61..a4650ca 100644
--- a/libraries/ostrich/backend/util/time_logger.cc
+++ b/libraries/ostrich/backend/util/time_logger.cc
@@ -6,8 +6,8 @@
 #include "time_logger.h"
 
 marmotta::util::TimeLogger::TimeLogger(const std::string &name)
-        : start_(std::chrono::steady_clock::now())
-        , name_(name) {
+        : name_(name)
+        , start_(std::chrono::steady_clock::now()) {
     LOG(INFO) << name << " started.";
 }
 


[3/8] marmotta git commit: use C++14 optional as wildcard in Ostrich SPARQL

Posted by ss...@apache.org.
use C++14 optional as wildcard in Ostrich SPARQL


Project: http://git-wip-us.apache.org/repos/asf/marmotta/repo
Commit: http://git-wip-us.apache.org/repos/asf/marmotta/commit/76ed0613
Tree: http://git-wip-us.apache.org/repos/asf/marmotta/tree/76ed0613
Diff: http://git-wip-us.apache.org/repos/asf/marmotta/diff/76ed0613

Branch: refs/heads/develop
Commit: 76ed0613164085e0dc807c0a1e78b3a87ef9277f
Parents: 3828f65
Author: Sebastian Schaffert <ss...@apache.org>
Authored: Mon Aug 22 21:53:40 2016 +0200
Committer: Sebastian Schaffert <ss...@apache.org>
Committed: Mon Aug 22 21:53:40 2016 +0200

----------------------------------------------------------------------
 libraries/ostrich/backend/CMakeLists.txt        |  2 +-
 .../backend/persistence/leveldb_sparql.cc       | 22 ++++++-----
 .../backend/persistence/leveldb_sparql.h        | 10 ++++-
 .../ostrich/backend/sparql/rasqal_adapter.cc    | 40 ++++++++++----------
 .../ostrich/backend/sparql/rasqal_adapter.h     | 10 +++--
 libraries/ostrich/backend/test/SparqlTest.cc    | 26 +++++++------
 6 files changed, 60 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
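The interface change replaces raw pointers (nullptr meaning wildcard) with std::experimental::optional, so a default-constructed, unset optional now plays the wildcard role. A small usage sketch against the TripleSource interface shown in the diffs below; the queryAllObjects function and its source parameter are hypothetical and only illustrate the call-site difference, and the include paths are an assumption based on the backend source layout:

    #include <experimental/optional>

    #include "model/rdf_model.h"        // rdf::Resource, rdf::URI, rdf::Value
    #include "sparql/rasqal_adapter.h"  // marmotta::sparql::TripleSource

    using std::experimental::optional;
    using namespace marmotta;

    // Before this commit a wildcard was passed as a nullptr pointer, e.g.
    //     source.GetStatements(&s, &p, nullptr, nullptr);
    // Now an unset optional expresses the same thing:
    void queryAllObjects(sparql::TripleSource& source,
                         const rdf::Resource& subject,
                         const rdf::URI& predicate) {
        optional<rdf::Resource> s = subject;
        optional<rdf::URI> p = predicate;
        optional<rdf::Value> o;             // unset: any object
        optional<rdf::Resource> c;          // unset: any context

        auto it = source.GetStatements(s, p, o, c);
        while (it->hasNext()) {
            it->next();                     // consume matching statements
        }
    }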


http://git-wip-us.apache.org/repos/asf/marmotta/blob/76ed0613/libraries/ostrich/backend/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/CMakeLists.txt b/libraries/ostrich/backend/CMakeLists.txt
index 50efd24..87dfc7f 100644
--- a/libraries/ostrich/backend/CMakeLists.txt
+++ b/libraries/ostrich/backend/CMakeLists.txt
@@ -2,7 +2,7 @@ cmake_minimum_required(VERSION 3.0)
 project(Marmotta)
 
 set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/cmake/")
-set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -g")
+set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++1y -g")
 set(PROTOBUF_IMPORT_DIRS "${CMAKE_SOURCE_DIR}/model")
 set(USE_TCMALLOC TRUE)
 

http://git-wip-us.apache.org/repos/asf/marmotta/blob/76ed0613/libraries/ostrich/backend/persistence/leveldb_sparql.cc
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/persistence/leveldb_sparql.cc b/libraries/ostrich/backend/persistence/leveldb_sparql.cc
index a71dc46..f0983b7 100644
--- a/libraries/ostrich/backend/persistence/leveldb_sparql.cc
+++ b/libraries/ostrich/backend/persistence/leveldb_sparql.cc
@@ -35,19 +35,20 @@ class WrapProtoStatementIterator : public util::ConvertingIterator<rdf::proto::S
 
 
 bool LevelDBTripleSource::HasStatement(
-        const rdf::Resource *s, const rdf::URI *p, const rdf::Value *o, const rdf::Resource *c) {
+        const optional<rdf::Resource>& s, const optional<rdf::URI>& p,
+        const optional<rdf::Value>& o, const optional<rdf::Resource>& c)  {
     rdf::proto::Statement pattern;
 
-    if (s != nullptr) {
+    if (s) {
         *pattern.mutable_subject() = s->getMessage();
     }
-    if (p != nullptr) {
+    if (p) {
         *pattern.mutable_predicate() = p->getMessage();
     }
-    if (o != nullptr) {
+    if (o) {
         *pattern.mutable_object() = o->getMessage();
     }
-    if (c != nullptr) {
+    if (c) {
         *pattern.mutable_context() = c->getMessage();
     }
 
@@ -61,19 +62,20 @@ bool LevelDBTripleSource::HasStatement(
 }
 
 std::unique_ptr<sparql::StatementIterator> LevelDBTripleSource::GetStatements(
-        const rdf::Resource *s, const rdf::URI *p, const rdf::Value *o, const rdf::Resource *c) {
+        const optional<rdf::Resource>& s, const optional<rdf::URI>& p,
+        const optional<rdf::Value>& o, const optional<rdf::Resource>& c)  {
     rdf::proto::Statement pattern;
 
-    if (s != nullptr) {
+    if (s) {
         *pattern.mutable_subject() = s->getMessage();
     }
-    if (p != nullptr) {
+    if (p) {
         *pattern.mutable_predicate() = p->getMessage();
     }
-    if (o != nullptr) {
+    if (o) {
         *pattern.mutable_object() = o->getMessage();
     }
-    if (c != nullptr) {
+    if (c) {
         *pattern.mutable_context() = c->getMessage();
     }
 

http://git-wip-us.apache.org/repos/asf/marmotta/blob/76ed0613/libraries/ostrich/backend/persistence/leveldb_sparql.h
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/persistence/leveldb_sparql.h b/libraries/ostrich/backend/persistence/leveldb_sparql.h
index 9d8e989..7b103a4 100644
--- a/libraries/ostrich/backend/persistence/leveldb_sparql.h
+++ b/libraries/ostrich/backend/persistence/leveldb_sparql.h
@@ -25,6 +25,8 @@ namespace marmotta {
 namespace persistence {
 namespace sparql {
 
+using std::experimental::optional;
+
 /**
  * A SPARQL triple source using a LevelDBPersistence to access data.
  */
@@ -34,10 +36,14 @@ class LevelDBTripleSource : public ::marmotta::sparql::TripleSource {
     LevelDBTripleSource(LevelDBPersistence *persistence) : persistence(persistence) { }
 
 
-    bool HasStatement(const rdf::Resource *s, const rdf::URI *p, const rdf::Value *o, const rdf::Resource *c) override;
+    bool HasStatement(
+            const optional<rdf::Resource>& s, const optional<rdf::URI>& p,
+            const optional<rdf::Value>& o, const optional<rdf::Resource>& c) override;
 
     std::unique_ptr<::marmotta::sparql::StatementIterator>
-            GetStatements(const rdf::Resource *s, const rdf::URI *p, const rdf::Value *o, const rdf::Resource *c) override;
+            GetStatements(
+                    const optional<rdf::Resource>& s, const optional<rdf::URI>& p,
+                    const optional<rdf::Value>& o, const optional<rdf::Resource>& c) override;
 
  private:
     // A pointer to the persistence instance wrapped by this triple source.

http://git-wip-us.apache.org/repos/asf/marmotta/blob/76ed0613/libraries/ostrich/backend/sparql/rasqal_adapter.cc
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/sparql/rasqal_adapter.cc b/libraries/ostrich/backend/sparql/rasqal_adapter.cc
index 1f6055b..9081e0d 100644
--- a/libraries/ostrich/backend/sparql/rasqal_adapter.cc
+++ b/libraries/ostrich/backend/sparql/rasqal_adapter.cc
@@ -124,52 +124,52 @@ int init_triples_match(
 
     SparqlService *self = (SparqlService *) *(void**)user_data;
 
-    std::unique_ptr<rdf::Resource> s = nullptr;
-    std::unique_ptr<rdf::URI> p = nullptr;
-    std::unique_ptr<rdf::Value> o = nullptr;
-    std::unique_ptr<rdf::Resource> c = nullptr;
+    optional<rdf::Resource> s;
+    optional<rdf::URI> p;
+    optional<rdf::Value> o;
+    optional<rdf::Resource> c;
 
     rasqal_variable* var;
     if ((var=rasqal_literal_as_variable(t->subject))) {
         m->bindings[0] = var;
         if (var->value) {
-            s.reset(new rdf::Resource(rasqal::ConvertResource(var->value)));
+            s = rasqal::ConvertResource(var->value);
         }
     } else {
-        s.reset(new rdf::Resource(rasqal::ConvertResource(t->subject)));
+        s = rasqal::ConvertResource(t->subject);
     }
 
     if ((var=rasqal_literal_as_variable(t->predicate))) {
         m->bindings[1] = var;
         if (var->value) {
-            p.reset(new rdf::URI(rasqal::ConvertURI(var->value)));
+            p = rasqal::ConvertURI(var->value);
         }
     } else {
-        p.reset(new rdf::URI(rasqal::ConvertURI(t->predicate)));
+        p = rasqal::ConvertURI(t->predicate);
     }
 
     if ((var=rasqal_literal_as_variable(t->object))) {
         m->bindings[2] = var;
         if (var->value) {
-            o.reset(new rdf::Value(rasqal::ConvertValue(var->value)));
+            o = rasqal::ConvertValue(var->value);
         }
     } else {
-        o.reset(new rdf::Value(rasqal::ConvertValue(t->object)));
+        o = rasqal::ConvertValue(t->object);
     }
 
     if(t->origin) {
         if ((var=rasqal_literal_as_variable(t->origin))) {
             m->bindings[3] = var;
             if (var->value) {
-                c.reset(new rdf::Resource(rasqal::ConvertResource(var->value)));
+                c = rasqal::ConvertResource(var->value);
             }
         } else {
-            c.reset(new rdf::Resource(rasqal::ConvertResource(t->origin)));
+            c = rasqal::ConvertResource(t->origin);
         }
     }
 
     // Store C++ iterator in user_data and take ownership
-    auto it = self->Source().GetStatements(s.get(), p.get(), o.get(), c.get());
+    auto it = self->Source().GetStatements(s, p, o, c);
     rtm->user_data = it.release();
 
     rtm->bind_match = bind_match;
@@ -185,18 +185,16 @@ int triple_present(
         struct rasqal_triples_source_s *rts, void *user_data, rasqal_triple *t) {
     DLOG(INFO) << "Check triple";
 
-    auto s = rasqal::ConvertResource(t->subject);
-    auto p = rasqal::ConvertURI(t->predicate);
-    auto o = rasqal::ConvertValue(t->object);
+    optional<rdf::Resource> s = rasqal::ConvertResource(t->subject);
+    optional<rdf::URI> p = rasqal::ConvertURI(t->predicate);
+    optional<rdf::Value> o = rasqal::ConvertValue(t->object);
+    optional<rdf::Resource> c;
 
     SparqlService *self = (SparqlService *) *(void**)user_data;
     if ((t->flags & RASQAL_TRIPLE_ORIGIN) != 0) {
-        auto c = rasqal::ConvertResource(t->origin);
-
-        return self->Source().HasStatement(&s, &p, &o, &c);
-    } else {
-        return self->Source().HasStatement(&s, &p, &o, nullptr);
+        c = rasqal::ConvertResource(t->origin);
     }
+    return self->Source().HasStatement(s, p, o, c);
 }
 
 void free_triples_source(void *user_data) {

http://git-wip-us.apache.org/repos/asf/marmotta/blob/76ed0613/libraries/ostrich/backend/sparql/rasqal_adapter.h
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/sparql/rasqal_adapter.h b/libraries/ostrich/backend/sparql/rasqal_adapter.h
index 68f7b69..451b16d 100644
--- a/libraries/ostrich/backend/sparql/rasqal_adapter.h
+++ b/libraries/ostrich/backend/sparql/rasqal_adapter.h
@@ -20,6 +20,7 @@
 
 #include <map>
 #include <memory>
+#include <experimental/optional>
 #include <rasqal/rasqal.h>
 
 #include "model/rdf_model.h"
@@ -29,6 +30,7 @@ namespace marmotta {
 namespace sparql {
 
 using StatementIterator = util::CloseableIterator<rdf::Statement>;
+using std::experimental::optional;
 
 /**
  * An abstract superclass for more easily interfacing from the C++ Marmotta model
@@ -42,8 +44,8 @@ class TripleSource {
      * Parameters with nullptr value are interpreted as wildcards.
      */
     virtual bool HasStatement(
-            const rdf::Resource* s, const rdf::URI* p, const rdf::Value* o,
-            const rdf::Resource* c) = 0;
+            const optional<rdf::Resource>& s, const optional<rdf::URI>& p,
+            const optional<rdf::Value>& o, const optional<rdf::Resource>& c) = 0;
 
     /**
      * Return an iterator over statements matching the given subject, predicate,
@@ -52,8 +54,8 @@ class TripleSource {
      * Parameters with nullptr value are interpreted as wildcards.
      */
     virtual std::unique_ptr<StatementIterator> GetStatements(
-            const rdf::Resource* s, const rdf::URI* p, const rdf::Value* o,
-            const rdf::Resource* c) = 0;
+            const optional<rdf::Resource>& s, const optional<rdf::URI>& p,
+            const optional<rdf::Value>& o, const optional<rdf::Resource>& c) = 0;
 };
 
 class SparqlException : public std::exception {

http://git-wip-us.apache.org/repos/asf/marmotta/blob/76ed0613/libraries/ostrich/backend/test/SparqlTest.cc
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/test/SparqlTest.cc b/libraries/ostrich/backend/test/SparqlTest.cc
index a45a282..75d00ca 100644
--- a/libraries/ostrich/backend/test/SparqlTest.cc
+++ b/libraries/ostrich/backend/test/SparqlTest.cc
@@ -13,7 +13,7 @@ namespace {
 
 const rdf::URI base_uri("http://example.com/");
 
-
+using std::experimental::optional;
 using MockStatementIterator = util::CollectionIterator<rdf::Statement>;
 
 class MockTripleSource : public TripleSource {
@@ -22,19 +22,20 @@ class MockTripleSource : public TripleSource {
     MockTripleSource(std::vector<rdf::Statement> statements)
             : statements(statements) { }
 
-    bool HasStatement(const rdf::Resource *s, const rdf::URI *p, const rdf::Value *o, const rdf::Resource *c) override {
+    bool HasStatement(const optional<rdf::Resource>& s, const optional<rdf::URI>& p,
+                      const optional<rdf::Value>& o, const optional<rdf::Resource>& c) override {
         for (const auto& stmt : statements) {
             bool match = true;
-            if (s != nullptr && *s != stmt.getSubject()) {
+            if (s && *s != stmt.getSubject()) {
                 match = false;
             }
-            if (p != nullptr && *p != stmt.getPredicate()) {
+            if (p && *p != stmt.getPredicate()) {
                 match = false;
             }
-            if (o != nullptr && *o != stmt.getObject()) {
+            if (o && *o != stmt.getObject()) {
                 match = false;
             }
-            if (c != nullptr && *c != stmt.getContext()) {
+            if (c && *c != stmt.getContext()) {
                 match = false;
             }
             if (!match) {
@@ -44,21 +45,22 @@ class MockTripleSource : public TripleSource {
         return false;
     }
 
-    std::unique_ptr<StatementIterator> GetStatements(const rdf::Resource *s, const rdf::URI *p,
-                                                             const rdf::Value *o, const rdf::Resource *c) override {
+    std::unique_ptr<StatementIterator> GetStatements(
+            const optional<rdf::Resource>& s, const optional<rdf::URI>& p,
+            const optional<rdf::Value>& o, const optional<rdf::Resource>& c) override {
         std::vector<rdf::Statement> results;
         for (const auto& stmt : statements) {
             bool match = true;
-            if (s != nullptr && *s != stmt.getSubject()) {
+            if (s && *s != stmt.getSubject()) {
                 match = false;
             }
-            if (p != nullptr && *p != stmt.getPredicate()) {
+            if (p && *p != stmt.getPredicate()) {
                 match = false;
             }
-            if (o != nullptr && *o != stmt.getObject()) {
+            if (o && *o != stmt.getObject()) {
                 match = false;
             }
-            if (c != nullptr && *c != stmt.getContext()) {
+            if (c && *c != stmt.getContext()) {
                 match = false;
             }
             if (match) {


[2/8] marmotta git commit: Java cleanups for KiWi SPARQL

Posted by ss...@apache.org.
Java cleanups for KiWi SPARQL


Project: http://git-wip-us.apache.org/repos/asf/marmotta/repo
Commit: http://git-wip-us.apache.org/repos/asf/marmotta/commit/3828f654
Tree: http://git-wip-us.apache.org/repos/asf/marmotta/tree/3828f654
Diff: http://git-wip-us.apache.org/repos/asf/marmotta/diff/3828f654

Branch: refs/heads/develop
Commit: 3828f65455f61919b1a5de991649a6b7dda1b991
Parents: 9aa5c2a
Author: Sebastian Schaffert <ss...@apache.org>
Authored: Mon Aug 22 21:53:02 2016 +0200
Committer: Sebastian Schaffert <ss...@apache.org>
Committed: Mon Aug 22 21:53:02 2016 +0200

----------------------------------------------------------------------
 .../marmotta/kiwi/loader/pgsql/PGCopyUtil.java  |  4 ++-
 .../kiwi/sparql/builder/SQLBuilder.java         | 32 +++++++++-----------
 .../sparql/builder/collect/ConditionFinder.java |  8 +++--
 .../sparql/builder/collect/DistinctFinder.java  | 10 ++++--
 .../sparql/builder/collect/ExtensionFinder.java |  9 ++++--
 .../sparql/builder/collect/GroupFinder.java     |  9 ++++--
 .../sparql/builder/collect/OrderFinder.java     |  8 +++--
 .../sparql/builder/collect/VariableFinder.java  |  7 +++--
 .../kiwi/sparql/builder/model/SQLVariable.java  |  8 +++++
 .../evaluation/KiWiEvaluationStrategy.java      | 13 +++++---
 10 files changed, 73 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/marmotta/blob/3828f654/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/PGCopyUtil.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/PGCopyUtil.java b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/PGCopyUtil.java
index d6a8387..a749509 100644
--- a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/PGCopyUtil.java
+++ b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/PGCopyUtil.java
@@ -82,7 +82,9 @@ public class PGCopyUtil {
 
 
     // PostgreSQL expects the empty string to be quoted to distinguish between null and empty
-    final static CsvPreference nodesPreference = new CsvPreference.Builder('"', ',', "\r\n").useEncoder(new DefaultCsvEncoder() {
+    final static CsvPreference nodesPreference = new CsvPreference
+            .Builder('"', ',', "\r\n")
+            .useEncoder(new DefaultCsvEncoder() {
         /**
          * {@inheritDoc}
          */

http://git-wip-us.apache.org/repos/asf/marmotta/blob/3828f654/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/builder/SQLBuilder.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/builder/SQLBuilder.java b/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/builder/SQLBuilder.java
index 3b2da50..45b2340 100644
--- a/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/builder/SQLBuilder.java
+++ b/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/builder/SQLBuilder.java
@@ -203,25 +203,24 @@ public class SQLBuilder {
         limit    = new LimitFinder(query).limit;
 
         // check if query is distinct
-        distinct = new DistinctFinder(query).distinct;
+        distinct = DistinctFinder.find(query);
 
         // find the ordering
-        orderby  = new OrderFinder(query).elements;
+        orderby = OrderFinder.find(query);
 
         // find the grouping
-        GroupFinder gf  = new GroupFinder(query);
-        groupLabels      = gf.bindings;
+        groupLabels = GroupFinder.find(query);
 
         // find extensions (BIND)
-        extensions = new ExtensionFinder(query).elements;
+        extensions = ExtensionFinder.find(query);
 
         // find variables that need to be resolved
-        resolveVariables = new ConditionFinder(query).neededVariables;
+        resolveVariables = ConditionFinder.find(query);
 
         int variableCount = 0;
 
         // find all variables that have been bound already, even if they do not appear in a pattern
-        for(Var v : new VariableFinder(query).variables) {
+        for(Var v : VariableFinder.find(query)) {
             if (v.hasValue() && !isConst(v)) {
                 SQLVariable sv = variables.get(v.getName());
                 if(sv == null) {
@@ -232,7 +231,7 @@ public class SQLBuilder {
                         sv.setProjectionType(ValueType.NODE);
                     }
 
-                    sv.getExpressions().add(""+ converter.convert(v.getValue()).getId());
+                    sv.addExpression(""+ converter.convert(v.getValue()).getId());
 
                     addVariable(sv);
                 }
@@ -340,7 +339,7 @@ public class SQLBuilder {
             }
 
             try {
-                sv.getExpressions().add(evaluateExpression(ext.getExpr(), ValueType.NODE));
+                sv.addExpression(evaluateExpression(ext.getExpr(), ValueType.NODE));
                 if(sv.getProjectionType() == ValueType.NODE && getProjectionType(ext.getExpr()) != ValueType.NODE) {
                     sv.setProjectionType(getProjectionType(ext.getExpr()));
                 }
@@ -364,7 +363,7 @@ public class SQLBuilder {
                         String pName = p.getName();
 
                         // if the variable has been used before, add a join condition to the first occurrence
-                        if (!sv.getExpressions().isEmpty()) {
+                        if (sv.hasExpressions()) {
                             // case distinction: is this variable projected as node or as another value in an extension?
                             // if it is a value, we need to refer to the corresponding typed column of the node, otherwise
                             // to the node ID (field ID is sufficient)
@@ -394,7 +393,7 @@ public class SQLBuilder {
                             }
                         }
 
-                        sv.getExpressions().add(pName + "." + positions[i]);
+                        sv.addExpression(pName + "." + positions[i]);
                     }
                 }
             }
@@ -411,7 +410,7 @@ public class SQLBuilder {
                         sq.addCondition(sv.getExpressions().get(0) + " = " + sqName + "." + sq_v.getName());
                     }
 
-                    sv.getExpressions().add(sqName + "." + sq_v.getName());
+                    sv.addExpression(sqName + "." + sq_v.getName());
 
                 }
             }
@@ -421,7 +420,7 @@ public class SQLBuilder {
             Var v = new Var(ext.getName());
 
             SQLVariable sv = variables.get(v.getName());
-            sv.getExpressions().add(evaluateExpression(ext.getExpr(), ValueType.NODE));
+            sv.addExpression(evaluateExpression(ext.getExpr(), ValueType.NODE));
         }
 
         // find context restrictions of patterns and match them with potential restrictions given in the
@@ -529,8 +528,7 @@ public class SQLBuilder {
                         }
 
                         if (nodeId >= 0) {
-                            String condition = pName + "." + positions[i] + " = " + nodeId;
-                            p.addCondition(condition);
+                            p.addCondition(pName + "." + positions[i] + " = " + nodeId);
                         }
                     }
                 }
@@ -618,7 +616,7 @@ public class SQLBuilder {
             if(v.getProjectionType() != ValueType.NONE && (projectedVars.isEmpty() || projectedVars.contains(v.getSparqlName()))) {
 
 
-                if (v.getExpressions() != null && v.getExpressions().size() > 0) {
+                if (v.hasExpressions()) {
                     String fromName = v.getExpressions().get(0);
                     projections.add(fromName + " AS " + projectedName);
                 }
@@ -701,7 +699,7 @@ public class SQLBuilder {
             for(String v : bindings.getBindingNames()) {
                 SQLVariable sv = variables.get(v);
 
-                if(sv != null && !sv.getExpressions().isEmpty()) {
+                if(sv != null && sv.hasExpressions()) {
                     List<String> vNames = sv.getExpressions();
                     String vName = vNames.get(0);
                     Value binding = converter.convert(bindings.getValue(v));

http://git-wip-us.apache.org/repos/asf/marmotta/blob/3828f654/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/builder/collect/ConditionFinder.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/builder/collect/ConditionFinder.java b/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/builder/collect/ConditionFinder.java
index 64fd830..a167b51 100644
--- a/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/builder/collect/ConditionFinder.java
+++ b/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/builder/collect/ConditionFinder.java
@@ -35,12 +35,16 @@ public class ConditionFinder extends QueryModelVisitorBase<RuntimeException> {
     int valueNeeded = 0;
 
     // set of variables that need a value to be resolved (used by ExtensionElem resolution)
-    public Set<String> neededVariables = new HashSet<>();
+    private Set<String> neededVariables = new HashSet<>();
 
-    public ConditionFinder(TupleExpr expr) {
+    private ConditionFinder(TupleExpr expr) {
         expr.visit(this);
     }
 
+    public static Set<String> find(TupleExpr expr) {
+        return new ConditionFinder(expr).neededVariables;
+    }
+
     @Override
     public void meet(Var node) throws RuntimeException {
         if(valueNeeded > 0) {

http://git-wip-us.apache.org/repos/asf/marmotta/blob/3828f654/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/builder/collect/DistinctFinder.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/builder/collect/DistinctFinder.java b/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/builder/collect/DistinctFinder.java
index 31fb293..2074e2f 100644
--- a/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/builder/collect/DistinctFinder.java
+++ b/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/builder/collect/DistinctFinder.java
@@ -20,6 +20,8 @@ package org.apache.marmotta.kiwi.sparql.builder.collect;
 import org.openrdf.query.algebra.*;
 import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
 
+import java.util.List;
+
 /**
 * Find distinct/reduced in a tuple expression.
 *
@@ -27,12 +29,16 @@ import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
 */
 public class DistinctFinder extends QueryModelVisitorBase<RuntimeException> {
 
-    public boolean distinct = false;
+    private boolean distinct = false;
 
-    public DistinctFinder(TupleExpr expr) {
+    private DistinctFinder(TupleExpr expr) {
         expr.visit(this);
     }
 
+    public static boolean find(TupleExpr expr) {
+        return new DistinctFinder(expr).distinct;
+    }
+
     @Override
     public void meet(Distinct node) throws RuntimeException {
         distinct = true;

http://git-wip-us.apache.org/repos/asf/marmotta/blob/3828f654/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/builder/collect/ExtensionFinder.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/builder/collect/ExtensionFinder.java b/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/builder/collect/ExtensionFinder.java
index 75579d7..d6b7cbe 100644
--- a/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/builder/collect/ExtensionFinder.java
+++ b/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/builder/collect/ExtensionFinder.java
@@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 
 /**
 * Find the offset and limit values in a tuple expression
@@ -34,12 +35,16 @@ public class ExtensionFinder extends QueryModelVisitorBase<RuntimeException> {
 
     private static Logger log = LoggerFactory.getLogger(ExtensionFinder.class);
 
-    public List<ExtensionElem> elements = new ArrayList<>();
+    private List<ExtensionElem> elements = new ArrayList<>();
 
-    public ExtensionFinder(TupleExpr expr) {
+    private ExtensionFinder(TupleExpr expr) {
         expr.visit(this);
     }
 
+    public static List<ExtensionElem> find(TupleExpr expr) {
+        return new ExtensionFinder(expr).elements;
+    }
+
     @Override
     public void meet(Extension node) throws RuntimeException {
         // visit children before, as there might be dependencies

http://git-wip-us.apache.org/repos/asf/marmotta/blob/3828f654/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/builder/collect/GroupFinder.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/builder/collect/GroupFinder.java b/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/builder/collect/GroupFinder.java
index 81fa0bf..ae52df8 100644
--- a/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/builder/collect/GroupFinder.java
+++ b/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/builder/collect/GroupFinder.java
@@ -32,13 +32,16 @@ import java.util.Set;
 */
 public class GroupFinder extends QueryModelVisitorBase<RuntimeException> {
 
-    public Set<String>     bindings = new HashSet<>();
-    public List<GroupElem> elements = new ArrayList<>();
+    private Set<String>     bindings = new HashSet<>();
+    private List<GroupElem> elements = new ArrayList<>();
 
-    public GroupFinder(TupleExpr expr) {
+    private GroupFinder(TupleExpr expr) {
         expr.visit(this);
     }
 
+    public static Set<String> find(TupleExpr expr) {
+        return new GroupFinder(expr).bindings;
+    }
     @Override
     public void meet(Group node) throws RuntimeException {
         bindings.addAll(node.getGroupBindingNames());

http://git-wip-us.apache.org/repos/asf/marmotta/blob/3828f654/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/builder/collect/OrderFinder.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/builder/collect/OrderFinder.java b/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/builder/collect/OrderFinder.java
index dc5d2f2..97349ff 100644
--- a/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/builder/collect/OrderFinder.java
+++ b/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/builder/collect/OrderFinder.java
@@ -30,12 +30,16 @@ import java.util.List;
 */
 public class OrderFinder extends QueryModelVisitorBase<RuntimeException> {
 
-    public List<OrderElem> elements = new ArrayList<>();
+    private List<OrderElem> elements = new ArrayList<>();
 
-    public OrderFinder(TupleExpr expr) {
+    private OrderFinder(TupleExpr expr) {
         expr.visit(this);
     }
 
+    public static List<OrderElem> find(TupleExpr expr) {
+        return new OrderFinder(expr).elements;
+    }
+
     @Override
     public void meet(Order node) throws RuntimeException {
         elements.addAll(node.getElements());

http://git-wip-us.apache.org/repos/asf/marmotta/blob/3828f654/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/builder/collect/VariableFinder.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/builder/collect/VariableFinder.java b/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/builder/collect/VariableFinder.java
index 2cc7f76..81d2b14 100644
--- a/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/builder/collect/VariableFinder.java
+++ b/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/builder/collect/VariableFinder.java
@@ -32,12 +32,15 @@ import java.util.Set;
 */
 public class VariableFinder extends QueryModelVisitorBase<RuntimeException> {
 
-    public Set<Var> variables = new HashSet<>();
+    private Set<Var> variables = new HashSet<>();
 
-    public VariableFinder(TupleExpr expr) {
+    private VariableFinder(TupleExpr expr) {
         expr.visit(this);
     }
 
+    public static Set<Var> find(TupleExpr expr) {
+        return new VariableFinder(expr).variables;
+    }
 
     @Override
     public void meet(Var node) throws RuntimeException {

http://git-wip-us.apache.org/repos/asf/marmotta/blob/3828f654/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/builder/model/SQLVariable.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/builder/model/SQLVariable.java b/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/builder/model/SQLVariable.java
index 23715fe..f187e1e 100644
--- a/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/builder/model/SQLVariable.java
+++ b/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/builder/model/SQLVariable.java
@@ -113,10 +113,18 @@ public class SQLVariable  implements Cloneable{
         return bindings;
     }
 
+    public void addExpression(String e) {
+        expressions.add(e);
+    }
+
     public List<String> getExpressions() {
         return expressions;
     }
 
+    public boolean hasExpressions() {
+        return expressions != null && !expressions.isEmpty();
+    }
+
     public ValueType getProjectionType() {
         return projectionType;
     }

http://git-wip-us.apache.org/repos/asf/marmotta/blob/3828f654/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/evaluation/KiWiEvaluationStrategy.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/evaluation/KiWiEvaluationStrategy.java b/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/evaluation/KiWiEvaluationStrategy.java
index 6315886..a198ebc 100644
--- a/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/evaluation/KiWiEvaluationStrategy.java
+++ b/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/evaluation/KiWiEvaluationStrategy.java
@@ -240,7 +240,8 @@ public class KiWiEvaluationStrategy extends EvaluationStrategyImpl{
                         long[] nodeIds = new long[vars.size()];
                         for(int i=0; i<vars.size(); i++) {
                             SQLVariable sv = vars.get(i);
-                            if(sv.getProjectionType() == ValueType.NODE && (builder.getProjectedVars().isEmpty() || builder.getProjectedVars().contains(sv.getSparqlName()))) {
+                            if(sv.getProjectionType() == ValueType.NODE && (builder.getProjectedVars().isEmpty()
+                                    || builder.getProjectedVars().contains(sv.getSparqlName()))) {
                                 nodeIds[i] = row.getLong(sv.getName());
                             }
                         }
@@ -251,14 +252,17 @@ public class KiWiEvaluationStrategy extends EvaluationStrategyImpl{
                             if(nodes[i] != null) {
                                 // resolved node
                                 resultRow.addBinding(sv.getSparqlName(), nodes[i]);
-                            } else if(sv.getProjectionType() != ValueType.NONE && (builder.getProjectedVars().isEmpty() || builder.getProjectedVars().contains(sv.getSparqlName()))) {
+                            } else if(sv.getProjectionType() != ValueType.NONE && (builder.getProjectedVars().isEmpty()
+                                    || builder.getProjectedVars().contains(sv.getSparqlName()))) {
                                 // literal value
                                 String svalue;
                                 switch (sv.getProjectionType()) {
                                     case URI:
                                         svalue = row.getString(sv.getName());
                                         if(svalue != null)
-                                            resultRow.addBinding(sv.getSparqlName(), new URIImpl(svalue));
+                                            try {
+                                                resultRow.addBinding(sv.getSparqlName(), new URIImpl(svalue));
+                                            } catch (IllegalArgumentException ex) {} // illegal URI unbound
                                         break;
                                     case BNODE:
                                         svalue = row.getString(sv.getName());
@@ -373,7 +377,8 @@ public class KiWiEvaluationStrategy extends EvaluationStrategyImpl{
                 });
 
 
-                return new ExceptionConvertingIteration<BindingSet, QueryEvaluationException>(new CloseableIteratorIteration<BindingSet, SQLException>(Iterations.asList(it).iterator())) {
+                return new ExceptionConvertingIteration<BindingSet, QueryEvaluationException>(
+                        new CloseableIteratorIteration<BindingSet, SQLException>(Iterations.asList(it).iterator())) {
                     @Override
                     protected QueryEvaluationException convert(Exception e) {
                         return new QueryEvaluationException(e);


[7/8] marmotta git commit: Ostrich: reorganize build targets for more efficient builds

Posted by ss...@apache.org.
Ostrich: reorganize build targets for more efficient builds


Project: http://git-wip-us.apache.org/repos/asf/marmotta/repo
Commit: http://git-wip-us.apache.org/repos/asf/marmotta/commit/f56b1b90
Tree: http://git-wip-us.apache.org/repos/asf/marmotta/tree/f56b1b90
Diff: http://git-wip-us.apache.org/repos/asf/marmotta/diff/f56b1b90

Branch: refs/heads/develop
Commit: f56b1b9065f1fceb1d0abd54deeb5bcbd0e1ef21
Parents: b8d122a
Author: Sebastian Schaffert <ss...@apache.org>
Authored: Thu Aug 25 13:03:53 2016 +0200
Committer: Sebastian Schaffert <ss...@apache.org>
Committed: Thu Aug 25 13:03:53 2016 +0200

----------------------------------------------------------------------
 .../ostrich/backend/persistence/CMakeLists.txt  | 29 ++++++++++++--------
 libraries/ostrich/backend/test/CMakeLists.txt   |  2 +-
 2 files changed, 19 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/marmotta/blob/f56b1b90/libraries/ostrich/backend/persistence/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/persistence/CMakeLists.txt b/libraries/ostrich/backend/persistence/CMakeLists.txt
index 83bb144..fcf8366 100644
--- a/libraries/ostrich/backend/persistence/CMakeLists.txt
+++ b/libraries/ostrich/backend/persistence/CMakeLists.txt
@@ -1,31 +1,38 @@
 include_directories(.. ${CMAKE_CURRENT_BINARY_DIR}/.. ${CMAKE_CURRENT_BINARY_DIR}/../model ${RAPTOR_INCLUDE_DIR}/raptor2)
 
+add_library(marmotta_persistence
+        leveldb_sparql.cc leveldb_sparql.h
+        base_persistence.cc base_persistence.h
+        leveldb_service.cc leveldb_service.h )
+target_link_libraries(marmotta_persistence
+        marmotta_model marmotta_util marmotta_sparql marmotta_service
+        ${GLOG_LIBRARY} ${PROTOBUF_LIBRARIES})
+
+if (LevelDB_FOUND)
 # Shared Marmotta Ostrich persistence implementation
-add_library(marmotta_leveldb
-        leveldb_persistence.cc leveldb_persistence.h leveldb_sparql.cc leveldb_sparql.h base_persistence.cc base_persistence.h)
+add_library(marmotta_leveldb leveldb_persistence.cc leveldb_persistence.h)
 target_link_libraries(marmotta_leveldb
-        marmotta_model marmotta_util marmotta_sparql marmotta_service
-        ${LevelDB_LIBRARY} ${GLOG_LIBRARY} ${PROTOBUF_LIBRARIES})
+        marmotta_persistence ${LevelDB_LIBRARY} ${GLOG_LIBRARY} ${PROTOBUF_LIBRARIES})
 
 # Server binary
-add_executable(leveldb_persistence
-        leveldb_service.cc leveldb_service.h leveldb_server.cc )
+add_executable(leveldb_persistence leveldb_server.cc)
 target_link_libraries(leveldb_persistence marmotta_service marmotta_leveldb
         ${GFLAGS_LIBRARY} ${CMAKE_THREAD_LIBS_INIT} ${GRPC_LIBRARIES} ${Tcmalloc_LIBRARIES})
 install(TARGETS leveldb_persistence DESTINATION bin)
+endif()
 
+if (RocksDB_FOUND)
 add_library(marmotta_rocksdb
-        rocksdb_persistence.cc rocksdb_persistence.h leveldb_sparql.cc leveldb_sparql.h base_persistence.cc base_persistence.h)
+        rocksdb_persistence.cc rocksdb_persistence.h)
 target_link_libraries(marmotta_rocksdb
-        marmotta_model marmotta_util marmotta_sparql marmotta_service
-        ${RocksDB_LIBRARY} ${GLOG_LIBRARY} ${PROTOBUF_LIBRARIES})
+        marmotta_persistence ${RocksDB_LIBRARY} ${GLOG_LIBRARY} ${PROTOBUF_LIBRARIES})
 
 # Server binary
-add_executable(rocksdb_persistence
-        leveldb_service.cc leveldb_service.h rocksdb_server.cc )
+add_executable(rocksdb_persistence rocksdb_server.cc )
 target_link_libraries(rocksdb_persistence marmotta_service marmotta_rocksdb
         ${GFLAGS_LIBRARY} ${CMAKE_THREAD_LIBS_INIT} ${GRPC_LIBRARIES} ${Tcmalloc_LIBRARIES})
 install(TARGETS rocksdb_persistence DESTINATION bin)
+endif()
 
 
 # Command line admin tool

http://git-wip-us.apache.org/repos/asf/marmotta/blob/f56b1b90/libraries/ostrich/backend/test/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/test/CMakeLists.txt b/libraries/ostrich/backend/test/CMakeLists.txt
index c3beef9..a24e395 100644
--- a/libraries/ostrich/backend/test/CMakeLists.txt
+++ b/libraries/ostrich/backend/test/CMakeLists.txt
@@ -16,7 +16,7 @@ add_executable(leveldb_tests main.cc LevelDBTest.cc)
 target_link_libraries(leveldb_tests gtest marmotta_leveldb ${GLOG_LIBRARY} ${Boost_LIBRARIES})
 
 add_executable(persistence_tests main.cc PersistenceTest.cc)
-target_link_libraries(persistence_tests gtest marmotta_leveldb ${GLOG_LIBRARY} ${Boost_LIBRARIES})
+target_link_libraries(persistence_tests gtest marmotta_persistence ${GLOG_LIBRARY} ${Boost_LIBRARIES})
 
 add_test(NAME ModelTest COMMAND model_tests)
 add_test(NAME SparqlTest COMMAND sparql_tests)


[8/8] marmotta git commit: Merge remote-tracking branch 'origin/develop' into develop

Posted by ss...@apache.org.
Merge remote-tracking branch 'origin/develop' into develop


Project: http://git-wip-us.apache.org/repos/asf/marmotta/repo
Commit: http://git-wip-us.apache.org/repos/asf/marmotta/commit/ed49359d
Tree: http://git-wip-us.apache.org/repos/asf/marmotta/tree/ed49359d
Diff: http://git-wip-us.apache.org/repos/asf/marmotta/diff/ed49359d

Branch: refs/heads/develop
Commit: ed49359d88b51db0bca3c13a186b77077bf8554f
Parents: f56b1b9 bc09d27
Author: Sebastian Schaffert <ss...@apache.org>
Authored: Thu Aug 25 13:04:14 2016 +0200
Committer: Sebastian Schaffert <ss...@apache.org>
Committed: Thu Aug 25 13:04:14 2016 +0200

----------------------------------------------------------------------
 .travis.yml | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------



[6/8] marmotta git commit: Ostrich: - separate RocksDB and LevelDB support - refactor the whole persistence architecture to be more object oriented and easier to understand

Posted by ss...@apache.org.
Ostrich:
- separate RocksDB and LevelDB support
- refactor the whole persistence architecture to be more object oriented and easier to understand


Project: http://git-wip-us.apache.org/repos/asf/marmotta/repo
Commit: http://git-wip-us.apache.org/repos/asf/marmotta/commit/b8d122a1
Tree: http://git-wip-us.apache.org/repos/asf/marmotta/tree/b8d122a1
Diff: http://git-wip-us.apache.org/repos/asf/marmotta/diff/b8d122a1

Branch: refs/heads/develop
Commit: b8d122a19406008ef5a6c1c7ce119f4563a8be35
Parents: d811ef3
Author: Sebastian Schaffert <ss...@apache.org>
Authored: Thu Aug 25 10:58:19 2016 +0200
Committer: Sebastian Schaffert <ss...@apache.org>
Committed: Thu Aug 25 10:58:19 2016 +0200

----------------------------------------------------------------------
 libraries/ostrich/backend/CMakeLists.txt        |  19 +-
 .../ostrich/backend/persistence/CMakeLists.txt  |  24 +-
 .../backend/persistence/base_persistence.cc     | 193 ++++++++
 .../backend/persistence/base_persistence.h      | 217 ++++++++
 .../backend/persistence/leveldb_persistence.cc  | 435 +++++------------
 .../backend/persistence/leveldb_persistence.h   |  75 +--
 .../backend/persistence/leveldb_server.cc       |   1 +
 .../backend/persistence/leveldb_service.cc      |  22 +-
 .../backend/persistence/leveldb_service.h       |  10 +-
 .../backend/persistence/leveldb_sparql.h        |   6 +-
 .../backend/persistence/rocksdb_persistence.cc  | 489 +++++++++++++++++++
 .../backend/persistence/rocksdb_persistence.h   | 163 +++++++
 .../backend/persistence/rocksdb_server.cc       |  75 +++
 libraries/ostrich/backend/test/CMakeLists.txt   |   4 +
 libraries/ostrich/backend/test/LevelDBTest.cc   | 268 ++++++++++
 .../ostrich/backend/test/PersistenceTest.cc     | 277 ++---------
 16 files changed, 1627 insertions(+), 651 deletions(-)
----------------------------------------------------------------------

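The refactoring splits the persistence layer into a shared abstract interface
(base_persistence.h) with one concrete implementation per backend. As a rough,
hypothetical sketch of that shape (class names follow the files listed above,
all bodies are stubs; this is not code from the commit):

#include <cstdint>
#include <iostream>
#include <memory>
#include <string>

class Persistence {                                // shared interface (base_persistence.h)
 public:
    virtual ~Persistence() = default;
    virtual int64_t Size() = 0;                    // number of stored statements
};

class LevelDBPersistence : public Persistence {    // leveldb_persistence.h
 public:
    explicit LevelDBPersistence(const std::string& path) : path_(path) {}
    int64_t Size() override { return 0; }          // stub
 private:
    std::string path_;
};

class RocksDBPersistence : public Persistence {    // rocksdb_persistence.h
 public:
    explicit RocksDBPersistence(const std::string& path) : path_(path) {}
    int64_t Size() override { return 0; }          // stub
 private:
    std::string path_;
};

int main() {
    // Each server binary (leveldb_persistence / rocksdb_persistence) links
    // exactly one backend against the shared service code.
    std::unique_ptr<Persistence> db(new LevelDBPersistence("/tmp/ostrich"));
    std::cout << "statements: " << db->Size() << std::endl;
    return 0;
}
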

http://git-wip-us.apache.org/repos/asf/marmotta/blob/b8d122a1/libraries/ostrich/backend/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/CMakeLists.txt b/libraries/ostrich/backend/CMakeLists.txt
index 87dfc7f..7c64230 100644
--- a/libraries/ostrich/backend/CMakeLists.txt
+++ b/libraries/ostrich/backend/CMakeLists.txt
@@ -12,10 +12,10 @@ find_package (Rasqal REQUIRED)
 find_package (GFlags REQUIRED)
 find_package (Protobuf REQUIRED)
 find_package (GRPC REQUIRED)
-find_package (LevelDB REQUIRED)
+find_package (LevelDB)
+find_package (RocksDB)
 find_package (GLog REQUIRED)
 find_package (Boost 1.54.0 COMPONENTS iostreams filesystem system)
-find_package (RocksDB)
 find_package (Tcmalloc)
 
 add_definitions(-DNDEBUG)
@@ -26,14 +26,17 @@ if (Boost_IOSTREAMS_FOUND)
 endif (Boost_IOSTREAMS_FOUND)
 
 if (RocksDB_FOUND)
-    message(STATUS "Enabling RocksDB support (RocksDB found)")
-    add_definitions(-DHAVE_ROCKSDB)
-    set(PERSISTENCE_LIBRARY ${RocksDB_LIBRARY})
-else (RocksDB_FOUND)
-    message(STATUS "Using standard LevelDB (RocksDB not found)")
-    set(PERSISTENCE_LIBRARY ${LevelDB_LIBRARY})
+    message(STATUS "Enabling RocksDB support")
 endif (RocksDB_FOUND)
 
+if (LevelDB_FOUND)
+    message(STATUS "Enabling LevelDB support")
+endif (LevelDB_FOUND)
+
+if ((NOT LevelDB_FOUND) AND (NOT RocksDB_FOUND))
+    message(FATAL_ERROR "Could not find any persistence library (RocksDB or LevelDB)")
+endif()
+
 if (Tcmalloc_FOUND)
     message(STATUS "Enabling profiling support (Tcmalloc found)")
 endif (Tcmalloc_FOUND)

http://git-wip-us.apache.org/repos/asf/marmotta/blob/b8d122a1/libraries/ostrich/backend/persistence/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/persistence/CMakeLists.txt b/libraries/ostrich/backend/persistence/CMakeLists.txt
index b6ec1b5..83bb144 100644
--- a/libraries/ostrich/backend/persistence/CMakeLists.txt
+++ b/libraries/ostrich/backend/persistence/CMakeLists.txt
@@ -2,17 +2,31 @@ include_directories(.. ${CMAKE_CURRENT_BINARY_DIR}/.. ${CMAKE_CURRENT_BINARY_DIR
 
 # Shared Marmotta Ostrich persistence implementation
 add_library(marmotta_leveldb
-        leveldb_persistence.cc leveldb_persistence.h leveldb_sparql.cc leveldb_sparql.h)
+        leveldb_persistence.cc leveldb_persistence.h leveldb_sparql.cc leveldb_sparql.h base_persistence.cc base_persistence.h)
 target_link_libraries(marmotta_leveldb
         marmotta_model marmotta_util marmotta_sparql marmotta_service
-        ${PERSISTENCE_LIBRARY} ${GLOG_LIBRARY} ${PROTOBUF_LIBRARIES})
+        ${LevelDB_LIBRARY} ${GLOG_LIBRARY} ${PROTOBUF_LIBRARIES})
 
 # Server binary
-add_executable(marmotta_persistence
+add_executable(leveldb_persistence
         leveldb_service.cc leveldb_service.h leveldb_server.cc )
-target_link_libraries(marmotta_persistence marmotta_service marmotta_leveldb
+target_link_libraries(leveldb_persistence marmotta_service marmotta_leveldb
         ${GFLAGS_LIBRARY} ${CMAKE_THREAD_LIBS_INIT} ${GRPC_LIBRARIES} ${Tcmalloc_LIBRARIES})
-install(TARGETS marmotta_persistence DESTINATION bin)
+install(TARGETS leveldb_persistence DESTINATION bin)
+
+add_library(marmotta_rocksdb
+        rocksdb_persistence.cc rocksdb_persistence.h leveldb_sparql.cc leveldb_sparql.h base_persistence.cc base_persistence.h)
+target_link_libraries(marmotta_rocksdb
+        marmotta_model marmotta_util marmotta_sparql marmotta_service
+        ${RocksDB_LIBRARY} ${GLOG_LIBRARY} ${PROTOBUF_LIBRARIES})
+
+# Server binary
+add_executable(rocksdb_persistence
+        leveldb_service.cc leveldb_service.h rocksdb_server.cc )
+target_link_libraries(rocksdb_persistence marmotta_service marmotta_rocksdb
+        ${GFLAGS_LIBRARY} ${CMAKE_THREAD_LIBS_INIT} ${GRPC_LIBRARIES} ${Tcmalloc_LIBRARIES})
+install(TARGETS rocksdb_persistence DESTINATION bin)
+
 
 # Command line admin tool
 add_executable(marmotta_updatedb marmotta_updatedb.cc)

http://git-wip-us.apache.org/repos/asf/marmotta/blob/b8d122a1/libraries/ostrich/backend/persistence/base_persistence.cc
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/persistence/base_persistence.cc b/libraries/ostrich/backend/persistence/base_persistence.cc
new file mode 100644
index 0000000..6635647
--- /dev/null
+++ b/libraries/ostrich/backend/persistence/base_persistence.cc
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "persistence/base_persistence.h"
+
+#include <cstring>
+
+#include "model/rdf_operators.h"
+#include "util/murmur3.h"
+
+using marmotta::rdf::proto::Statement;
+
+namespace marmotta {
+namespace persistence {
+namespace {
+inline bool computeKey(const std::string* s, char* result) {
+    // 128bit keys, use murmur
+    if (s != nullptr) {
+#ifdef __x86_64__
+        MurmurHash3_x64_128(s->data(), s->size(), 13, result);
+#else
+        MurmurHash3_x86_128(s->data(), s->size(), 13, result);
+#endif
+        return true;
+    } else {
+        return false;
+    }
+}
+
+inline bool computeKey(
+        const google::protobuf::Message& msg, bool enabled, char* result) {
+    if (enabled) {
+        std::string s;
+        msg.SerializeToString(&s);
+        return computeKey(&s, result);
+    }
+    return false;
+}
+
+inline void copyKey(const char* hash, bool enabled, int base, char* dest) {
+    if (enabled)
+        memcpy(dest, hash, kKeyLength);
+    else
+        memset(dest, base, kKeyLength);
+}
+}
+
+Key::Key(const std::string* s, const std::string* p,
+         const std::string* o, const std::string* c)
+        : sEnabled(computeKey(s, sHash)), pEnabled(computeKey(p, pHash))
+        , oEnabled(computeKey(o, oHash)), cEnabled(computeKey(c, cHash)) {
+}
+
+Key::Key(const rdf::proto::Statement& stmt)
+        : sEnabled(computeKey(stmt.subject(), stmt.has_subject(), sHash))
+        , pEnabled(computeKey(stmt.predicate(), stmt.has_predicate(), pHash))
+        , oEnabled(computeKey(stmt.object(), stmt.has_object(), oHash))
+        , cEnabled(computeKey(stmt.context(), stmt.has_context(), cHash)) {
+}
+
+char* Key::Create(IndexTypes type, BoundTypes bound) const {
+    char* result = new char[kKeyLength * 4];
+    memset(result, 0x00, kKeyLength);
+
+    int base = 0x00;
+
+    switch (bound) {
+        case LOWER:
+            base = 0x00;
+            break;
+        case UPPER:
+            base = 0xFF;
+            break;
+    }
+
+    switch (type) {
+        case SPOC:
+            copyKey(sHash, sEnabled, base, result);
+            copyKey(pHash, pEnabled, base, &result[kKeyLength]);
+            copyKey(oHash, oEnabled, base, &result[2 * kKeyLength]);
+            copyKey(cHash, cEnabled, base, &result[3 * kKeyLength]);
+            break;
+        case CSPO:
+            copyKey(cHash, cEnabled, base, result);
+            copyKey(sHash, sEnabled, base, &result[kKeyLength]);
+            copyKey(pHash, pEnabled, base, &result[2 * kKeyLength]);
+            copyKey(oHash, oEnabled, base, &result[3 * kKeyLength]);
+            break;
+        case OPSC:
+            copyKey(oHash, oEnabled, base, result);
+            copyKey(pHash, pEnabled, base, &result[kKeyLength]);
+            copyKey(sHash, sEnabled, base, &result[2 * kKeyLength]);
+            copyKey(cHash, cEnabled, base, &result[3 * kKeyLength]);
+            break;
+        case PCOS:
+            copyKey(pHash, pEnabled, base, result);
+            copyKey(cHash, cEnabled, base, &result[kKeyLength]);
+            copyKey(oHash, oEnabled, base, &result[2 * kKeyLength]);
+            copyKey(sHash, sEnabled, base, &result[3 * kKeyLength]);
+            break;
+    }
+    return result;
+}
+
+
+
+Pattern::Pattern(const Statement& pattern) : key_(pattern), needsFilter_(true) {
+
+    if (pattern.has_subject()) {
+        // Subject is usually most selective, so if it is present use the
+        // subject-based databases first.
+        if (pattern.has_context()) {
+            type_ = CSPO;
+        } else {
+            type_ = SPOC;
+        }
+
+        // Filter needed if there is no predicate but an object.
+        needsFilter_ = !(pattern.has_predicate()) && pattern.has_object();
+    } else if (pattern.has_object()) {
+        // Second-best option is object.
+        type_ = OPSC;
+
+        // Filter needed if there is a context (subject already checked, predicate irrelevant).
+        needsFilter_ = pattern.has_context();
+    } else if (pattern.has_predicate()) {
+        // Predicate is usually least selective.
+        type_ = PCOS;
+
+        // No filter needed, object and subject are not set.
+        needsFilter_ = false;
+    } else if (pattern.has_context()) {
+        type_ = CSPO;
+
+        // No filter needed, subject, predicate and object are not set.
+        needsFilter_ = false;
+    } else {
+        // Fall back to SPOC.
+        type_ = SPOC;
+
+        // No filter needed, we just scan from the beginning.
+        needsFilter_ = false;
+    }
+}
+
+/**
+ * Return the lower key for querying the index (range [MinKey,MaxKey) ).
+ */
+char* Pattern::MinKey() const {
+    return key_.Create(Type(), LOWER);
+}
+
+/**
+ * Return the upper key for querying the index (range [MinKey,MaxKey) ).
+ */
+char* Pattern::MaxKey() const {
+    return key_.Create(Type(), UPPER);
+}
+
+
+// Return true if the statement matches the pattern. Wildcards (empty fields)
+// in the pattern are ignored.
+bool Matches(const Statement& pattern, const Statement& stmt) {
+    // equality operators defined in rdf_model.h
+    if (pattern.has_context() && stmt.context() != pattern.context()) {
+        return false;
+    }
+    if (pattern.has_subject() && stmt.subject() != pattern.subject()) {
+        return false;
+    }
+    if (pattern.has_predicate() && stmt.predicate() != pattern.predicate()) {
+        return false;
+    }
+    return !(pattern.has_object() && stmt.object() != pattern.object());
+}
+
+}  // namespace persistence
+}  // namespace marmotta
+

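A short usage sketch of the Pattern/Key helpers added above (assumes the
Ostrich build environment and the headers introduced in this commit; the
function itself is illustrative only, not part of the commit):

#include "model/rdf_model.h"
#include "persistence/base_persistence.h"

using marmotta::persistence::Matches;
using marmotta::persistence::Pattern;
using marmotta::rdf::proto::Statement;

// Illustrative only: how a backend derives the scan range for a query
// pattern and decides whether results still need post-filtering.
void ScanSketch(const Statement& query, const Statement& candidate) {
    Pattern p(query);            // picks SPOC/CSPO/OPSC/PCOS by selectivity

    char* lo = p.MinKey();       // scan range is [MinKey, MaxKey)
    char* hi = p.MaxKey();
    // ... seek a LevelDB/RocksDB iterator to Slice(lo, 4 * kKeyLength) and
    //     stop once the current key exceeds Slice(hi, 4 * kKeyLength) ...

    if (!p.NeedsFilter() || Matches(query, candidate)) {
        // candidate statement is part of the result
    }

    // Key::Create() allocates the range keys with new[], so release them
    // with delete[].
    delete[] lo;
    delete[] hi;
}
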
http://git-wip-us.apache.org/repos/asf/marmotta/blob/b8d122a1/libraries/ostrich/backend/persistence/base_persistence.h
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/persistence/base_persistence.h b/libraries/ostrich/backend/persistence/base_persistence.h
new file mode 100644
index 0000000..89a5822
--- /dev/null
+++ b/libraries/ostrich/backend/persistence/base_persistence.h
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef MARMOTTA_BASE_PERSISTENCE_H
+#define MARMOTTA_BASE_PERSISTENCE_H
+
+#include <string>
+
+#include "model/rdf_model.h"
+#include "service/sail.pb.h"
+#include "util/iterator.h"
+
+namespace marmotta {
+namespace persistence {
+
+constexpr int kKeyLength = 16;
+
+enum IndexTypes {
+    SPOC, CSPO, OPSC, PCOS
+};
+
+enum BoundTypes {
+    LOWER, UPPER
+};
+
+class Key {
+ public:
+    // Create key for the given string quadruple using a Murmur3 hash for each
+    // component.
+    Key(const std::string* s, const std::string* p,
+        const std::string* o, const std::string* c);
+
+    // Create key for the given statement. Some fields may be unset.
+    Key(const rdf::proto::Statement& stmt);
+
+    // Create the key for the given index type. Returns a newly allocated char
+    // array that needs to be released by the caller using delete[].
+    char* Create(IndexTypes type, BoundTypes bound = LOWER) const;
+
+ private:
+    bool sEnabled, pEnabled, oEnabled, cEnabled;
+    char sHash[kKeyLength], pHash[kKeyLength], oHash[kKeyLength], cHash[kKeyLength];
+};
+
+
+/**
+ * A pattern for querying the index of a key-value store supporting range queries
+ * like LevelDB or RocksDB.
+ */
+class Pattern {
+ public:
+    Pattern(const rdf::proto::Statement& pattern);
+
+    /**
+     * Return the lower key for querying the index (range [MinKey,MaxKey) ).
+     */
+    char* MinKey() const;
+
+    /**
+     * Return the upper key for querying the index (range [MinKey,MaxKey) ).
+     */
+    char* MaxKey() const;
+
+    IndexTypes Type() const {
+        return type_;
+    }
+
+    Pattern& Type(IndexTypes t) {
+        type_ = t;
+        return *this;
+    }
+
+    // Returns true in case this query pattern cannot be answered by the index alone.
+    bool NeedsFilter() const {
+        return needsFilter_;
+    }
+
+ private:
+    Key key_;
+    IndexTypes type_;
+    bool needsFilter_;
+};
+
+class Persistence {
+ public:
+    typedef util::CloseableIterator<rdf::proto::Statement> StatementIterator;
+    typedef util::CloseableIterator<rdf::proto::Namespace> NamespaceIterator;
+    typedef util::CloseableIterator<service::proto::UpdateRequest> UpdateIterator;
+
+    typedef std::function<bool(const rdf::proto::Statement&)> StatementHandler;
+    typedef std::function<bool(const rdf::proto::Namespace&)> NamespaceHandler;
+
+
+    /**
+     * Add the namespaces in the iterator to the database.
+     */
+    virtual service::proto::UpdateResponse AddNamespaces(
+            NamespaceIterator& it) = 0;
+
+    /**
+     * Add the statements in the iterator to the database.
+     */
+    virtual service::proto::UpdateResponse AddStatements(
+            StatementIterator& it) = 0;
+
+    /**
+     * Get all statements matching the pattern (which may have some fields
+     * unset to indicate wildcards). Call the callback function for each
+     * result.
+     */
+    virtual void GetStatements(
+            const rdf::proto::Statement& pattern, StatementHandler callback) = 0;
+
+    /**
+     * Get all statements matching the pattern (which may have some fields
+     * unset to indicate wildcards). Returns an iterator over all
+     * matching statements.
+     */
+    virtual std::unique_ptr<StatementIterator> GetStatements(
+            const rdf::proto::Statement& pattern) = 0;
+
+    /**
+     * Get all namespaces matching the pattern (which may have some or all
+     * fields unset to indicate wildcards). Call the callback function for
+     * each result.
+     */
+    virtual void GetNamespaces(
+            const rdf::proto::Namespace &pattern, NamespaceHandler callback) = 0;
+
+    /**
+     * Get all namespaces matching the pattern (which may have some or all
+     * fields unset to indicate wildcards). Returns an iterator over all
+     * matching namespaces.
+     */
+    virtual std::unique_ptr<NamespaceIterator> GetNamespaces(
+            const rdf::proto::Namespace &pattern) = 0;
+
+    /**
+     * Remove all statements matching the pattern (which may have some fields
+     * unset to indicate wildcards).
+     */
+    virtual service::proto::UpdateResponse RemoveStatements(
+            const rdf::proto::Statement& pattern) = 0;
+
+    /**
+     * Apply a batch of updates (mixed statement/namespace adds and removes).
+     * The updates are collected in write batches and written atomically to
+     * the database when iteration ends.
+     */
+    virtual service::proto::UpdateResponse Update(
+            UpdateIterator& it) = 0;
+
+    /**
+     * Return the size of this database.
+     */
+    virtual int64_t Size() = 0;
+};
+
+
+// Base iterator for wrapping LevelDB-style database iterators.
+template<typename T, typename Iterator>
+class DBIterator : public util::CloseableIterator<T> {
+ public:
+
+    DBIterator(Iterator *it)
+            : it(it) {
+        it->SeekToFirst();
+    }
+
+    virtual ~DBIterator() override {
+        delete it;
+    };
+
+    const T& next() override {
+        // Parse current position, then iterate to next position for next call.
+        proto.ParseFromString(it->value().ToString());
+        it->Next();
+        return proto;
+    };
+
+    const T& current() const override {
+        return proto;
+    };
+
+    virtual bool hasNext() override {
+        return it->Valid();
+    }
+
+ protected:
+    Iterator* it;
+    T proto;
+};
+
+
+// Return true if the statement matches the pattern. Wildcards (empty fields)
+// in the pattern are ignored.
+bool Matches(const rdf::proto::Statement& pattern,
+             const rdf::proto::Statement& stmt);
+
+}  // namespace persistence
+}  // namespace marmotta
+
+#endif //MARMOTTA_BASE_PERSISTENCE_H

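The DBIterator template above wraps a raw LevelDB/RocksDB iterator and parses
the protobuf message stored in each value. A small usage sketch (assumes the
LevelDB headers and the Ostrich build environment; DumpNamespaces is
illustrative only):

#include <leveldb/db.h>

#include "model/rdf_model.h"
#include "persistence/base_persistence.h"

using marmotta::persistence::DBIterator;
using marmotta::rdf::proto::Namespace;

// Same alias the LevelDB backend defines for its own use.
template<typename T>
using LevelDBIterator = DBIterator<T, leveldb::Iterator>;

// Illustrative only: walk all namespaces stored in the prefix index.
void DumpNamespaces(leveldb::DB* db_ns_prefix) {
    LevelDBIterator<Namespace> it(db_ns_prefix->NewIterator(leveldb::ReadOptions()));
    while (it.hasNext()) {
        const Namespace& ns = it.next();   // value parsed into a Namespace proto
        (void) ns;                         // ... report ns ...
    }
}
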
http://git-wip-us.apache.org/repos/asf/marmotta/blob/b8d122a1/libraries/ostrich/backend/persistence/leveldb_persistence.cc
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/persistence/leveldb_persistence.cc b/libraries/ostrich/backend/persistence/leveldb_persistence.cc
index efc1dc6..23e2df4 100644
--- a/libraries/ostrich/backend/persistence/leveldb_persistence.cc
+++ b/libraries/ostrich/backend/persistence/leveldb_persistence.cc
@@ -30,7 +30,6 @@
 
 #include "leveldb_persistence.h"
 #include "model/rdf_operators.h"
-#include "util/murmur3.h"
 #include "util/unique.h"
 
 #define CHECK_STATUS(s) CHECK(s.ok()) << "Writing to database failed: " << s.ToString()
@@ -38,8 +37,8 @@
 DEFINE_int64(write_batch_size, 1000000,
              "Maximum number of statements to write in a single batch to the database");
 
-using dbimpl::WriteBatch;
-using dbimpl::Slice;
+using leveldb::WriteBatch;
+using leveldb::Slice;
 using marmotta::rdf::proto::Statement;
 using marmotta::rdf::proto::Namespace;
 using marmotta::rdf::proto::Resource;
@@ -48,218 +47,26 @@ namespace marmotta {
 namespace persistence {
 namespace {
 
-
-// Creates an index key based on hashing values of the 4 messages in proper order.
-inline void computeKey(const std::string* a, const std::string* b, const std::string* c, const std::string* d, char* result) {
-    // 128bit keys, use murmur
-    int offset = 0;
-    for (auto m : {a, b, c, d}) {
-        if (m != nullptr) {
-#ifdef __x86_64__
-            MurmurHash3_x64_128(m->data(), m->size(), 13, &result[offset]);
-#else
-            MurmurHash3_x86_128(m->data(), m->size(), 13, &result[offset]);
-#endif
-        } else {
-            return;
-        }
-        offset += KEY_LENGTH;
-    }
-}
-
-enum Position {
-    S = 0, P = 1, O = 2, C = 3
-};
-
-// Reorder a hash key from the generated SPOC key without requiring to recompute murmur.
-inline void orderKey(char* dest, const char* src, Position a, Position b, Position c, Position d) {
-    int offset = 0;
-    for (int m : {a, b, c, d}) {
-        memcpy(&dest[offset], &src[m * KEY_LENGTH], KEY_LENGTH * sizeof(char));
-        offset += KEY_LENGTH;
-    }
-}
-
-/**
- * Helper class to define proper cache keys and identify the index to use based on
- * fields available in the pattern.
- */
-class PatternQuery {
- public:
-    enum IndexType {
-        SPOC, CSPO, OPSC, PCOS
-    };
-
-    PatternQuery(const Statement& pattern) : pattern(pattern), needsFilter(true) {
-        if (pattern.has_subject()) {
-            s.reset(new std::string());
-            pattern.subject().SerializeToString(s.get());
-        }
-        if (pattern.has_predicate()) {
-            p.reset(new std::string());
-            pattern.predicate().SerializeToString(p.get());
-        }
-        if (pattern.has_object()) {
-            o.reset(new std::string());
-            pattern.object().SerializeToString(o.get());
-        }
-        if (pattern.has_context()) {
-            c.reset(new std::string());
-            pattern.context().SerializeToString(c.get());
-        }
-
-        if (pattern.has_subject()) {
-            // Subject is usually most selective, so if it is present use the
-            // subject-based databases first.
-            if (pattern.has_context()) {
-                type_ = CSPO;
-            } else {
-                type_ = SPOC;
-            }
-
-            // Filter needed if there is no predicate but an object.
-            needsFilter = !(pattern.has_predicate()) && pattern.has_object();
-        } else if (pattern.has_object()) {
-            // Second-best option is object.
-            type_ = OPSC;
-
-            // Filter needed if there is a context (subject already checked, predicate irrelevant).
-            needsFilter = pattern.has_context();
-        } else if (pattern.has_predicate()) {
-            // Predicate is usually least selective.
-            type_ = PCOS;
-
-            // No filter needed, object and subject are not set.
-            needsFilter = false;
-        } else if (pattern.has_context()) {
-            type_ = CSPO;
-
-            // No filter needed, subject, predicate object are not set.
-            needsFilter = false;
-        } else {
-            // Fall back to SPOC.
-            type_ = SPOC;
-
-            // No filter needed, we just scan from the beginning.
-            needsFilter = false;
-        }
-    }
-
-    /**
-     * Return the lower key for querying the index (range [MinKey,MaxKey) ).
-     */
-    char* MinKey() const {
-        char* result = (char*)calloc(4 * KEY_LENGTH, sizeof(char));
-        compute(result);
-        return result;
-    }
-
-    /**
-     * Return the upper key for querying the index (range [MinKey,MaxKey) ).
-     */
-    char* MaxKey() const {
-        char* result = (char*)malloc(4 * KEY_LENGTH * sizeof(char));
-        for (int i=0; i < 4 * KEY_LENGTH; i++) {
-            result[i] = (char)0xFF;
-        }
-
-        compute(result);
-        return result;
-    }
-
-    IndexType Type() const {
-        return type_;
-    }
-
-    PatternQuery& Type(IndexType t) {
-        type_ = t;
-        return *this;
-    }
-
-    // Returns true in case this query pattern cannot be answered by the index alone.
-    bool NeedsFilter() const {
-        return needsFilter;
-    }
-
- private:
-    const Statement& pattern;
-    std::unique_ptr<std::string> s, p, o, c;
-
-    // Creates a cache key based on hashing values of the 4 messages in proper order.
-    void compute(char* result) const {
-        switch(Type()) {
-            case SPOC:
-                computeKey(s.get(), p.get(), o.get(), c.get(), result);
-                break;
-            case CSPO:
-                computeKey(c.get(), s.get(), p.get(), o.get(), result);
-                break;
-            case OPSC:
-                computeKey(o.get(), p.get(), s.get(), c.get(), result);
-                break;
-            case PCOS:
-                computeKey(p.get(), c.get(), o.get(), s.get(), result);
-                break;
-        }
-    }
-
-    IndexType type_;
-    bool needsFilter = true;
-};
-
-
-// Base tterator for wrapping a LevelDB iterators.
 template<typename T>
-class LevelDBIterator : public util::CloseableIterator<T> {
- public:
-
-    LevelDBIterator(dbimpl::Iterator *it)
-        : it(it) {
-        it->SeekToFirst();
-    }
-
-    virtual ~LevelDBIterator() override {
-        delete it;
-    };
-
-    const T& next() override {
-        // Parse current position, then iterate to next position for next call.
-        proto.ParseFromString(it->value().ToString());
-        it->Next();
-        return proto;
-    };
-
-    const T& current() const override {
-        return proto;
-    };
-
-    virtual bool hasNext() override {
-        return it->Valid();
-    }
-
- protected:
-    dbimpl::Iterator* it;
-    T proto;
-};
-
+using LevelDBIterator = DBIterator<T, leveldb::Iterator>;
 
 
 // Iterator wrapping a LevelDB Statement iterator over a given key range.
 class StatementRangeIterator : public LevelDBIterator<Statement> {
  public:
 
-    StatementRangeIterator(dbimpl::Iterator *it, char *loKey, char *hiKey)
-            : LevelDBIterator(it), loKey(loKey), hiKey(hiKey) {
-        it->Seek(dbimpl::Slice(loKey, 4 * KEY_LENGTH));
+    StatementRangeIterator(leveldb::Iterator *it, char *loKey, char *hiKey)
+            : DBIterator(it), loKey(loKey), hiKey(hiKey) {
+        it->Seek(leveldb::Slice(loKey, 4 * KEY_LENGTH));
     }
 
     ~StatementRangeIterator() override {
-        free(loKey);
-        free(hiKey);
+        delete[] loKey;
+        delete[] hiKey;
     };
 
     bool hasNext() override {
-        return it->Valid() && it->key().compare(dbimpl::Slice(hiKey, 4 * KEY_LENGTH)) <= 0;
+        return it->Valid() && it->key().compare(leveldb::Slice(hiKey, 4 * KEY_LENGTH)) <= 0;
     }
 
  private:
@@ -267,44 +74,23 @@ class StatementRangeIterator : public LevelDBIterator<Statement> {
     char *hiKey;
 };
 
-// Return true if the statement matches the pattern. Wildcards (empty fields)
-// in the pattern are ignored.
-bool Matches(const Statement& pattern, const Statement& stmt) {
-    // equality operators defined in rdf_model.h
-    if (pattern.has_context() && stmt.context() != pattern.context()) {
-        return false;
-    }
-    if (pattern.has_subject() && stmt.subject() != pattern.subject()) {
-        return false;
-    }
-    if (pattern.has_predicate() && stmt.predicate() != pattern.predicate()) {
-        return false;
-    }
-    return !(pattern.has_object() && stmt.object() != pattern.object());
-}
-
-
 }  // namespace
 
 
 /**
  * Build database with default options.
  */
-dbimpl::DB* buildDB(const std::string& path, const std::string& suffix, const dbimpl::Options& options) {
-    dbimpl::DB* db;
-    dbimpl::Status status = dbimpl::DB::Open(options, path + "/" + suffix + ".db", &db);
+leveldb::DB* buildDB(const std::string& path, const std::string& suffix, const leveldb::Options& options) {
+    leveldb::DB* db;
+    leveldb::Status status = leveldb::DB::Open(options, path + "/" + suffix + ".db", &db);
     CHECK_STATUS(status);
     return db;
 }
 
-dbimpl::Options* buildOptions(KeyComparator* cmp, dbimpl::Cache* cache) {
-    dbimpl::Options *options = new dbimpl::Options();
+leveldb::Options* buildOptions(KeyComparator* cmp, leveldb::Cache* cache) {
+    leveldb::Options *options = new leveldb::Options();
     options->create_if_missing = true;
 
-#ifdef HAVE_ROCKSDB
-    options->IncreaseParallelism();
-    options->OptimizeLevelStyleCompaction();
-#else
     // Custom comparator for our keys.
     options->comparator = cmp;
 
@@ -315,20 +101,20 @@ dbimpl::Options* buildOptions(KeyComparator* cmp, dbimpl::Cache* cache) {
     options->write_buffer_size = 16384 * 1024;
 
     // Set a bloom filter of 10 bits.
-    options->filter_policy = dbimpl::NewBloomFilterPolicy(10);
-#endif
+    options->filter_policy = leveldb::NewBloomFilterPolicy(10);
+
     return options;
 }
 
-dbimpl::Options buildNsOptions() {
-    dbimpl::Options options;
+leveldb::Options buildNsOptions() {
+    leveldb::Options options;
     options.create_if_missing = true;
     return options;
 }
 
 LevelDBPersistence::LevelDBPersistence(const std::string &path, int64_t cacheSize)
         : workers(8), comparator(new KeyComparator())
-        , cache(dbimpl::NewLRUCache(cacheSize))
+        , cache(leveldb::NewLRUCache(cacheSize))
         , options(buildOptions(comparator.get(), cache.get()))
         , db_ns_prefix(buildDB(path, "ns_prefix", buildNsOptions()))
         , db_ns_url(buildDB(path, "ns_url", buildNsOptions()))
@@ -350,6 +136,7 @@ LevelDBPersistence::LevelDBPersistence(const std::string &path, int64_t cacheSiz
     }));
 
 
+    LOG(INFO) << "Opening LevelDB database ...";
     for (auto& t : openers) {
         t.wait();
     }
@@ -363,22 +150,24 @@ LevelDBPersistence::LevelDBPersistence(const std::string &path, int64_t cacheSiz
 }
 
 
-int64_t LevelDBPersistence::AddNamespaces(NamespaceIterator& it) {
+service::proto::UpdateResponse LevelDBPersistence::AddNamespaces(NamespaceIterator& it) {
     DLOG(INFO) << "Starting batch namespace import operation.";
     int64_t count = 0;
 
-    dbimpl::WriteBatch batch_prefix, batch_url;
+    leveldb::WriteBatch batch_prefix, batch_url;
 
     while (it.hasNext()) {
         AddNamespace(it.next(), batch_prefix, batch_url);
         count++;
     }
-    CHECK_STATUS(db_ns_prefix->Write(dbimpl::WriteOptions(), &batch_prefix));
-    CHECK_STATUS(db_ns_url->Write(dbimpl::WriteOptions(), &batch_url));
+    CHECK_STATUS(db_ns_prefix->Write(leveldb::WriteOptions(), &batch_prefix));
+    CHECK_STATUS(db_ns_url->Write(leveldb::WriteOptions(), &batch_url));
 
     DLOG(INFO) << "Imported " << count << " namespaces";
 
-    return count;
+    service::proto::UpdateResponse result;
+    result.set_added_namespaces(count);
+    return result;
 }
 
 std::unique_ptr<LevelDBPersistence::NamespaceIterator> LevelDBPersistence::GetNamespaces(
@@ -387,7 +176,7 @@ std::unique_ptr<LevelDBPersistence::NamespaceIterator> LevelDBPersistence::GetNa
 
     Namespace ns;
 
-    dbimpl::DB *db = nullptr;
+    leveldb::DB *db = nullptr;
     std::string key, value;
     if (pattern.prefix() != "") {
         key = pattern.prefix();
@@ -398,7 +187,7 @@ std::unique_ptr<LevelDBPersistence::NamespaceIterator> LevelDBPersistence::GetNa
     }
     if (db != nullptr) {
         // Either prefix or uri given, report the correct namespace value.
-        dbimpl::Status s = db->Get(dbimpl::ReadOptions(), key, &value);
+        leveldb::Status s = db->Get(leveldb::ReadOptions(), key, &value);
         if (s.ok()) {
             ns.ParseFromString(value);
             return util::make_unique<util::SingletonIterator<Namespace>>(std::move(ns));
@@ -408,7 +197,7 @@ std::unique_ptr<LevelDBPersistence::NamespaceIterator> LevelDBPersistence::GetNa
     } else {
         // Pattern was empty, iterate over all namespaces and report them.
         return util::make_unique<LevelDBIterator<Namespace>>(
-                db_ns_prefix->NewIterator(dbimpl::ReadOptions()));
+                db_ns_prefix->NewIterator(leveldb::ReadOptions()));
     }
 }
 
@@ -427,28 +216,28 @@ void LevelDBPersistence::GetNamespaces(
 }
 
 
-int64_t LevelDBPersistence::AddStatements(StatementIterator& it) {
+service::proto::UpdateResponse LevelDBPersistence::AddStatements(StatementIterator& it) {
     auto start = std::chrono::steady_clock::now();
     LOG(INFO) << "Starting batch statement import operation.";
     int64_t count = 0;
 
-    dbimpl::WriteBatch batch_spoc, batch_cspo, batch_opsc, batch_pcos;
+    leveldb::WriteBatch batch_spoc, batch_cspo, batch_opsc, batch_pcos;
     auto writeBatches = [&]{
         std::vector<std::future<void>> writers;
         writers.push_back(workers.push([&](int id) {
-            CHECK_STATUS(db_pcos->Write(dbimpl::WriteOptions(), &batch_pcos));
+            CHECK_STATUS(db_pcos->Write(leveldb::WriteOptions(), &batch_pcos));
             batch_pcos.Clear();
         }));
         writers.push_back(workers.push([&](int id) {
-            CHECK_STATUS(db_opsc->Write(dbimpl::WriteOptions(), &batch_opsc));
+            CHECK_STATUS(db_opsc->Write(leveldb::WriteOptions(), &batch_opsc));
             batch_opsc.Clear();
         }));
         writers.push_back(workers.push([&](int id) {
-            CHECK_STATUS(db_cspo->Write(dbimpl::WriteOptions(), &batch_cspo));
+            CHECK_STATUS(db_cspo->Write(leveldb::WriteOptions(), &batch_cspo));
             batch_cspo.Clear();
         }));
         writers.push_back(workers.push([&](int id) {
-            CHECK_STATUS(db_spoc->Write(dbimpl::WriteOptions(), &batch_spoc));
+            CHECK_STATUS(db_spoc->Write(leveldb::WriteOptions(), &batch_spoc));
             batch_spoc.Clear();
         }));
 
@@ -473,7 +262,9 @@ int64_t LevelDBPersistence::AddStatements(StatementIterator& it) {
                    std::chrono::steady_clock::now() - start).count()
               << "ms).";
 
-    return count;
+    service::proto::UpdateResponse result;
+    result.set_added_statements(count);
+    return result;
 }
 
 
@@ -481,23 +272,23 @@ std::unique_ptr<LevelDBPersistence::StatementIterator> LevelDBPersistence::GetSt
         const rdf::proto::Statement &pattern) {
     DLOG(INFO) << "Get statements matching pattern " << pattern.DebugString();
 
-    PatternQuery query(pattern);
+    Pattern query(pattern);
 
-    dbimpl::DB* db;
+    leveldb::DB* db;
     switch (query.Type()) {
-        case PatternQuery::SPOC:
+        case IndexTypes::SPOC:
             db = db_spoc.get();
             DLOG(INFO) << "Query: Using index type SPOC";
             break;
-        case PatternQuery::CSPO:
+        case IndexTypes::CSPO:
             db = db_cspo.get();
             DLOG(INFO) << "Query: Using index type CSPO";
             break;
-        case PatternQuery::OPSC:
+        case IndexTypes::OPSC:
             db = db_opsc.get();
             DLOG(INFO) << "Query: Using index type OPSC";
             break;
-        case PatternQuery::PCOS:
+        case IndexTypes::PCOS:
             db = db_pcos.get();
             DLOG(INFO) << "Query: Using index type PCOS";
             break;
@@ -507,12 +298,12 @@ std::unique_ptr<LevelDBPersistence::StatementIterator> LevelDBPersistence::GetSt
         DLOG(INFO) << "Retrieving statements with filter.";
         return util::make_unique<util::FilteringIterator<Statement>>(
                 new StatementRangeIterator(
-                        db->NewIterator(dbimpl::ReadOptions()), query.MinKey(), query.MaxKey()),
+                        db->NewIterator(leveldb::ReadOptions()), query.MinKey(), query.MaxKey()),
                 [&pattern](const Statement& stmt) -> bool { return Matches(pattern, stmt); });
     } else {
         DLOG(INFO) << "Retrieving statements without filter.";
         return util::make_unique<StatementRangeIterator>(
-                db->NewIterator(dbimpl::ReadOptions()), query.MinKey(), query.MaxKey());
+                db->NewIterator(leveldb::ReadOptions()), query.MinKey(), query.MaxKey());
     }
 }
 
@@ -535,29 +326,28 @@ void LevelDBPersistence::GetStatements(
 }
 
 
-int64_t LevelDBPersistence::RemoveStatements(const rdf::proto::Statement& pattern) {
+service::proto::UpdateResponse LevelDBPersistence::RemoveStatements(
+        const rdf::proto::Statement& pattern) {
     auto start = std::chrono::steady_clock::now();
     DLOG(INFO) << "Remove statements matching pattern " << pattern.DebugString();
 
-    int64_t count = 0;
+    leveldb::WriteBatch batch_spoc, batch_cspo, batch_opsc, batch_pcos;
 
-    Statement stmt;
-    dbimpl::WriteBatch batch_spoc, batch_cspo, batch_opsc, batch_pcos;
-
-    count = RemoveStatements(pattern, batch_spoc, batch_cspo, batch_opsc, batch_pcos);
+    int64_t count =
+            RemoveStatements(pattern, batch_spoc, batch_cspo, batch_opsc, batch_pcos);
 
     std::vector<std::future<void>> writers;
     writers.push_back(workers.push([&](int id) {
-        CHECK_STATUS(db_pcos->Write(dbimpl::WriteOptions(), &batch_pcos));
+        CHECK_STATUS(db_pcos->Write(leveldb::WriteOptions(), &batch_pcos));
     }));
     writers.push_back(workers.push([&](int id) {
-        CHECK_STATUS(db_opsc->Write(dbimpl::WriteOptions(), &batch_opsc));
+        CHECK_STATUS(db_opsc->Write(leveldb::WriteOptions(), &batch_opsc));
     }));
     writers.push_back(workers.push([&](int id) {
-        CHECK_STATUS(db_cspo->Write(dbimpl::WriteOptions(), &batch_cspo));
+        CHECK_STATUS(db_cspo->Write(leveldb::WriteOptions(), &batch_cspo));
     }));
     writers.push_back(workers.push([&](int id) {
-        CHECK_STATUS(db_spoc->Write(dbimpl::WriteOptions(), &batch_spoc));
+        CHECK_STATUS(db_spoc->Write(leveldb::WriteOptions(), &batch_spoc));
     }));
 
     for (auto& t : writers) {
@@ -569,39 +359,41 @@ int64_t LevelDBPersistence::RemoveStatements(const rdf::proto::Statement& patter
                        std::chrono::steady_clock::now() - start).count()
                << "ms).";
 
-    return count;
+    service::proto::UpdateResponse result;
+    result.set_removed_statements(count);
+    return result;
 }
 
-UpdateStatistics LevelDBPersistence::Update(LevelDBPersistence::UpdateIterator &it) {
+service::proto::UpdateResponse LevelDBPersistence::Update(LevelDBPersistence::UpdateIterator &it) {
     auto start = std::chrono::steady_clock::now();
     LOG(INFO) << "Starting batch update operation.";
-    UpdateStatistics stats;
+    int64_t added_stmts = 0, removed_stmts = 0, added_ns = 0, removed_ns = 0;
 
     WriteBatch b_spoc, b_cspo, b_opsc, b_pcos, b_prefix, b_url;
     auto writeBatches = [&]{
         std::vector<std::future<void>> writers;
         writers.push_back(workers.push([&](int id) {
-            CHECK_STATUS(db_pcos->Write(dbimpl::WriteOptions(), &b_pcos));
+            CHECK_STATUS(db_pcos->Write(leveldb::WriteOptions(), &b_pcos));
             b_pcos.Clear();
         }));
         writers.push_back(workers.push([&](int id) {
-            CHECK_STATUS(db_opsc->Write(dbimpl::WriteOptions(), &b_opsc));
+            CHECK_STATUS(db_opsc->Write(leveldb::WriteOptions(), &b_opsc));
             b_opsc.Clear();
         }));
         writers.push_back(workers.push([&](int id) {
-            CHECK_STATUS(db_cspo->Write(dbimpl::WriteOptions(), &b_cspo));
+            CHECK_STATUS(db_cspo->Write(leveldb::WriteOptions(), &b_cspo));
             b_cspo.Clear();
         }));
         writers.push_back(workers.push([&](int id) {
-            CHECK_STATUS(db_spoc->Write(dbimpl::WriteOptions(), &b_spoc));
+            CHECK_STATUS(db_spoc->Write(leveldb::WriteOptions(), &b_spoc));
             b_spoc.Clear();
         }));
         writers.push_back(workers.push([&](int id) {
-            CHECK_STATUS(db_ns_prefix->Write(dbimpl::WriteOptions(), &b_prefix));
+            CHECK_STATUS(db_ns_prefix->Write(leveldb::WriteOptions(), &b_prefix));
             b_prefix.Clear();
         }));
         writers.push_back(workers.push([&](int id) {
-            CHECK_STATUS(db_ns_url->Write(dbimpl::WriteOptions(), &b_url));
+            CHECK_STATUS(db_ns_url->Write(leveldb::WriteOptions(), &b_url));
             b_url.Clear();
         }));
 
@@ -615,15 +407,16 @@ UpdateStatistics LevelDBPersistence::Update(LevelDBPersistence::UpdateIterator &
         auto next = it.next();
         if (next.has_stmt_added()) {
             AddStatement(next.stmt_added(), b_spoc, b_cspo, b_opsc, b_pcos);
-            stats.added_stmts++;
+            added_stmts++;
         } else if (next.has_stmt_removed()) {
-            stats.removed_stmts +=
+            removed_stmts +=
                     RemoveStatements(next.stmt_removed(), b_spoc, b_cspo, b_opsc, b_pcos);
         } else if(next.has_ns_added()) {
             AddNamespace(next.ns_added(), b_prefix, b_url);
-            stats.added_ns++;
+            added_ns++;
         } else if(next.has_ns_removed()) {
             RemoveNamespace(next.ns_removed(), b_prefix, b_url);
+            removed_ns++;
         }
 
         count++;
@@ -634,13 +427,18 @@ UpdateStatistics LevelDBPersistence::Update(LevelDBPersistence::UpdateIterator &
 
     writeBatches();
 
-    LOG(INFO) << "Batch update complete. (statements added: " << stats.added_stmts
-            << ", statements removed: " << stats.removed_stmts
-            << ", namespaces added: " << stats.added_ns
-            << ", namespaces removed: " << stats.removed_ns
+    LOG(INFO) << "Batch update complete. (statements added: " << added_stmts
+            << ", statements removed: " << removed_stmts
+            << ", namespaces added: " << added_ns
+            << ", namespaces removed: " << removed_ns
             << ", time=" << std::chrono::duration <double, std::milli> (
                 std::chrono::steady_clock::now() - start).count() << "ms).";
 
+    service::proto::UpdateResponse stats;
+    stats.set_added_statements(added_stmts);
+    stats.set_removed_statements(removed_stmts);
+    stats.set_added_namespaces(added_ns);
+    stats.set_removed_namespaces(removed_ns);
     return stats;
 }
 
@@ -671,35 +469,27 @@ void LevelDBPersistence::AddStatement(
         WriteBatch &spoc, WriteBatch &cspo, WriteBatch &opsc, WriteBatch &pcos) {
     DLOG(INFO) << "Adding statement " << stmt.DebugString();
 
-    std::string buffer, bufs, bufp, bufo, bufc;
+    Key key(stmt);
 
+    std::string buffer;
     stmt.SerializeToString(&buffer);
 
-    stmt.subject().SerializeToString(&bufs);
-    stmt.predicate().SerializeToString(&bufp);
-    stmt.object().SerializeToString(&bufo);
-    stmt.context().SerializeToString(&bufc);
-
-    char *k_spoc = (char *) calloc(4 * KEY_LENGTH, sizeof(char));
-    computeKey(&bufs, &bufp, &bufo, &bufc, k_spoc);
-    spoc.Put(dbimpl::Slice(k_spoc, 4 * KEY_LENGTH), buffer);
+    char *k_spoc = key.Create(IndexTypes::SPOC);
+    spoc.Put(leveldb::Slice(k_spoc, 4 * KEY_LENGTH), buffer);
 
-    char *k_cspo = (char *) calloc(4 * KEY_LENGTH, sizeof(char));
-    orderKey(k_cspo, k_spoc, C, S, P, O);
-    cspo.Put(dbimpl::Slice(k_cspo, 4 * KEY_LENGTH), buffer);
+    char *k_cspo = key.Create(IndexTypes::CSPO);
+    cspo.Put(leveldb::Slice(k_cspo, 4 * KEY_LENGTH), buffer);
 
-    char *k_opsc = (char *) calloc(4 * KEY_LENGTH, sizeof(char));
-    orderKey(k_opsc, k_spoc, O, P, S, C);
-    opsc.Put(dbimpl::Slice(k_opsc, 4 * KEY_LENGTH), buffer);
+    char *k_opsc = key.Create(IndexTypes::OPSC);
+    opsc.Put(leveldb::Slice(k_opsc, 4 * KEY_LENGTH), buffer);
 
-    char *k_pcos = (char *) calloc(4 * KEY_LENGTH, sizeof(char));
-    orderKey(k_pcos, k_spoc, P, C, O, S);
-    pcos.Put(dbimpl::Slice(k_pcos, 4 * KEY_LENGTH), buffer);
+    char *k_pcos = key.Create(IndexTypes::PCOS);
+    pcos.Put(leveldb::Slice(k_pcos, 4 * KEY_LENGTH), buffer);
 
-    free(k_spoc);
-    free(k_cspo);
-    free(k_opsc);
-    free(k_pcos);
+    delete[] k_spoc;
+    delete[] k_cspo;
+    delete[] k_opsc;
+    delete[] k_pcos;
 }
 
 
@@ -712,31 +502,24 @@ int64_t LevelDBPersistence::RemoveStatements(
 
     std::string bufs, bufp, bufo, bufc;
     GetStatements(pattern, [&](const Statement stmt) -> bool {
-        stmt.subject().SerializeToString(&bufs);
-        stmt.predicate().SerializeToString(&bufp);
-        stmt.object().SerializeToString(&bufo);
-        stmt.context().SerializeToString(&bufc);
+        Key key(stmt);
 
-        char* k_spoc = (char*)calloc(4 * KEY_LENGTH, sizeof(char));
-        computeKey(&bufs, &bufp, &bufo, &bufc, k_spoc);
-        spoc.Delete(dbimpl::Slice(k_spoc, 4 * KEY_LENGTH));
+        char* k_spoc = key.Create(IndexTypes::SPOC);
+        spoc.Delete(leveldb::Slice(k_spoc, 4 * KEY_LENGTH));
 
-        char* k_cspo = (char*)calloc(4 * KEY_LENGTH, sizeof(char));
-        orderKey(k_cspo, k_spoc, C, S, P, O);
-        cspo.Delete(dbimpl::Slice(k_cspo, 4 * KEY_LENGTH));
+        char* k_cspo = key.Create(IndexTypes::CSPO);
+        cspo.Delete(leveldb::Slice(k_cspo, 4 * KEY_LENGTH));
 
-        char* k_opsc = (char*)calloc(4 * KEY_LENGTH, sizeof(char));
-        orderKey(k_opsc, k_spoc, O, P, S, C);
-        opsc.Delete(dbimpl::Slice(k_opsc, 4 * KEY_LENGTH));
+        char* k_opsc = key.Create(IndexTypes::OPSC);
+        opsc.Delete(leveldb::Slice(k_opsc, 4 * KEY_LENGTH));
 
-        char* k_pcos = (char*)calloc(4 * KEY_LENGTH, sizeof(char));
-        orderKey(k_pcos, k_spoc, P, C, O, S);
-        pcos.Delete(dbimpl::Slice(k_pcos, 4 * KEY_LENGTH));
+        char* k_pcos = key.Create(IndexTypes::PCOS);
+        pcos.Delete(leveldb::Slice(k_pcos, 4 * KEY_LENGTH));
 
-        free(k_spoc);
-        free(k_cspo);
-        free(k_opsc);
-        free(k_pcos);
+        delete[] k_spoc;
+        delete[] k_cspo;
+        delete[] k_opsc;
+        delete[] k_pcos;
 
         count++;
 
@@ -746,14 +529,14 @@ int64_t LevelDBPersistence::RemoveStatements(
     return count;
 }
 
-int KeyComparator::Compare(const dbimpl::Slice& a, const dbimpl::Slice& b) const {
+int KeyComparator::Compare(const leveldb::Slice& a, const leveldb::Slice& b) const {
     return memcmp(a.data(), b.data(), 4 * KEY_LENGTH);
 }
 
 
 int64_t LevelDBPersistence::Size() {
     int64_t count = 0;
-    dbimpl::Iterator* it = db_cspo->NewIterator(dbimpl::ReadOptions());
+    leveldb::Iterator* it = db_cspo->NewIterator(leveldb::ReadOptions());
     for (it->SeekToFirst(); it->Valid(); it->Next()) {
         count++;
     }

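Note on the hunks above: the hand-written computeKey()/orderKey() calls and the calloc()/free() pairs are replaced by a Key helper and an IndexTypes enum coming from persistence/base_persistence.h, which is added as an include in the header diff below but whose contents are not part of this mail. The following is only a rough, self-contained sketch of what the call sites assume (one Key per statement, Create() handing back a 4 * KEY_LENGTH buffer that the caller releases with delete[]); the string-based constructor and the std::hash placeholder are illustrative only, the real helper takes the protobuf Statement and presumably reuses the MurmurHash3 code already in util/.

    #include <cstring>
    #include <functional>
    #include <string>

    // Sketch only: the real Key/IndexTypes live in persistence/base_persistence.h.
    // KEY_LENGTH is a #define (16) in the real sources.
    constexpr std::size_t KEY_LENGTH = 16;

    enum class IndexTypes { SPOC, CSPO, OPSC, PCOS };

    class Key {
     public:
        // The real constructor takes the rdf::proto::Statement; plain strings
        // keep the sketch dependency-free.
        Key(const std::string& s, const std::string& p,
            const std::string& o, const std::string& c) {
            hash(s, s_); hash(p, p_); hash(o, o_); hash(c, c_);
        }

        // Returns a fresh 4 * KEY_LENGTH byte key with the components ordered
        // for the requested index; the caller releases it with delete[].
        char* Create(IndexTypes type) const {
            char* k = new char[4 * KEY_LENGTH];
            switch (type) {
                case IndexTypes::SPOC: concat(k, s_, p_, o_, c_); break;
                case IndexTypes::CSPO: concat(k, c_, s_, p_, o_); break;
                case IndexTypes::OPSC: concat(k, o_, p_, s_, c_); break;
                case IndexTypes::PCOS: concat(k, p_, c_, o_, s_); break;
            }
            return k;
        }

     private:
        static void hash(const std::string& in, char* out) {
            // Placeholder digest; the backend presumably derives a 16-byte
            // MurmurHash3 digest per component here.
            std::size_t h = std::hash<std::string>()(in);
            std::memset(out, 0, KEY_LENGTH);
            std::memcpy(out, &h, sizeof(h));
        }
        static void concat(char* k, const char* a, const char* b,
                           const char* c, const char* d) {
            std::memcpy(k,                  a, KEY_LENGTH);
            std::memcpy(k + KEY_LENGTH,     b, KEY_LENGTH);
            std::memcpy(k + 2 * KEY_LENGTH, c, KEY_LENGTH);
            std::memcpy(k + 3 * KEY_LENGTH, d, KEY_LENGTH);
        }

        char s_[KEY_LENGTH], p_[KEY_LENGTH], o_[KEY_LENGTH], c_[KEY_LENGTH];
    };
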
http://git-wip-us.apache.org/repos/asf/marmotta/blob/b8d122a1/libraries/ostrich/backend/persistence/leveldb_persistence.h
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/persistence/leveldb_persistence.h b/libraries/ostrich/backend/persistence/leveldb_persistence.h
index eee80e4..fe72e9f 100644
--- a/libraries/ostrich/backend/persistence/leveldb_persistence.h
+++ b/libraries/ostrich/backend/persistence/leveldb_persistence.h
@@ -22,21 +22,12 @@
 #include <string>
 #include <functional>
 
-#ifdef HAVE_ROCKSDB
-#include <rocksdb/db.h>
-#include <rocksdb/cache.h>
-#include <rocksdb/comparator.h>
-
-namespace dbimpl = rocksdb;
-#else
 #include <leveldb/db.h>
 #include <leveldb/cache.h>
 #include <leveldb/comparator.h>
 
-namespace dbimpl = leveldb;
-#endif
-
 #include "model/rdf_model.h"
+#include "persistence/base_persistence.h"
 #include "service/sail.pb.h"
 #include "util/iterator.h"
 #include "util/threadpool.h"
@@ -47,51 +38,34 @@ namespace persistence {
 /**
  * A custom comparator treating the bytes in the key as unsigned char.
  */
-class KeyComparator : public dbimpl::Comparator {
+class KeyComparator : public leveldb::Comparator {
  public:
-    int Compare(const dbimpl::Slice& a, const dbimpl::Slice& b) const override ;
+    int Compare(const leveldb::Slice& a, const leveldb::Slice& b) const override ;
 
     const char* Name() const override { return "KeyComparator"; }
-    void FindShortestSeparator(std::string*, const dbimpl::Slice&) const override { }
+    void FindShortestSeparator(std::string*, const leveldb::Slice&) const override { }
     void FindShortSuccessor(std::string*) const override { }
 };
 
-
-// Statistical data about updates.
-struct UpdateStatistics {
-    UpdateStatistics()
-            : added_stmts(0), removed_stmts(0), added_ns(0), removed_ns(0) {}
-
-    int64_t added_stmts, removed_stmts, added_ns, removed_ns;
-};
-
 /**
  * Persistence implementation based on the LevelDB high performance database.
  */
-class LevelDBPersistence {
+class LevelDBPersistence : public Persistence {
  public:
-    typedef util::CloseableIterator<rdf::proto::Statement> StatementIterator;
-    typedef util::CloseableIterator<rdf::proto::Namespace> NamespaceIterator;
-    typedef util::CloseableIterator<service::proto::UpdateRequest> UpdateIterator;
-
-    typedef std::function<bool(const rdf::proto::Statement&)> StatementHandler;
-    typedef std::function<bool(const rdf::proto::Namespace&)> NamespaceHandler;
-
-
     /**
      * Initialise a new LevelDB database using the given path and cache size (bytes).
      */
     LevelDBPersistence(const std::string& path, int64_t cacheSize);
 
     /**
-     * Add the namespaces in the iterator to the database.
-     */
-    int64_t AddNamespaces(NamespaceIterator& it);
+     * Add the namespaces in the iterator to the database.
+     */
+    service::proto::UpdateResponse AddNamespaces(NamespaceIterator& it) override;
 
     /**
      * Add the statements in the iterator to the database.
      */
-    int64_t AddStatements(StatementIterator& it);
+    service::proto::UpdateResponse AddStatements(StatementIterator& it) override;
 
     /**
      * Get all statements matching the pattern (which may have some fields
@@ -107,7 +81,7 @@ class LevelDBPersistence {
      * result.
      */
     std::unique_ptr<StatementIterator>
-            GetStatements(const rdf::proto::Statement& pattern);
+    GetStatements(const rdf::proto::Statement& pattern);
 
     /**
      * Get all namespaces matching the pattern (which may have some of all
@@ -123,59 +97,60 @@ class LevelDBPersistence {
      * each result.
      */
     std::unique_ptr<NamespaceIterator>
-            GetNamespaces(const rdf::proto::Namespace &pattern);
+    GetNamespaces(const rdf::proto::Namespace &pattern);
 
     /**
      * Remove all statements matching the pattern (which may have some fields
      * unset to indicate wildcards).
      */
-    int64_t RemoveStatements(const rdf::proto::Statement& pattern);
+    service::proto::UpdateResponse RemoveStatements(
+            const rdf::proto::Statement& pattern) override;
 
     /**
      * Apply a batch of updates (mixed statement/namespace adds and removes).
      * The updates are collected in LevelDB batches and written atomically to
      * the database when iteration ends.
      */
-    UpdateStatistics Update(UpdateIterator& it);
+    service::proto::UpdateResponse Update(
+            UpdateIterator& it) override;
 
     /**
      * Return the size of this database.
      */
-    int64_t Size();
+    int64_t Size() override;
  private:
     ctpl::thread_pool workers;
 
     std::unique_ptr<KeyComparator> comparator;
-    std::shared_ptr<dbimpl::Cache> cache;
-    std::unique_ptr<dbimpl::Options> options;
+    std::shared_ptr<leveldb::Cache> cache;
+    std::unique_ptr<leveldb::Options> options;
 
     // We currently support efficient lookups by subject, context and object.
-    std::unique_ptr<dbimpl::DB>
+    std::unique_ptr<leveldb::DB>
             // Statement databases, indexed for query performance
             db_spoc, db_cspo, db_opsc, db_pcos,
             // Namespace databases
             db_ns_prefix, db_ns_url,
             // Triple store metadata.
             db_meta;
-
     /**
      * Add the namespace to the given database batch operations.
      */
     void AddNamespace(const rdf::proto::Namespace& ns,
-                      dbimpl::WriteBatch& ns_prefix, dbimpl::WriteBatch& ns_url);
+                      leveldb::WriteBatch& ns_prefix, leveldb::WriteBatch& ns_url);
 
     /**
      * Remove the namespace from the given database batch operations.
      */
     void RemoveNamespace(const rdf::proto::Namespace& ns,
-                         dbimpl::WriteBatch& ns_prefix, dbimpl::WriteBatch& ns_url);
+                         leveldb::WriteBatch& ns_prefix, leveldb::WriteBatch& ns_url);
 
     /**
      * Add the statement to the given database batch operations.
      */
     void AddStatement(const rdf::proto::Statement& stmt,
-                      dbimpl::WriteBatch& spoc, dbimpl::WriteBatch& cspo,
-                      dbimpl::WriteBatch& opsc, dbimpl::WriteBatch&pcos);
+                      leveldb::WriteBatch& spoc, leveldb::WriteBatch& cspo,
+                      leveldb::WriteBatch& opsc, leveldb::WriteBatch&pcos);
 
 
     /**
@@ -183,8 +158,8 @@ class LevelDBPersistence {
      * unset to indicate wildcards) from the given database batch operations.
      */
     int64_t RemoveStatements(const rdf::proto::Statement& pattern,
-                             dbimpl::WriteBatch& spoc, dbimpl::WriteBatch& cspo,
-                             dbimpl::WriteBatch& opsc, dbimpl::WriteBatch&pcos);
+                             leveldb::WriteBatch& spoc, leveldb::WriteBatch& cspo,
+                             leveldb::WriteBatch& opsc, leveldb::WriteBatch&pcos);
 
 
 };

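For orientation: persistence/base_persistence.h, now included above but not shown in this part of the mail, is assumed to declare the abstract Persistence interface together with the iterator and handler typedefs that were dropped from LevelDBPersistence. The sketch below is inferred purely from the overrides above; the real header may well declare more (for instance the Key, Pattern and IndexTypes helpers used by the .cc files).

    // Inferred sketch of persistence/base_persistence.h -- not the actual file.
    #include <cstdint>
    #include <functional>

    #include "model/rdf_model.h"
    #include "service/sail.pb.h"
    #include "util/iterator.h"

    namespace marmotta {
    namespace persistence {

    class Persistence {
     public:
        typedef util::CloseableIterator<rdf::proto::Statement> StatementIterator;
        typedef util::CloseableIterator<rdf::proto::Namespace> NamespaceIterator;
        typedef util::CloseableIterator<service::proto::UpdateRequest> UpdateIterator;

        typedef std::function<bool(const rdf::proto::Statement&)> StatementHandler;
        typedef std::function<bool(const rdf::proto::Namespace&)> NamespaceHandler;

        virtual ~Persistence() = default;

        virtual service::proto::UpdateResponse AddNamespaces(NamespaceIterator& it) = 0;
        virtual service::proto::UpdateResponse AddStatements(StatementIterator& it) = 0;
        virtual service::proto::UpdateResponse RemoveStatements(
                const rdf::proto::Statement& pattern) = 0;
        virtual service::proto::UpdateResponse Update(UpdateIterator& it) = 0;
        virtual int64_t Size() = 0;
    };

    }  // namespace persistence
    }  // namespace marmotta
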
http://git-wip-us.apache.org/repos/asf/marmotta/blob/b8d122a1/libraries/ostrich/backend/persistence/leveldb_server.cc
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/persistence/leveldb_server.cc b/libraries/ostrich/backend/persistence/leveldb_server.cc
index 737a38c..d1d9a95 100644
--- a/libraries/ostrich/backend/persistence/leveldb_server.cc
+++ b/libraries/ostrich/backend/persistence/leveldb_server.cc
@@ -22,6 +22,7 @@
 #include <sys/stat.h>
 #include <signal.h>
 
+#include "leveldb_persistence.h"
 #include "leveldb_service.h"
 
 using grpc::Status;

http://git-wip-us.apache.org/repos/asf/marmotta/blob/b8d122a1/libraries/ostrich/backend/persistence/leveldb_service.cc
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/persistence/leveldb_service.cc b/libraries/ostrich/backend/persistence/leveldb_service.cc
index f2637bd..9f02607 100644
--- a/libraries/ostrich/backend/persistence/leveldb_service.cc
+++ b/libraries/ostrich/backend/persistence/leveldb_service.cc
@@ -38,6 +38,7 @@ using marmotta::rdf::proto::Statement;
 using marmotta::rdf::proto::Namespace;
 using marmotta::rdf::proto::Resource;
 using marmotta::service::proto::ContextRequest;
+using marmotta::service::proto::UpdateResponse;
 using marmotta::persistence::sparql::LevelDBTripleSource;
 using marmotta::sparql::SparqlService;
 using marmotta::sparql::TripleSource;
@@ -87,8 +88,8 @@ Status LevelDBService::AddNamespaces(
         ServerContext* context, ServerReader<Namespace>* reader, Int64Value* result) {
 
     auto it = NamespaceIterator(reader);
-    int64_t count = persistence->AddNamespaces(it);
-    result->set_value(count);
+    UpdateResponse stats = persistence->AddNamespaces(it);
+    result->set_value(stats.added_namespaces());
 
     return Status::OK;
 }
@@ -123,8 +124,8 @@ Status LevelDBService::AddStatements(
     util::TimeLogger timeLogger("Adding statements");
 
     auto it = StatementIterator(reader);
-    int64_t count = persistence->AddStatements(it);
-    result->set_value(count);
+    UpdateResponse stats = persistence->AddStatements(it);
+    result->set_value(stats.added_statements());
 
     return Status::OK;
 }
@@ -145,7 +146,7 @@ Status LevelDBService::RemoveStatements(
         ServerContext* context, const Statement* pattern, Int64Value* result) {
     util::TimeLogger timeLogger("Removing statements");
 
-    int64_t count = persistence->RemoveStatements(*pattern);
+    int64_t count = persistence->RemoveStatements(*pattern).removed_statements();
     result->set_value(count);
 
     return Status::OK;
@@ -161,10 +162,10 @@ Status LevelDBService::Clear(
     if (contexts->context_size() > 0) {
         for (const Resource &r : contexts->context()) {
             pattern.mutable_context()->CopyFrom(r);
-            count += persistence->RemoveStatements(pattern);
+            count += persistence->RemoveStatements(pattern).removed_statements();
         }
     } else {
-        count += persistence->RemoveStatements(pattern);
+        count += persistence->RemoveStatements(pattern).removed_statements();
     }
     result->set_value(count);
 
@@ -224,12 +225,7 @@ grpc::Status LevelDBService::Update(grpc::ServerContext *context,
     util::TimeLogger timeLogger("Updating database");
 
     auto it = UpdateIterator(reader);
-    persistence::UpdateStatistics stats = persistence->Update(it);
-
-    result->set_added_namespaces(stats.added_ns);
-    result->set_removed_namespaces(stats.removed_ns);
-    result->set_added_statements(stats.added_stmts);
-    result->set_removed_statements(stats.removed_stmts);
+    *result = persistence->Update(it);
 
     return Status::OK;
 }

http://git-wip-us.apache.org/repos/asf/marmotta/blob/b8d122a1/libraries/ostrich/backend/persistence/leveldb_service.h
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/persistence/leveldb_service.h b/libraries/ostrich/backend/persistence/leveldb_service.h
index 87b5b12..dc139e8 100644
--- a/libraries/ostrich/backend/persistence/leveldb_service.h
+++ b/libraries/ostrich/backend/persistence/leveldb_service.h
@@ -18,7 +18,7 @@
 #ifndef MARMOTTA_SERVICE_H
 #define MARMOTTA_SERVICE_H
 
-#include "leveldb_persistence.h"
+#include "base_persistence.h"
 
 #include <grpc/grpc.h>
 #include <grpc++/server.h>
@@ -49,7 +49,7 @@ class LevelDBService : public svc::SailService::Service {
      * Construct a new SailService wrapper around the LevelDB persistence passed
      * as argument. The service will not take ownership of the pointer.
      */
-    LevelDBService(persistence::LevelDBPersistence* persistance) : persistence(persistance) { };
+    LevelDBService(persistence::Persistence* persistence) : persistence(persistence) { };
 
     grpc::Status AddNamespaces(grpc::ServerContext* context,
                                grpc::ServerReader<rdf::proto::Namespace>* reader,
@@ -92,7 +92,7 @@ class LevelDBService : public svc::SailService::Service {
                       google::protobuf::Int64Value* result) override;
 
  private:
-    persistence::LevelDBPersistence* persistence;
+    persistence::Persistence* persistence;
 };
 
 
@@ -105,7 +105,7 @@ class LevelDBSparqlService : public spq::SparqlService::Service {
      * Construct a new SparqlService wrapper around the LevelDB persistence passed
      * as argument. The service will not take ownership of the pointer.
      */
-    LevelDBSparqlService(persistence::LevelDBPersistence* persistence) : persistence(persistence) { };
+    LevelDBSparqlService(persistence::Persistence* persistence) : persistence(persistence) { };
 
     grpc::Status TupleQuery(grpc::ServerContext* context,
                             const spq::SparqlRequest* pattern,
@@ -119,7 +119,7 @@ class LevelDBSparqlService : public spq::SparqlService::Service {
                           const spq::SparqlRequest* pattern,
                           google::protobuf::BoolValue* result) override;
  private:
-    persistence::LevelDBPersistence* persistence;
+    persistence::Persistence* persistence;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/marmotta/blob/b8d122a1/libraries/ostrich/backend/persistence/leveldb_sparql.h
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/persistence/leveldb_sparql.h b/libraries/ostrich/backend/persistence/leveldb_sparql.h
index 7b103a4..0b2f3bc 100644
--- a/libraries/ostrich/backend/persistence/leveldb_sparql.h
+++ b/libraries/ostrich/backend/persistence/leveldb_sparql.h
@@ -19,7 +19,7 @@
 #define MARMOTTA_SPARQL_H
 
 #include "sparql/rasqal_adapter.h"
-#include "leveldb_persistence.h"
+#include "base_persistence.h"
 
 namespace marmotta {
 namespace persistence {
@@ -33,7 +33,7 @@ using std::experimental::optional;
 class LevelDBTripleSource : public ::marmotta::sparql::TripleSource {
  public:
 
-    LevelDBTripleSource(LevelDBPersistence *persistence) : persistence(persistence) { }
+    LevelDBTripleSource(Persistence *persistence) : persistence(persistence) { }
 
 
     bool HasStatement(
@@ -47,7 +47,7 @@ class LevelDBTripleSource : public ::marmotta::sparql::TripleSource {
 
  private:
     // A pointer to the persistence instance wrapped by this triple source.
-    LevelDBPersistence* persistence;
+    Persistence* persistence;
 };
 
 

http://git-wip-us.apache.org/repos/asf/marmotta/blob/b8d122a1/libraries/ostrich/backend/persistence/rocksdb_persistence.cc
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/persistence/rocksdb_persistence.cc b/libraries/ostrich/backend/persistence/rocksdb_persistence.cc
new file mode 100644
index 0000000..4b5879d
--- /dev/null
+++ b/libraries/ostrich/backend/persistence/rocksdb_persistence.cc
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#define KEY_LENGTH 16
+
+#include <chrono>
+#include <stdlib.h>
+#include <malloc.h>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <rocksdb/filter_policy.h>
+#include <rocksdb/write_batch.h>
+#include <google/protobuf/wrappers.pb.h>
+#include <thread>
+#include <algorithm>
+
+#include "rocksdb_persistence.h"
+#include "model/rdf_operators.h"
+#include "util/murmur3.h"
+#include "util/unique.h"
+
+#define CHECK_STATUS(s) CHECK(s.ok()) << "Writing to database failed: " << s.ToString()
+
+DEFINE_int64(write_batch_size, 1000000,
+             "Maximum number of statements to write in a single batch to the database");
+
+
+constexpr char kSPOC[] = "spoc";
+constexpr char kCSPO[] = "cspo";
+constexpr char kOPSC[] = "opsc";
+constexpr char kPCOS[] = "pcos";
+constexpr char kNSPREFIX[] = "nsprefix";
+constexpr char kNSURI[] = "nsuri";
+constexpr char kMETA[] = "meta";
+
+using rocksdb::ColumnFamilyDescriptor;
+using rocksdb::ColumnFamilyHandle;
+using rocksdb::ColumnFamilyOptions;
+using rocksdb::WriteBatch;
+using rocksdb::Slice;
+using marmotta::rdf::proto::Statement;
+using marmotta::rdf::proto::Namespace;
+using marmotta::rdf::proto::Resource;
+
+namespace marmotta {
+namespace persistence {
+namespace {
+
+// Base iterator for wrapping RocksDB iterators.
+template<typename T>
+using RocksDBIterator = DBIterator<T, rocksdb::Iterator>;
+
+// Iterator wrapping a RocksDB Statement iterator over a given key range.
+class StatementRangeIterator : public RocksDBIterator<Statement> {
+ public:
+    StatementRangeIterator(rocksdb::Iterator *it, char *loKey, char *hiKey)
+            : DBIterator(it), loKey(loKey), hiKey(hiKey) {
+        it->Seek(rocksdb::Slice(loKey, 4 * KEY_LENGTH));
+    }
+
+    ~StatementRangeIterator() override {
+        delete[] loKey;
+        delete[] hiKey;
+    };
+
+    bool hasNext() override {
+        return it->Valid() && it->key().compare(rocksdb::Slice(hiKey, 4 * KEY_LENGTH)) <= 0;
+    }
+
+ private:
+    char *loKey;
+    char *hiKey;
+};
+
+
+}  // namespace
+
+
+/**
+ * Build database with default options.
+ */
+rocksdb::DB* buildDB(const std::string& path, const std::string& suffix, const rocksdb::Options& options) {
+    rocksdb::DB* db;
+    rocksdb::Status status = rocksdb::DB::Open(options, path + "/" + suffix + ".db", &db);
+    CHECK_STATUS(status);
+    return db;
+}
+
+rocksdb::Options* buildOptions(KeyComparator* cmp) {
+    rocksdb::Options *options = new rocksdb::Options();
+    options->create_if_missing = true;
+    options->create_missing_column_families = true;
+
+    options->IncreaseParallelism();
+    options->OptimizeLevelStyleCompaction();
+
+    // Custom comparator for our keys.
+    options->comparator = cmp;
+
+    // Write buffer size 16MB (fast bulk imports)
+    options->write_buffer_size = 16384 * 1024;
+
+    return options;
+}
+
+RocksDBPersistence::RocksDBPersistence(const std::string &path, int64_t cacheSize)
+        : workers_(8) {
+    rocksdb::Options options;
+    options.create_if_missing = true;
+    options.create_missing_column_families = true;
+
+    options.IncreaseParallelism();
+    options.OptimizeLevelStyleCompaction();
+
+    // Custom comparator for our keys.
+    options.comparator = &comparator_;
+
+    // Write buffer size 16MB (fast bulk imports)
+    options.write_buffer_size = 16384 * 1024;
+
+    ColumnFamilyOptions cfOptions;
+    cfOptions.OptimizeLevelStyleCompaction();
+
+    // Initialise column families.
+    std::vector<ColumnFamilyDescriptor> columnFamilies = {
+            ColumnFamilyDescriptor(kSPOC, cfOptions),
+            ColumnFamilyDescriptor(kCSPO, cfOptions),
+            ColumnFamilyDescriptor(kOPSC, cfOptions),
+            ColumnFamilyDescriptor(kPCOS, cfOptions),
+            ColumnFamilyDescriptor(kNSPREFIX, cfOptions),
+            ColumnFamilyDescriptor(kNSURI, cfOptions),
+            ColumnFamilyDescriptor(kMETA, cfOptions),
+            ColumnFamilyDescriptor("default", cfOptions),
+    };
+
+    rocksdb::DB* db;
+    rocksdb::Status status = rocksdb::DB::Open(options, path + "/data.db", columnFamilies, &handles_, &db);
+    CHECK_STATUS(status);
+    database_.reset(db);
+
+    LOG(INFO) << "RocksDB Database initialised.";
+}
+
+RocksDBPersistence::~RocksDBPersistence() {
+    std::for_each(handles_.begin(), handles_.end(), [](ColumnFamilyHandle* h) {
+        delete h;
+    });
+}
+
+
+service::proto::UpdateResponse RocksDBPersistence::AddNamespaces(NamespaceIterator& it) {
+    DLOG(INFO) << "Starting batch namespace import operation.";
+    int64_t count = 0;
+
+    rocksdb::WriteBatch batch;
+
+    while (it.hasNext()) {
+        AddNamespace(it.next(), batch);
+        count++;
+    }
+    CHECK_STATUS(database_->Write(rocksdb::WriteOptions(), &batch));
+
+    DLOG(INFO) << "Imported " << count << " namespaces";
+
+    service::proto::UpdateResponse result;
+    result.set_added_namespaces(count);
+    return result;
+}
+
+std::unique_ptr<RocksDBPersistence::NamespaceIterator> RocksDBPersistence::GetNamespaces(
+        const rdf::proto::Namespace &pattern) {
+    DLOG(INFO) << "Get namespaces matching pattern " << pattern.DebugString();
+
+    Namespace ns;
+
+    ColumnFamilyHandle *h = nullptr;
+    std::string key, value;
+    if (pattern.prefix() != "") {
+        key = pattern.prefix();
+        h = handles_[Handles::NSPREFIX];
+    } else if(pattern.uri() != "") {
+        key = pattern.uri();
+        h = handles_[Handles::NSURI];
+    }
+    if (h != nullptr) {
+        // Either prefix or uri given, report the correct namespace value.
+        rocksdb::Status s = database_->Get(rocksdb::ReadOptions(), h, key, &value);
+        if (s.ok()) {
+            ns.ParseFromString(value);
+            return util::make_unique<util::SingletonIterator<Namespace>>(std::move(ns));
+        } else {
+            return util::make_unique<util::EmptyIterator<Namespace>>();
+        }
+    } else {
+        // Pattern was empty, iterate over all namespaces and report them.
+        return util::make_unique<RocksDBIterator<Namespace>>(
+                database_->NewIterator(rocksdb::ReadOptions(), handles_[Handles::NSPREFIX]));
+    }
+}
+
+
+void RocksDBPersistence::GetNamespaces(
+        const Namespace &pattern, RocksDBPersistence::NamespaceHandler callback) {
+    int64_t count = 0;
+
+    bool cbsuccess = true;
+    for(auto it = GetNamespaces(pattern); cbsuccess && it->hasNext();) {
+        cbsuccess = callback(it->next());
+        count++;
+    }
+
+    DLOG(INFO) << "Get namespaces done (count=" << count <<")";
+}
+
+
+service::proto::UpdateResponse RocksDBPersistence::AddStatements(StatementIterator& it) {
+    auto start = std::chrono::steady_clock::now();
+    LOG(INFO) << "Starting batch statement import operation.";
+    int64_t count = 0;
+
+    rocksdb::WriteBatch batch;
+    while (it.hasNext()) {
+        AddStatement(it.next(), batch);
+        count++;
+
+        if (count % FLAGS_write_batch_size == 0) {
+            CHECK_STATUS(database_->Write(rocksdb::WriteOptions(), &batch));
+            batch.Clear();
+        }
+    }
+
+    CHECK_STATUS(database_->Write(rocksdb::WriteOptions(), &batch));
+
+    LOG(INFO) << "Imported " << count << " statements (time="
+              << std::chrono::duration <double, std::milli> (
+                   std::chrono::steady_clock::now() - start).count()
+              << "ms).";
+
+    service::proto::UpdateResponse result;
+    result.set_added_statements(count);
+    return result;
+}
+
+
+std::unique_ptr<RocksDBPersistence::StatementIterator> RocksDBPersistence::GetStatements(
+        const rdf::proto::Statement &pattern) {
+    DLOG(INFO) << "Get statements matching pattern " << pattern.DebugString();
+
+    Pattern query(pattern);
+
+    ColumnFamilyHandle* h;
+    switch (query.Type()) {
+        case IndexTypes::SPOC:
+            h = handles_[Handles::ISPOC];
+            DLOG(INFO) << "Query: Using index type SPOC";
+            break;
+        case IndexTypes::CSPO:
+            h = handles_[Handles::ICSPO];
+            DLOG(INFO) << "Query: Using index type CSPO";
+            break;
+        case IndexTypes::OPSC:
+            h = handles_[Handles::IOPSC];
+            DLOG(INFO) << "Query: Using index type OPSC";
+            break;
+        case IndexTypes::PCOS:
+            h = handles_[Handles::IPCOS];
+            DLOG(INFO) << "Query: Using index type PCOS";
+            break;
+    };
+
+    if (query.NeedsFilter()) {
+        DLOG(INFO) << "Retrieving statements with filter.";
+        return util::make_unique<util::FilteringIterator<Statement>>(
+                new StatementRangeIterator(
+                        database_->NewIterator(rocksdb::ReadOptions(), h), query.MinKey(), query.MaxKey()),
+                [&pattern](const Statement& stmt) -> bool { return Matches(pattern, stmt); });
+    } else {
+        DLOG(INFO) << "Retrieving statements without filter.";
+        return util::make_unique<StatementRangeIterator>(
+                database_->NewIterator(rocksdb::ReadOptions(), h), query.MinKey(), query.MaxKey());
+    }
+}
+
+
+void RocksDBPersistence::GetStatements(
+        const Statement& pattern, std::function<bool(const Statement&)> callback) {
+    auto start = std::chrono::steady_clock::now();
+    int64_t count = 0;
+
+    bool cbsuccess = true;
+    for(auto it = GetStatements(pattern); cbsuccess && it->hasNext(); ) {
+        cbsuccess = callback(it->next());
+        count++;
+    }
+
+    DLOG(INFO) << "Get statements done (count=" << count << ", time="
+               << std::chrono::duration <double, std::milli> (
+                    std::chrono::steady_clock::now() - start).count()
+               << "ms).";
+}
+
+
+service::proto::UpdateResponse RocksDBPersistence::RemoveStatements(const rdf::proto::Statement& pattern) {
+    auto start = std::chrono::steady_clock::now();
+    DLOG(INFO) << "Remove statements matching pattern " << pattern.DebugString();
+
+    rocksdb::WriteBatch batch;
+
+    int64_t count = RemoveStatements(pattern, batch);
+    CHECK_STATUS(database_->Write(rocksdb::WriteOptions(), &batch));
+
+    DLOG(INFO) << "Removed " << count << " statements (time=" <<
+               std::chrono::duration <double, std::milli> (
+                       std::chrono::steady_clock::now() - start).count()
+               << "ms).";
+
+    service::proto::UpdateResponse result;
+    result.set_removed_statements(count);
+    return result;
+}
+
+service::proto::UpdateResponse RocksDBPersistence::Update(RocksDBPersistence::UpdateIterator &it) {
+    auto start = std::chrono::steady_clock::now();
+    LOG(INFO) << "Starting batch update operation.";
+
+    WriteBatch batch;
+    int64_t added_stmts = 0, removed_stmts = 0, added_ns = 0, removed_ns = 0;
+
+    long count = 0;
+    while (it.hasNext()) {
+        auto next = it.next();
+        if (next.has_stmt_added()) {
+            AddStatement(next.stmt_added(), batch);
+            added_stmts++;
+        } else if (next.has_stmt_removed()) {
+            removed_stmts +=
+                    RemoveStatements(next.stmt_removed(), batch);
+        } else if(next.has_ns_added()) {
+            AddNamespace(next.ns_added(), batch);
+            added_ns++;
+        } else if(next.has_ns_removed()) {
+            RemoveNamespace(next.ns_removed(), batch);
+            removed_ns++;
+        }
+
+        count++;
+        if (count % FLAGS_write_batch_size == 0) {
+            CHECK_STATUS(database_->Write(rocksdb::WriteOptions(), &batch));
+            batch.Clear();
+        }
+    }
+
+    CHECK_STATUS(database_->Write(rocksdb::WriteOptions(), &batch));
+    batch.Clear();
+
+    LOG(INFO) << "Batch update complete. (statements added: " << added_stmts
+            << ", statements removed: " << removed_stmts
+            << ", namespaces added: " << added_ns
+            << ", namespaces removed: " << removed_ns
+            << ", time=" << std::chrono::duration <double, std::milli> (
+                std::chrono::steady_clock::now() - start).count() << "ms).";
+
+    service::proto::UpdateResponse stats;
+    stats.set_added_statements(added_stmts);
+    stats.set_removed_statements(removed_stmts);
+    stats.set_added_namespaces(added_ns);
+    stats.set_removed_namespaces(removed_ns);
+    return stats;
+}
+
+void RocksDBPersistence::AddNamespace(
+        const Namespace &ns, WriteBatch &batch) {
+    DLOG(INFO) << "Adding namespace " << ns.DebugString();
+
+    std::string buffer;
+    ns.SerializeToString(&buffer);
+    batch.Put(handles_[Handles::NSPREFIX], ns.prefix(), buffer);
+    batch.Put(handles_[Handles::NSURI], ns.uri(), buffer);
+}
+
+void RocksDBPersistence::RemoveNamespace(
+        const Namespace &pattern, WriteBatch &batch) {
+    DLOG(INFO) << "Removing namespaces matching pattern " << pattern.DebugString();
+
+    GetNamespaces(pattern, [&batch, this](const rdf::proto::Namespace& ns) -> bool {
+        batch.Delete(handles_[Handles::NSPREFIX], ns.prefix());
+        batch.Delete(handles_[Handles::NSURI], ns.uri());
+        return true;
+    });
+}
+
+
+void RocksDBPersistence::AddStatement(
+        const Statement &stmt, WriteBatch &batch) {
+    DLOG(INFO) << "Adding statement " << stmt.DebugString();
+
+    Key key(stmt);
+
+    std::string buffer;
+    stmt.SerializeToString(&buffer);
+
+    char *k_spoc = key.Create(IndexTypes::SPOC);
+    batch.Put(handles_[Handles::ISPOC], rocksdb::Slice(k_spoc, 4 * KEY_LENGTH), buffer);
+
+    char *k_cspo = key.Create(IndexTypes::CSPO);
+    batch.Put(handles_[Handles::ICSPO], rocksdb::Slice(k_cspo, 4 * KEY_LENGTH), buffer);
+
+    char *k_opsc = key.Create(IndexTypes::OPSC);
+    batch.Put(handles_[Handles::IOPSC], rocksdb::Slice(k_opsc, 4 * KEY_LENGTH), buffer);
+
+    char *k_pcos = key.Create(IndexTypes::PCOS);
+    batch.Put(handles_[Handles::IPCOS], rocksdb::Slice(k_pcos, 4 * KEY_LENGTH), buffer);
+
+    delete[] k_spoc;
+    delete[] k_cspo;
+    delete[] k_opsc;
+    delete[] k_pcos;
+}
+
+
+int64_t RocksDBPersistence::RemoveStatements(
+        const Statement& pattern, WriteBatch& batch) {
+    DLOG(INFO) << "Removing statements matching " << pattern.DebugString();
+
+    int64_t count = 0;
+
+    GetStatements(pattern, [&](const Statement stmt) -> bool {
+        Key key(stmt);
+
+        char* k_spoc = key.Create(IndexTypes::SPOC);
+        batch.Delete(handles_[Handles::ISPOC], rocksdb::Slice(k_spoc, 4 * KEY_LENGTH));
+
+        char* k_cspo = key.Create(IndexTypes::CSPO);
+        batch.Delete(handles_[Handles::ICSPO], rocksdb::Slice(k_cspo, 4 * KEY_LENGTH));
+
+        char* k_opsc = key.Create(IndexTypes::OPSC);
+        batch.Delete(handles_[Handles::IOPSC], rocksdb::Slice(k_opsc, 4 * KEY_LENGTH));
+
+        char* k_pcos = key.Create(IndexTypes::PCOS);
+        batch.Delete(handles_[Handles::IPCOS], rocksdb::Slice(k_pcos, 4 * KEY_LENGTH));
+
+        delete[] k_spoc;
+        delete[] k_cspo;
+        delete[] k_opsc;
+        delete[] k_pcos;
+
+        count++;
+
+        return true;
+    });
+
+    return count;
+}
+
+int KeyComparator::Compare(const rocksdb::Slice& a, const rocksdb::Slice& b) const {
+    return memcmp(a.data(), b.data(), 4 * KEY_LENGTH);
+}
+
+
+int64_t RocksDBPersistence::Size() {
+    int64_t count = 0;
+    rocksdb::Iterator* it = database_->NewIterator(rocksdb::ReadOptions(), handles_[Handles::ISPOC]);
+    for (it->SeekToFirst(); it->Valid(); it->Next()) {
+        count++;
+    }
+
+    delete it;
+    return count;
+}
+
+}  // namespace persistence
+}  // namespace marmotta
+
+

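Design note: the LevelDB backend opens a separate database per index (spoc, cspo, opsc, pcos) plus the namespace and metadata stores, whereas the RocksDB backend above keeps everything in one database and gives each index its own column family. DB::Open returns the column family handles in the order of the descriptor vector, which is what the Handles indices in the header below rely on. A minimal, self-contained illustration of that RocksDB pattern follows; the path and keys are invented for the example and not taken from the commit.

    #include <cassert>
    #include <string>
    #include <vector>

    #include <rocksdb/db.h>

    // Standalone illustration only: one rocksdb::DB, one ColumnFamilyHandle per
    // index, mirroring the constructor above. Path and keys are made up.
    int main() {
        rocksdb::Options options;
        options.create_if_missing = true;
        options.create_missing_column_families = true;

        std::vector<rocksdb::ColumnFamilyDescriptor> families = {
                rocksdb::ColumnFamilyDescriptor("spoc", rocksdb::ColumnFamilyOptions()),
                rocksdb::ColumnFamilyDescriptor("cspo", rocksdb::ColumnFamilyOptions()),
                rocksdb::ColumnFamilyDescriptor("default", rocksdb::ColumnFamilyOptions()),
        };

        std::vector<rocksdb::ColumnFamilyHandle*> handles;
        rocksdb::DB* db;
        rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/cf-example.db",
                                              families, &handles, &db);
        assert(s.ok());

        // Handles come back in descriptor order, so handles[0] is the "spoc"
        // family -- the property the Handles enum in the header relies on.
        s = db->Put(rocksdb::WriteOptions(), handles[0], "some-key", "some-value");
        assert(s.ok());

        std::string value;
        s = db->Get(rocksdb::ReadOptions(), handles[0], "some-key", &value);
        assert(s.ok() && value == "some-value");

        for (auto* h : handles) delete h;
        delete db;
        return 0;
    }
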
http://git-wip-us.apache.org/repos/asf/marmotta/blob/b8d122a1/libraries/ostrich/backend/persistence/rocksdb_persistence.h
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/persistence/rocksdb_persistence.h b/libraries/ostrich/backend/persistence/rocksdb_persistence.h
new file mode 100644
index 0000000..a04169b
--- /dev/null
+++ b/libraries/ostrich/backend/persistence/rocksdb_persistence.h
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef MARMOTTA_PERSISTENCE_H
+#define MARMOTTA_PERSISTENCE_H
+
+#include <memory>
+#include <string>
+#include <functional>
+
+#include <rocksdb/db.h>
+#include <rocksdb/cache.h>
+#include <rocksdb/comparator.h>
+
+#include "persistence/base_persistence.h"
+#include "util/threadpool.h"
+
+namespace marmotta {
+namespace persistence {
+
+/**
+ * A custom comparator treating the bytes in the key as unsigned char.
+ */
+class KeyComparator : public rocksdb::Comparator {
+ public:
+    int Compare(const rocksdb::Slice& a, const rocksdb::Slice& b) const override ;
+
+    const char* Name() const override { return "KeyComparator"; }
+    void FindShortestSeparator(std::string*, const rocksdb::Slice&) const override { }
+    void FindShortSuccessor(std::string*) const override { }
+};
+
+
+// Symbolic handle indices.
+enum Handles {
+    ISPOC = 0, ICSPO = 1, IOPSC = 2, IPCOS = 3, NSPREFIX = 4, NSURI = 5, META = 6
+};
+
+/**
+ * Persistence implementation based on the RocksDB high performance database.
+ */
+class RocksDBPersistence : public Persistence {
+ public:
+    /**
+     * Initialise a new RocksDB database using the given path and cache size (bytes).
+     */
+    RocksDBPersistence(const std::string& path, int64_t cacheSize);
+
+    ~RocksDBPersistence();
+
+    /**
+     * Add the namespaces in the iterator to the database.
+     */
+    service::proto::UpdateResponse AddNamespaces(NamespaceIterator& it) override;
+
+    /**
+     * Add the statements in the iterator to the database.
+     */
+    service::proto::UpdateResponse AddStatements(StatementIterator& it) override;
+
+    /**
+     * Get all statements matching the pattern (which may have some fields
+     * unset to indicate wildcards). Call the callback function for each
+     * result.
+     */
+    void GetStatements(const rdf::proto::Statement& pattern,
+                       StatementHandler callback);
+
+    /**
+     * Get all statements matching the pattern (which may have some fields
+     * unset to indicate wildcards). Call the callback function for each
+     * result.
+     */
+    std::unique_ptr<StatementIterator>
+            GetStatements(const rdf::proto::Statement& pattern);
+
+    /**
+     * Get all namespaces matching the pattern (which may have some of all
+     * fields unset to indicate wildcards). Call the callback function for
+     * each result.
+     */
+    void GetNamespaces(const rdf::proto::Namespace &pattern,
+                       NamespaceHandler callback);
+
+    /**
+     * Get all namespaces matching the pattern (which may have some of all
+     * fields unset to indicate wildcards). Call the callback function for
+     * each result.
+     */
+    std::unique_ptr<NamespaceIterator>
+            GetNamespaces(const rdf::proto::Namespace &pattern);
+
+    /**
+     * Remove all statements matching the pattern (which may have some fields
+     * unset to indicate wildcards).
+     */
+    service::proto::UpdateResponse RemoveStatements(
+            const rdf::proto::Statement& pattern) override;
+
+    /**
+     * Apply a batch of updates (mixed statement/namespace adds and removes).
+     * The updates are collected in RocksDB batches and written atomically to
+     * the database when iteration ends.
+     */
+    service::proto::UpdateResponse Update(
+            UpdateIterator& it) override;
+
+    /**
+     * Return the size of this database.
+     */
+    int64_t Size() override;
+ private:
+    ctpl::thread_pool workers_;
+
+    KeyComparator comparator_;
+    std::unique_ptr<rocksdb::DB> database_;
+
+    // Column Families for the different index access types.
+    std::vector<rocksdb::ColumnFamilyHandle*> handles_;
+
+    /**
+     * Add the namespace to the given database batch operations.
+     */
+    void AddNamespace(const rdf::proto::Namespace& ns, rocksdb::WriteBatch& batch);
+
+    /**
+     * Remove the namespace from the given database batch operations.
+     */
+    void RemoveNamespace(const rdf::proto::Namespace& ns, rocksdb::WriteBatch& batch);
+
+    /**
+     * Add the statement to the given database batch operations.
+     */
+    void AddStatement(const rdf::proto::Statement& stmt, rocksdb::WriteBatch& batch);
+
+
+    /**
+     * Remove all statements matching the pattern (which may have some fields
+     * unset to indicate wildcards) from the given database batch operations.
+     */
+    int64_t RemoveStatements(const rdf::proto::Statement& pattern, rocksdb::WriteBatch& batch);
+};
+
+
+
+}  // namespace persistence
+}  // namespace marmotta
+
+#endif //MARMOTTA_PERSISTENCE_H

http://git-wip-us.apache.org/repos/asf/marmotta/blob/b8d122a1/libraries/ostrich/backend/persistence/rocksdb_server.cc
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/persistence/rocksdb_server.cc b/libraries/ostrich/backend/persistence/rocksdb_server.cc
new file mode 100644
index 0000000..837242a
--- /dev/null
+++ b/libraries/ostrich/backend/persistence/rocksdb_server.cc
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Binary to start a persistence server implementing the sail.proto API.
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <sys/stat.h>
+#include <signal.h>
+
+#include "rocksdb_persistence.h"
+#include "leveldb_service.h"
+
+using grpc::Status;
+using grpc::Server;
+using grpc::ServerBuilder;
+
+
+DEFINE_string(host, "0.0.0.0", "Address/interface the server listens on.");
+DEFINE_string(port, "10000", "Port the server listens on.");
+DEFINE_string(db, "/tmp/testdb", "Path to database. Will be created if non-existent.");
+DEFINE_int64(cache_size, 100 * 1048576, "Cache size used by the database (in bytes).");
+
+std::unique_ptr<Server> server;
+
+void stopServer(int signal) {
+    if (server.get() != nullptr) {
+        LOG(INFO) << "RocksDB Persistence Server shutting down";
+        server->Shutdown();
+    }
+}
+
+int main(int argc, char** argv) {
+    // Initialize Google's logging library.
+    google::InitGoogleLogging(argv[0]);
+    google::ParseCommandLineFlags(&argc, &argv, true);
+
+    mkdir(FLAGS_db.c_str(), 0700);
+    marmotta::persistence::RocksDBPersistence persistence(FLAGS_db, FLAGS_cache_size);
+
+    marmotta::service::LevelDBService sailService(&persistence);
+    marmotta::service::LevelDBSparqlService sparqlService(&persistence);
+
+    ServerBuilder builder;
+    builder.AddListeningPort(FLAGS_host + ":" + FLAGS_port, grpc::InsecureServerCredentials());
+    builder.RegisterService(&sailService);
+    builder.RegisterService(&sparqlService);
+    builder.SetMaxMessageSize(INT_MAX);
+
+    server = builder.BuildAndStart();
+    std::cout << "RocksDB Persistence Server listening on " << FLAGS_host << ":" << FLAGS_port << std::endl;
+
+    LOG(INFO) << "RocksDB Persistence Server listening on " << FLAGS_host << ":" << FLAGS_port;
+
+    signal(SIGINT, stopServer);
+    signal(SIGTERM, stopServer);
+
+    server->Wait();
+
+    return 0;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/marmotta/blob/b8d122a1/libraries/ostrich/backend/test/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/test/CMakeLists.txt b/libraries/ostrich/backend/test/CMakeLists.txt
index 29491ba..c3beef9 100644
--- a/libraries/ostrich/backend/test/CMakeLists.txt
+++ b/libraries/ostrich/backend/test/CMakeLists.txt
@@ -12,9 +12,13 @@ target_link_libraries(model_tests gtest marmotta_model ${GLOG_LIBRARY})
 add_executable(sparql_tests SparqlTest.cc main.cc)
 target_link_libraries(sparql_tests gtest marmotta_model marmotta_sparql ${GLOG_LIBRARY})
 
+add_executable(leveldb_tests main.cc LevelDBTest.cc)
+target_link_libraries(leveldb_tests gtest marmotta_leveldb ${GLOG_LIBRARY} ${Boost_LIBRARIES})
+
 add_executable(persistence_tests main.cc PersistenceTest.cc)
 target_link_libraries(persistence_tests gtest marmotta_leveldb ${GLOG_LIBRARY} ${Boost_LIBRARIES})
 
 add_test(NAME ModelTest COMMAND model_tests)
 add_test(NAME SparqlTest COMMAND sparql_tests)
+add_test(NAME LevelDBTest COMMAND leveldb_tests)
 add_test(NAME PersistenceTest COMMAND persistence_tests)
\ No newline at end of file


[4/8] marmotta git commit: Ostrich: use a thread pool instead of starting and stopping individual threads

Posted by ss...@apache.org.
Ostrich: use a thread pool instead of starting and stopping individual threads


Project: http://git-wip-us.apache.org/repos/asf/marmotta/repo
Commit: http://git-wip-us.apache.org/repos/asf/marmotta/commit/d811ef38
Tree: http://git-wip-us.apache.org/repos/asf/marmotta/tree/d811ef38
Diff: http://git-wip-us.apache.org/repos/asf/marmotta/diff/d811ef38

Branch: refs/heads/develop
Commit: d811ef382445cdccf2c23c417ae912cda16ccdf3
Parents: 76ed061
Author: Sebastian Schaffert <ss...@apache.org>
Authored: Tue Aug 23 12:30:39 2016 +0200
Committer: Sebastian Schaffert <ss...@apache.org>
Committed: Tue Aug 23 12:30:39 2016 +0200

----------------------------------------------------------------------
 .../backend/persistence/leveldb_persistence.cc  |  56 ++---
 .../backend/persistence/leveldb_persistence.h   |   2 +
 libraries/ostrich/backend/util/CMakeLists.txt   |   3 +-
 libraries/ostrich/backend/util/threadpool.h     | 251 +++++++++++++++++++
 4 files changed, 283 insertions(+), 29 deletions(-)
----------------------------------------------------------------------

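In short: the short-lived std::thread objects spawned for every parallel batch write (and for opening the index databases) are replaced by a single ctpl::thread_pool member, sized to 8 threads in the LevelDBPersistence constructor; work is handed over with push(), which returns a std::future that is wait()ed where the code previously join()ed a thread. A condensed sketch of the pattern using the bundled util/threadpool.h, with the database calls elided and the function names invented for illustration:

    #include <future>
    #include <thread>
    #include <vector>

    #include "util/threadpool.h"   // ctpl::thread_pool, added by this commit

    // Before: spawn and join one std::thread per batch write.
    void writeBatchesWithThreads() {
        std::vector<std::thread> writers;
        writers.push_back(std::thread([]() { /* db_spoc->Write(...) */ }));
        writers.push_back(std::thread([]() { /* db_cspo->Write(...) */ }));
        for (auto& t : writers) {
            t.join();
        }
    }

    // After: reuse the pool's threads; push() returns a std::future to wait on.
    void writeBatchesWithPool(ctpl::thread_pool& workers) {
        std::vector<std::future<void>> writers;
        writers.push_back(workers.push([](int id) { /* db_spoc->Write(...) */ }));
        writers.push_back(workers.push([](int id) { /* db_cspo->Write(...) */ }));
        for (auto& f : writers) {
            f.wait();
        }
    }

One side effect of the pool: push() wraps each job in a std::packaged_task, so an exception escaping a job is stored in its future (and surfaces on get()) instead of terminating the process, as an uncaught exception in a bare std::thread would.
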

http://git-wip-us.apache.org/repos/asf/marmotta/blob/d811ef38/libraries/ostrich/backend/persistence/leveldb_persistence.cc
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/persistence/leveldb_persistence.cc b/libraries/ostrich/backend/persistence/leveldb_persistence.cc
index fbb1cb2..efc1dc6 100644
--- a/libraries/ostrich/backend/persistence/leveldb_persistence.cc
+++ b/libraries/ostrich/backend/persistence/leveldb_persistence.cc
@@ -293,7 +293,7 @@ bool Matches(const Statement& pattern, const Statement& stmt) {
 dbimpl::DB* buildDB(const std::string& path, const std::string& suffix, const dbimpl::Options& options) {
     dbimpl::DB* db;
     dbimpl::Status status = dbimpl::DB::Open(options, path + "/" + suffix + ".db", &db);
-    assert(status.ok());
+    CHECK_STATUS(status);
     return db;
 }
 
@@ -327,7 +327,7 @@ dbimpl::Options buildNsOptions() {
 }
 
 LevelDBPersistence::LevelDBPersistence(const std::string &path, int64_t cacheSize)
-        : comparator(new KeyComparator())
+        : workers(8), comparator(new KeyComparator())
         , cache(dbimpl::NewLRUCache(cacheSize))
         , options(buildOptions(comparator.get(), cache.get()))
         , db_ns_prefix(buildDB(path, "ns_prefix", buildNsOptions()))
@@ -335,23 +335,23 @@ LevelDBPersistence::LevelDBPersistence(const std::string &path, int64_t cacheSiz
         , db_meta(buildDB(path, "metadata", buildNsOptions())) {
 
     // Open databases in separate threads as LevelDB does a lot of computation on open.
-    std::vector<std::thread> openers;
-    openers.push_back(std::thread([&]() {
+    std::vector<std::future<void>> openers;
+    openers.push_back(workers.push([&](int id) {
         db_spoc.reset(buildDB(path, "spoc", *options));
     }));
-    openers.push_back(std::thread([&]() {
+    openers.push_back(workers.push([&](int id) {
         db_cspo.reset(buildDB(path, "cspo", *options));
     }));
-    openers.push_back(std::thread([&]() {
+    openers.push_back(workers.push([&](int id) {
         db_opsc.reset(buildDB(path, "opsc", *options));
     }));
-    openers.push_back(std::thread([&]() {
+    openers.push_back(workers.push([&](int id) {
         db_pcos.reset(buildDB(path, "pcos", *options));
     }));
 
 
     for (auto& t : openers) {
-        t.join();
+        t.wait();
     }
 
     CHECK_NOTNULL(db_spoc.get());
@@ -434,26 +434,26 @@ int64_t LevelDBPersistence::AddStatements(StatementIterator& it) {
 
     dbimpl::WriteBatch batch_spoc, batch_cspo, batch_opsc, batch_pcos;
     auto writeBatches = [&]{
-        std::vector<std::thread> writers;
-        writers.push_back(std::thread([&]() {
+        std::vector<std::future<void>> writers;
+        writers.push_back(workers.push([&](int id) {
             CHECK_STATUS(db_pcos->Write(dbimpl::WriteOptions(), &batch_pcos));
             batch_pcos.Clear();
         }));
-        writers.push_back(std::thread([&]() {
+        writers.push_back(workers.push([&](int id) {
             CHECK_STATUS(db_opsc->Write(dbimpl::WriteOptions(), &batch_opsc));
             batch_opsc.Clear();
         }));
-        writers.push_back(std::thread([&]() {
+        writers.push_back(workers.push([&](int id) {
             CHECK_STATUS(db_cspo->Write(dbimpl::WriteOptions(), &batch_cspo));
             batch_cspo.Clear();
         }));
-        writers.push_back(std::thread([&]() {
+        writers.push_back(workers.push([&](int id) {
             CHECK_STATUS(db_spoc->Write(dbimpl::WriteOptions(), &batch_spoc));
             batch_spoc.Clear();
         }));
 
         for (auto& t : writers) {
-            t.join();
+            t.wait();
         }
     };
 
@@ -546,22 +546,22 @@ int64_t LevelDBPersistence::RemoveStatements(const rdf::proto::Statement& patter
 
     count = RemoveStatements(pattern, batch_spoc, batch_cspo, batch_opsc, batch_pcos);
 
-    std::vector<std::thread> writers;
-    writers.push_back(std::thread([&]() {
+    std::vector<std::future<void>> writers;
+    writers.push_back(workers.push([&](int id) {
         CHECK_STATUS(db_pcos->Write(dbimpl::WriteOptions(), &batch_pcos));
     }));
-    writers.push_back(std::thread([&]() {
+    writers.push_back(workers.push([&](int id) {
         CHECK_STATUS(db_opsc->Write(dbimpl::WriteOptions(), &batch_opsc));
     }));
-    writers.push_back(std::thread([&]() {
+    writers.push_back(workers.push([&](int id) {
         CHECK_STATUS(db_cspo->Write(dbimpl::WriteOptions(), &batch_cspo));
     }));
-    writers.push_back(std::thread([&]() {
+    writers.push_back(workers.push([&](int id) {
         CHECK_STATUS(db_spoc->Write(dbimpl::WriteOptions(), &batch_spoc));
     }));
 
     for (auto& t : writers) {
-        t.join();
+        t.wait();
     }
 
     DLOG(INFO) << "Removed " << count << " statements (time=" <<
@@ -579,34 +579,34 @@ UpdateStatistics LevelDBPersistence::Update(LevelDBPersistence::UpdateIterator &
 
     WriteBatch b_spoc, b_cspo, b_opsc, b_pcos, b_prefix, b_url;
     auto writeBatches = [&]{
-        std::vector<std::thread> writers;
-        writers.push_back(std::thread([&]() {
+        std::vector<std::future<void>> writers;
+        writers.push_back(workers.push([&](int id) {
             CHECK_STATUS(db_pcos->Write(dbimpl::WriteOptions(), &b_pcos));
             b_pcos.Clear();
         }));
-        writers.push_back(std::thread([&]() {
+        writers.push_back(workers.push([&](int id) {
             CHECK_STATUS(db_opsc->Write(dbimpl::WriteOptions(), &b_opsc));
             b_opsc.Clear();
         }));
-        writers.push_back(std::thread([&]() {
+        writers.push_back(workers.push([&](int id) {
             CHECK_STATUS(db_cspo->Write(dbimpl::WriteOptions(), &b_cspo));
             b_cspo.Clear();
         }));
-        writers.push_back(std::thread([&]() {
+        writers.push_back(workers.push([&](int id) {
             CHECK_STATUS(db_spoc->Write(dbimpl::WriteOptions(), &b_spoc));
             b_spoc.Clear();
         }));
-        writers.push_back(std::thread([&]() {
+        writers.push_back(workers.push([&](int id) {
             CHECK_STATUS(db_ns_prefix->Write(dbimpl::WriteOptions(), &b_prefix));
             b_prefix.Clear();
         }));
-        writers.push_back(std::thread([&]() {
+        writers.push_back(workers.push([&](int id) {
             CHECK_STATUS(db_ns_url->Write(dbimpl::WriteOptions(), &b_url));
             b_url.Clear();
         }));
 
         for (auto& t : writers) {
-            t.join();
+            t.wait();
         }
     };
 

http://git-wip-us.apache.org/repos/asf/marmotta/blob/d811ef38/libraries/ostrich/backend/persistence/leveldb_persistence.h
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/persistence/leveldb_persistence.h b/libraries/ostrich/backend/persistence/leveldb_persistence.h
index 9fd1924..eee80e4 100644
--- a/libraries/ostrich/backend/persistence/leveldb_persistence.h
+++ b/libraries/ostrich/backend/persistence/leveldb_persistence.h
@@ -39,6 +39,7 @@ namespace dbimpl = leveldb;
 #include "model/rdf_model.h"
 #include "service/sail.pb.h"
 #include "util/iterator.h"
+#include "util/threadpool.h"
 
 namespace marmotta {
 namespace persistence {
@@ -142,6 +143,7 @@ class LevelDBPersistence {
      */
     int64_t Size();
  private:
+    ctpl::thread_pool workers;
 
     std::unique_ptr<KeyComparator> comparator;
     std::shared_ptr<dbimpl::Cache> cache;

http://git-wip-us.apache.org/repos/asf/marmotta/blob/d811ef38/libraries/ostrich/backend/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/util/CMakeLists.txt b/libraries/ostrich/backend/util/CMakeLists.txt
index a4ad8b3..d7e13e0 100644
--- a/libraries/ostrich/backend/util/CMakeLists.txt
+++ b/libraries/ostrich/backend/util/CMakeLists.txt
@@ -1,6 +1,7 @@
 include_directories(.. ${CMAKE_CURRENT_BINARY_DIR}/..)
 
-add_library(marmotta_util murmur3.cc murmur3.h split.cc split.h iterator.h unique.h time_logger.cc time_logger.h)
+add_library(marmotta_util murmur3.cc murmur3.h split.cc split.h
+        iterator.h unique.h time_logger.cc time_logger.h threadpool.h)
 
 add_library(marmotta_raptor_util raptor_util.h raptor_util.cc)
 target_link_libraries(marmotta_raptor_util marmotta_model ${CMAKE_THREAD_LIBS_INIT} ${RAPTOR_LIBRARY} ${GFLAGS_LIBRARY})
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/marmotta/blob/d811ef38/libraries/ostrich/backend/util/threadpool.h
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/util/threadpool.h b/libraries/ostrich/backend/util/threadpool.h
new file mode 100644
index 0000000..9047aad
--- /dev/null
+++ b/libraries/ostrich/backend/util/threadpool.h
@@ -0,0 +1,251 @@
+/*********************************************************
+*
+*  Copyright (C) 2014 by Vitaliy Vitsentiy
+*
+*  Licensed under the Apache License, Version 2.0 (the "License");
+*  you may not use this file except in compliance with the License.
+*  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+*  Unless required by applicable law or agreed to in writing, software
+*  distributed under the License is distributed on an "AS IS" BASIS,
+*  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+*  See the License for the specific language governing permissions and
+*  limitations under the License.
+*
+*********************************************************/
+
+
+#ifndef __ctpl_stl_thread_pool_H__
+#define __ctpl_stl_thread_pool_H__
+
+#include <functional>
+#include <thread>
+#include <atomic>
+#include <vector>
+#include <memory>
+#include <exception>
+#include <future>
+#include <mutex>
+#include <queue>
+
+
+
+// thread pool to run user's functors with signature
+//      ret func(int id, other_params)
+// where id is the index of the thread that runs the functor
+// and ret is some return type
+
+
+namespace ctpl {
+
+namespace detail {
+template <typename T>
+class Queue {
+ public:
+    bool push(T const & value) {
+        std::unique_lock<std::mutex> lock(this->mutex);
+        this->q.push(value);
+        return true;
+    }
+    // deletes the retrieved element; do not use for non-integral types
+    bool pop(T & v) {
+        std::unique_lock<std::mutex> lock(this->mutex);
+        if (this->q.empty())
+            return false;
+        v = this->q.front();
+        this->q.pop();
+        return true;
+    }
+    bool empty() {
+        std::unique_lock<std::mutex> lock(this->mutex);
+        return this->q.empty();
+    }
+ private:
+    std::queue<T> q;
+    std::mutex mutex;
+};
+}
+
+class thread_pool {
+
+ public:
+
+    thread_pool() { this->init(); }
+    thread_pool(int nThreads) { this->init(); this->resize(nThreads); }
+
+    // the destructor waits for all the functions in the queue to be finished
+    ~thread_pool() {
+        this->stop(true);
+    }
+
+    // get the number of running threads in the pool
+    int size() { return static_cast<int>(this->threads.size()); }
+
+    // number of idle threads
+    int n_idle() { return this->nWaiting; }
+    std::thread & get_thread(int i) { return *this->threads[i]; }
+
+    // change the number of threads in the pool
+    // should be called from one thread, otherwise be careful to not interleave, also with this->stop()
+    // nThreads must be >= 0
+    void resize(int nThreads) {
+        if (!this->isStop && !this->isDone) {
+            int oldNThreads = static_cast<int>(this->threads.size());
+            if (oldNThreads <= nThreads) {  // if the number of threads is increased
+                this->threads.resize(nThreads);
+                this->flags.resize(nThreads);
+
+                for (int i = oldNThreads; i < nThreads; ++i) {
+                    this->flags[i] = std::make_shared<std::atomic<bool>>(false);
+                    this->set_thread(i);
+                }
+            }
+            else {  // the number of threads is decreased
+                for (int i = oldNThreads - 1; i >= nThreads; --i) {
+                    *this->flags[i] = true;  // this thread will finish
+                    this->threads[i]->detach();
+                }
+                {
+                    // stop the detached threads that were waiting
+                    std::unique_lock<std::mutex> lock(this->mutex);
+                    this->cv.notify_all();
+                }
+                this->threads.resize(nThreads);  // safe to delete because the threads are detached
+                this->flags.resize(nThreads);  // safe to delete because the threads have copies of shared_ptr of the flags, not originals
+            }
+        }
+    }
+
+    // empty the queue
+    void clear_queue() {
+        std::function<void(int id)> * _f;
+        while (this->q.pop(_f))
+            delete _f; // empty the queue
+    }
+
+    // pops a functional wrapper to the original function
+    std::function<void(int)> pop() {
+        std::function<void(int id)> * _f = nullptr;
+        this->q.pop(_f);
+        std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred
+        std::function<void(int)> f;
+        if (_f)
+            f = *_f;
+        return f;
+    }
+
+    // wait for all computing threads to finish and stop all threads
+    // may be called asynchronously to not pause the calling thread while waiting
+    // if isWait == true, all the functions in the queue are run, otherwise the queue is cleared without running the functions
+    void stop(bool isWait = false) {
+        if (!isWait) {
+            if (this->isStop)
+                return;
+            this->isStop = true;
+            for (int i = 0, n = this->size(); i < n; ++i) {
+                *this->flags[i] = true;  // command the threads to stop
+            }
+            this->clear_queue();  // empty the queue
+        }
+        else {
+            if (this->isDone || this->isStop)
+                return;
+            this->isDone = true;  // give the waiting threads a command to finish
+        }
+        {
+            std::unique_lock<std::mutex> lock(this->mutex);
+            this->cv.notify_all();  // stop all waiting threads
+        }
+        for (int i = 0; i < static_cast<int>(this->threads.size()); ++i) {  // wait for the computing threads to finish
+            if (this->threads[i]->joinable())
+                this->threads[i]->join();
+        }
+        // if there were no threads in the pool but some functors in the queue, the functors are not deleted by the threads
+        // therefore delete them here
+        this->clear_queue();
+        this->threads.clear();
+        this->flags.clear();
+    }
+
+    template<typename F, typename... Rest>
+    auto push(F && f, Rest&&... rest) ->std::future<decltype(f(0, rest...))> {
+        auto pck = std::make_shared<std::packaged_task<decltype(f(0, rest...))(int)>>(
+                std::bind(std::forward<F>(f), std::placeholders::_1, std::forward<Rest>(rest)...)
+        );
+        auto _f = new std::function<void(int id)>([pck](int id) {
+            (*pck)(id);
+        });
+        this->q.push(_f);
+        std::unique_lock<std::mutex> lock(this->mutex);
+        this->cv.notify_one();
+        return pck->get_future();
+    }
+
+    // runs the user's function, which accepts an int argument (the id of the running thread); the return type is templated
+    // push() returns a std::future from which the user can get the result and rethrow any caught exceptions
+    template<typename F>
+    auto push(F && f) ->std::future<decltype(f(0))> {
+        auto pck = std::make_shared<std::packaged_task<decltype(f(0))(int)>>(std::forward<F>(f));
+        auto _f = new std::function<void(int id)>([pck](int id) {
+            (*pck)(id);
+        });
+        this->q.push(_f);
+        std::unique_lock<std::mutex> lock(this->mutex);
+        this->cv.notify_one();
+        return pck->get_future();
+    }
+
+
+ private:
+
+    // deleted
+    thread_pool(const thread_pool &);// = delete;
+    thread_pool(thread_pool &&);// = delete;
+    thread_pool & operator=(const thread_pool &);// = delete;
+    thread_pool & operator=(thread_pool &&);// = delete;
+
+    void set_thread(int i) {
+        std::shared_ptr<std::atomic<bool>> flag(this->flags[i]); // a copy of the shared ptr to the flag
+        auto f = [this, i, flag/* a copy of the shared ptr to the flag */]() {
+            std::atomic<bool> & _flag = *flag;
+            std::function<void(int id)> * _f;
+            bool isPop = this->q.pop(_f);
+            while (true) {
+                while (isPop) {  // if there is anything in the queue
+                    std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred
+                    (*_f)(i);
+                    if (_flag)
+                        return;  // the thread was asked to stop; return even if the queue is not empty yet
+                    else
+                        isPop = this->q.pop(_f);
+                }
+                // the queue is empty here, wait for the next command
+                std::unique_lock<std::mutex> lock(this->mutex);
+                ++this->nWaiting;
+                this->cv.wait(lock, [this, &_f, &isPop, &_flag](){ isPop = this->q.pop(_f); return isPop || this->isDone || _flag; });
+                --this->nWaiting;
+                if (!isPop)
+                    return;  // if the queue is empty and this->isDone == true or *flag then return
+            }
+        };
+        this->threads[i].reset(new std::thread(f)); // compiler may not support std::make_unique()
+    }
+
+    void init() { this->nWaiting = 0; this->isStop = false; this->isDone = false; }
+
+    std::vector<std::unique_ptr<std::thread>> threads;
+    std::vector<std::shared_ptr<std::atomic<bool>>> flags;
+    detail::Queue<std::function<void(int id)> *> q;
+    std::atomic<bool> isDone;
+    std::atomic<bool> isStop;
+    std::atomic<int> nWaiting;  // how many threads are waiting
+
+    std::mutex mutex;
+    std::condition_variable cv;
+};
+
+}
+
+#endif // __ctpl_stl_thread_pool_H__
\ No newline at end of file

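For readers skimming the diff, a minimal usage sketch of the ctpl::thread_pool added above (illustrative only, not part of the commit; the include path is an assumption and depends on where the header lives in the tree). Tasks are pushed as callables whose first argument is the worker-thread id, push() returns a std::future for the result, and stop(true) drains the queue before joining.

#include <future>
#include <iostream>

#include "util/threadpool.h"  // assumed path of the ctpl header added above; adjust as needed

int main() {
    ctpl::thread_pool pool(4);  // pool with 4 worker threads

    // Extra arguments after the callable are bound behind the thread id (see push() above).
    std::future<int> result = pool.push([](int id, int x) {
        return x * x;  // executed on worker thread `id`
    }, 7);

    std::cout << "computed " << result.get() << std::endl;  // blocks until the task ran; prints 49

    pool.stop(true);  // run everything still queued, then join the workers
    return 0;
}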

[5/8] marmotta git commit: Ostrich: - separate RocksDB and LevelDB support - refactor the whole persistence architecture to be more object oriented and easier to understand

Posted by ss...@apache.org.
http://git-wip-us.apache.org/repos/asf/marmotta/blob/b8d122a1/libraries/ostrich/backend/test/LevelDBTest.cc
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/test/LevelDBTest.cc b/libraries/ostrich/backend/test/LevelDBTest.cc
new file mode 100644
index 0000000..304de31
--- /dev/null
+++ b/libraries/ostrich/backend/test/LevelDBTest.cc
@@ -0,0 +1,268 @@
+//
+// Created by wastl on 19.12.15.
+//
+#include <cstdlib>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include "gtest/gtest.h"
+#include "gmock/gmock.h"
+#include "boost/filesystem.hpp"
+
+#include "util/iterator.h"
+#include "model/rdf_operators.h"
+#include "persistence/leveldb_persistence.h"
+
+using namespace boost::filesystem;
+
+using testing::Contains;
+
+namespace marmotta {
+namespace rdf {
+namespace proto {
+std::ostream& operator<<(std::ostream& out, const Statement& stmt) {
+    out << rdf::Statement(stmt).as_turtle();
+    return out;
+}
+}
+}
+
+namespace persistence {
+namespace {
+
+
+class LevelDBTest : public ::testing::Test {
+ protected:
+    LevelDBTest() {
+        testdir = temp_directory_path()/unique_path();
+        create_directory(testdir);
+
+        LOG(INFO) << "Test DB Path: " << testdir.string();
+
+        db = new LevelDBPersistence(testdir.string(), 10 * 1048576);
+    }
+
+    ~LevelDBTest() {
+        LOG(INFO) << "Destroying Test DB: " << testdir.string();
+        delete db;
+        remove_all(testdir);
+    }
+
+    LevelDBPersistence* db;
+    path testdir;
+};
+
+TEST_F(LevelDBTest, TestAddNamespaces) {
+    std::vector<rdf::proto::Namespace> ns = {
+            rdf::Namespace("ex", "http://www.example.com/").getMessage(),
+            rdf::Namespace("foo", "http://www.foo.com/").getMessage(),
+    };
+
+    util::CollectionIterator<rdf::proto::Namespace> it(ns);
+    db->AddNamespaces(it);
+
+    {
+        rdf::Namespace pattern;
+        pattern.setPrefix("foo");
+        auto it = db->GetNamespaces(pattern.getMessage());
+        EXPECT_TRUE(it->hasNext());
+        EXPECT_EQ(ns[1], it->next());
+        EXPECT_FALSE(it->hasNext());
+    }
+
+    {
+        rdf::Namespace pattern;
+        pattern.setPrefix("bar");
+        auto it = db->GetNamespaces(pattern.getMessage());
+        EXPECT_FALSE(it->hasNext());
+    }
+
+    {
+        rdf::Namespace pattern;
+        pattern.setUri("http://www.example.com/");
+        auto it = db->GetNamespaces(pattern.getMessage());
+        EXPECT_TRUE(it->hasNext());
+        EXPECT_EQ(ns[0], it->next());
+        EXPECT_FALSE(it->hasNext());
+    }
+}
+
+
+TEST_F(LevelDBTest, TestAddStatements) {
+    std::vector<rdf::proto::Statement> stmts = {
+            rdf::Statement(rdf::URI("http://example.com/s1"), rdf::URI("http://example.com/p1"),
+                           rdf::URI("http://example.com/o1")).getMessage(),
+            rdf::Statement(rdf::URI("http://example.com/s2"), rdf::URI("http://example.com/p2"),
+                           rdf::URI("http://example.com/o2")).getMessage()
+    };
+
+    util::CollectionIterator<rdf::proto::Statement> it(stmts);
+    db->AddStatements(it);
+
+    EXPECT_EQ(2, db->Size());
+    for (const auto& stmt : stmts) {
+        auto it = db->GetStatements(stmt);
+        ASSERT_TRUE(it->hasNext());
+        EXPECT_EQ(stmt, it->next());
+        EXPECT_FALSE(it->hasNext());
+    }
+}
+
+// Test pattern queries that can be answered directly by the index.
+TEST_F(LevelDBTest, TestGetStatementsIndexed) {
+    std::vector<rdf::proto::Statement> stmts = {
+            rdf::Statement(rdf::URI("http://example.com/s1"), rdf::URI("http://example.com/p1"),
+                           rdf::URI("http://example.com/o1")).getMessage(),
+            rdf::Statement(rdf::URI("http://example.com/s2"), rdf::URI("http://example.com/p1"),
+                           rdf::URI("http://example.com/o1")).getMessage(),
+            rdf::Statement(rdf::URI("http://example.com/s1"), rdf::URI("http://example.com/p2"),
+                           rdf::URI("http://example.com/o2")).getMessage(),
+            rdf::Statement(rdf::URI("http://example.com/s2"), rdf::URI("http://example.com/p2"),
+                           rdf::URI("http://example.com/o2")).getMessage(),
+            rdf::Statement(rdf::URI("http://example.com/s1"), rdf::URI("http://example.com/p3"),
+                           rdf::URI("http://example.com/o3")).getMessage(),
+    };
+
+    util::CollectionIterator<rdf::proto::Statement> it(stmts);
+    db->AddStatements(it);
+
+    EXPECT_EQ(5, db->Size());
+
+    rdf::Statement pattern1;
+    pattern1.setSubject(rdf::URI("http://example.com/s1"));
+    auto it1 = db->GetStatements(pattern1.getMessage());
+    for (int i=0; i<3; i++) {
+        ASSERT_TRUE(it1->hasNext());
+        EXPECT_THAT(stmts, Contains(it1->next()));
+    }
+    EXPECT_FALSE(it1->hasNext());
+
+    rdf::Statement pattern2;
+    pattern2.setObject(rdf::URI("http://example.com/o1"));
+    auto it2 = db->GetStatements(pattern2.getMessage());
+    for (int i=0; i<2; i++) {
+        ASSERT_TRUE(it2->hasNext());
+        EXPECT_THAT(stmts, Contains(it2->next()));
+    }
+    EXPECT_FALSE(it2->hasNext());
+
+    rdf::Statement pattern3;
+    pattern3.setPredicate(rdf::URI("http://example.com/p1"));
+    auto it3 = db->GetStatements(pattern3.getMessage());
+    for (int i=0; i<2; i++) {
+        ASSERT_TRUE(it3->hasNext());
+        EXPECT_THAT(stmts, Contains(it3->next()));
+    }
+    EXPECT_FALSE(it3->hasNext());
+}
+
+// Test pattern queries that trigger filtering because the index alone cannot answer these queries.
+TEST_F(LevelDBTest, TestGetStatementsFiltered) {
+    std::vector<rdf::proto::Statement> stmts = {
+            rdf::Statement(rdf::URI("http://example.com/s1"), rdf::URI("http://example.com/p1"),
+                           rdf::URI("http://example.com/o1")).getMessage(),
+            rdf::Statement(rdf::URI("http://example.com/s1"), rdf::URI("http://example.com/p2"),
+                           rdf::URI("http://example.com/o1")).getMessage(),
+            rdf::Statement(rdf::URI("http://example.com/s1"), rdf::URI("http://example.com/p3"),
+                           rdf::URI("http://example.com/o1")).getMessage(),
+            rdf::Statement(rdf::URI("http://example.com/s2"), rdf::URI("http://example.com/p1"),
+                           rdf::URI("http://example.com/o2")).getMessage(),
+            rdf::Statement(rdf::URI("http://example.com/s2"), rdf::URI("http://example.com/p2"),
+                           rdf::URI("http://example.com/o2")).getMessage(),
+    };
+
+    util::CollectionIterator<rdf::proto::Statement> it(stmts);
+    db->AddStatements(it);
+
+    EXPECT_EQ(5, db->Size());
+
+    rdf::Statement pattern1;
+    pattern1.setSubject(rdf::URI("http://example.com/s1"));
+    pattern1.setObject(rdf::URI("http://example.com/o1"));
+    auto it1 = db->GetStatements(pattern1.getMessage());
+    for (int i=0; i<3; i++) {
+        ASSERT_TRUE(it1->hasNext());
+        EXPECT_THAT(stmts, Contains(it1->next()));
+    }
+    EXPECT_FALSE(it1->hasNext());
+
+    rdf::Statement pattern2;
+    pattern2.setSubject(rdf::URI("http://example.com/s2"));
+    pattern2.setObject(rdf::URI("http://example.com/o2"));
+    auto it2 = db->GetStatements(pattern2.getMessage());
+    for (int i=0; i<2; i++) {
+        ASSERT_TRUE(it2->hasNext());
+        EXPECT_THAT(stmts, Contains(it2->next()));
+    }
+    EXPECT_FALSE(it2->hasNext());
+}
+
+
+TEST_F(LevelDBTest, TestRemoveStatements) {
+    std::vector<rdf::proto::Statement> stmts = {
+            rdf::Statement(rdf::URI("http://example.com/s1"), rdf::URI("http://example.com/p1"),
+                           rdf::URI("http://example.com/o1")).getMessage(),
+            rdf::Statement(rdf::URI("http://example.com/s2"), rdf::URI("http://example.com/p2"),
+                           rdf::URI("http://example.com/o2")).getMessage()
+    };
+
+    util::CollectionIterator<rdf::proto::Statement> it(stmts);
+    db->AddStatements(it);
+    ASSERT_EQ(2, db->Size());
+
+    {
+        auto it1 = db->GetStatements(stmts[0]);
+        EXPECT_TRUE(it1->hasNext());
+    }
+
+    db->RemoveStatements(stmts[0]);
+    EXPECT_EQ(1, db->Size());
+
+    {
+        auto it2 = db->GetStatements(stmts[0]);
+        EXPECT_FALSE(it2->hasNext());
+    }
+
+}
+
+TEST_F(LevelDBTest, TestUpdates) {
+    std::vector<rdf::proto::Statement> stmts = {
+            rdf::Statement(rdf::URI("http://example.com/s1"), rdf::URI("http://example.com/p1"),
+                           rdf::URI("http://example.com/o1")).getMessage(),
+            rdf::Statement(rdf::URI("http://example.com/s2"), rdf::URI("http://example.com/p2"),
+                           rdf::URI("http://example.com/o2")).getMessage()
+    };
+
+    util::CollectionIterator<rdf::proto::Statement> it(stmts);
+    db->AddStatements(it);
+    ASSERT_EQ(2, db->Size());
+
+    service::proto::UpdateRequest removeReq;
+    *removeReq.mutable_stmt_removed() = stmts[0];
+    service::proto::UpdateRequest addReq;
+    *addReq.mutable_stmt_added() =
+            rdf::Statement(rdf::URI("http://example.com/s1"), rdf::URI("http://example.com/p1"),
+                           rdf::URI("http://example.com/o3")).getMessage();
+
+
+    util::CollectionIterator<service::proto::UpdateRequest> updates({ removeReq, addReq });
+    db->Update(updates);
+    ASSERT_EQ(2, db->Size());
+
+    {
+        auto it = db->GetStatements(stmts[0]);
+        EXPECT_FALSE(it->hasNext());
+    }
+
+    {
+        auto it = db->GetStatements(addReq.stmt_added());
+        EXPECT_TRUE(it->hasNext());
+    }
+
+}
+
+
+}
+}
+}

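Taken together, the new tests show the basic query flow against LevelDBPersistence: build a partially bound rdf::Statement as a pattern, hand its proto message to GetStatements(), and walk the returned iterator. A condensed sketch of that flow follows (illustrative only; the database path is a placeholder and the include set mirrors LevelDBTest.cc above).

#include "model/rdf_operators.h"
#include "persistence/leveldb_persistence.h"

using namespace marmotta;

int main() {
    // Cache size mirrors the test fixture; the path here is just a placeholder.
    persistence::LevelDBPersistence db("/tmp/ostrich-example", 10 * 1048576);

    // Bind only the subject; unbound fields act as wildcards, as in TestGetStatementsIndexed.
    rdf::Statement pattern;
    pattern.setSubject(rdf::URI("http://example.com/s1"));

    auto it = db.GetStatements(pattern.getMessage());
    while (it->hasNext()) {
        rdf::proto::Statement stmt = it->next();  // each match as a protobuf message
        // ... process stmt ...
    }
    return 0;
}

Patterns whose bound combination cannot be served by an index directly (see TestGetStatementsFiltered) are answered by scanning the closest index and filtering, so the calling code stays the same either way.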
http://git-wip-us.apache.org/repos/asf/marmotta/blob/b8d122a1/libraries/ostrich/backend/test/PersistenceTest.cc
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/test/PersistenceTest.cc b/libraries/ostrich/backend/test/PersistenceTest.cc
index 8fb60c9..4fe1ca9 100644
--- a/libraries/ostrich/backend/test/PersistenceTest.cc
+++ b/libraries/ostrich/backend/test/PersistenceTest.cc
@@ -1,268 +1,63 @@
 //
-// Created by wastl on 19.12.15.
+// Created by wastl on 24.08.16.
 //
-#include <cstdlib>
-#include <vector>
-
-#include <glog/logging.h>
+#include "persistence/base_persistence.h"
 
 #include "gtest/gtest.h"
 #include "gmock/gmock.h"
-#include "boost/filesystem.hpp"
 
-#include "util/iterator.h"
 #include "model/rdf_operators.h"
-#include "persistence/leveldb_persistence.h"
-
-using namespace boost::filesystem;
-
-using testing::Contains;
 
 namespace marmotta {
-namespace rdf {
-namespace proto {
-std::ostream& operator<<(std::ostream& out, const Statement& stmt) {
-    out << rdf::Statement(stmt).as_turtle();
-    return out;
-}
-}
-}
-
 namespace persistence {
-namespace {
-
-
-class PersistenceTest : public ::testing::Test {
- protected:
-    PersistenceTest() {
-        testdir = temp_directory_path()/unique_path();
-        create_directory(testdir);
-
-        LOG(INFO) << "Test DB Path: " << testdir.string();
-
-        db = new LevelDBPersistence(testdir.string(), 10 * 1048576);
-    }
-
-    ~PersistenceTest() {
-        LOG(INFO) << "Destroying Test DB: " << testdir.string();
-        delete db;
-        remove_all(testdir);
-    }
-
-    LevelDBPersistence* db;
-    path testdir;
-};
+namespace test {
 
-TEST_F(PersistenceTest, TestAddNamespaces) {
-    std::vector<rdf::proto::Namespace> ns = {
-            rdf::Namespace("ex", "http://www.example.com/").getMessage(),
-            rdf::Namespace("foo", "http://www.foo.com/").getMessage(),
-    };
-
-    util::CollectionIterator<rdf::proto::Namespace> it(ns);
-    db->AddNamespaces(it);
-
-    {
-        rdf::Namespace pattern;
-        pattern.setPrefix("foo");
-        auto it = db->GetNamespaces(pattern.getMessage());
-        EXPECT_TRUE(it->hasNext());
-        EXPECT_EQ(ns[1], it->next());
-        EXPECT_FALSE(it->hasNext());
-    }
-
-    {
-        rdf::Namespace pattern;
-        pattern.setPrefix("bar");
-        auto it = db->GetNamespaces(pattern.getMessage());
-        EXPECT_FALSE(it->hasNext());
-    }
-
-    {
-        rdf::Namespace pattern;
-        pattern.setUri("http://www.example.com/");
-        auto it = db->GetNamespaces(pattern.getMessage());
-        EXPECT_TRUE(it->hasNext());
-        EXPECT_EQ(ns[0], it->next());
-        EXPECT_FALSE(it->hasNext());
-    }
+bool keysEqual(const char* key1, const char* key2) {
+    return memcmp(key1, key2, 4 * kKeyLength) == 0;
 }
 
-
-TEST_F(PersistenceTest, TestAddStatements) {
-    std::vector<rdf::proto::Statement> stmts = {
-            rdf::Statement(rdf::URI("http://example.com/s1"), rdf::URI("http://example.com/p1"),
-                           rdf::URI("http://example.com/o1")).getMessage(),
-            rdf::Statement(rdf::URI("http://example.com/s2"), rdf::URI("http://example.com/p2"),
-                           rdf::URI("http://example.com/o2")).getMessage()
-    };
-
-    util::CollectionIterator<rdf::proto::Statement> it(stmts);
-    db->AddStatements(it);
-
-    EXPECT_EQ(2, db->Size());
-    for (const auto& stmt : stmts) {
-        auto it = db->GetStatements(stmt);
-        ASSERT_TRUE(it->hasNext());
-        EXPECT_EQ(stmt, it->next());
-        EXPECT_FALSE(it->hasNext());
-    }
+bool keysNotEqual(const char* key1, const char* key2) {
+    return memcmp(key1, key2, 4 * kKeyLength) != 0;
 }
 
-// Test pattern queries that can be answered directly by the index.
-TEST_F(PersistenceTest, TestGetStatementsIndexed) {
-    std::vector<rdf::proto::Statement> stmts = {
-            rdf::Statement(rdf::URI("http://example.com/s1"), rdf::URI("http://example.com/p1"),
-                           rdf::URI("http://example.com/o1")).getMessage(),
-            rdf::Statement(rdf::URI("http://example.com/s2"), rdf::URI("http://example.com/p1"),
-                           rdf::URI("http://example.com/o1")).getMessage(),
-            rdf::Statement(rdf::URI("http://example.com/s1"), rdf::URI("http://example.com/p2"),
-                           rdf::URI("http://example.com/o2")).getMessage(),
-            rdf::Statement(rdf::URI("http://example.com/s2"), rdf::URI("http://example.com/p2"),
-                           rdf::URI("http://example.com/o2")).getMessage(),
-            rdf::Statement(rdf::URI("http://example.com/s1"), rdf::URI("http://example.com/p3"),
-                           rdf::URI("http://example.com/o3")).getMessage(),
-    };
-
-    util::CollectionIterator<rdf::proto::Statement> it(stmts);
-    db->AddStatements(it);
-
-    EXPECT_EQ(5, db->Size());
-
-    rdf::Statement pattern1;
-    pattern1.setSubject(rdf::URI("http://example.com/s1"));
-    auto it1 = db->GetStatements(pattern1.getMessage());
-    for (int i=0; i<3; i++) {
-        ASSERT_TRUE(it1->hasNext());
-        EXPECT_THAT(stmts, Contains(it1->next()));
-    }
-    EXPECT_FALSE(it1->hasNext());
-
-    rdf::Statement pattern2;
-    pattern2.setObject(rdf::URI("http://example.com/o1"));
-    auto it2 = db->GetStatements(pattern2.getMessage());
-    for (int i=0; i<2; i++) {
-        ASSERT_TRUE(it2->hasNext());
-        EXPECT_THAT(stmts, Contains(it2->next()));
-    }
-    EXPECT_FALSE(it2->hasNext());
-
-    rdf::Statement pattern3;
-    pattern3.setPredicate(rdf::URI("http://example.com/p1"));
-    auto it3 = db->GetStatements(pattern3.getMessage());
-    for (int i=0; i<2; i++) {
-        ASSERT_TRUE(it3->hasNext());
-        EXPECT_THAT(stmts, Contains(it3->next()));
-    }
-    EXPECT_FALSE(it3->hasNext());
-}
-
-// Test pattern queries that trigger filtering because the index alone cannot answer these queries.
-TEST_F(PersistenceTest, TestGetStatementsFiltered) {
-    std::vector<rdf::proto::Statement> stmts = {
-            rdf::Statement(rdf::URI("http://example.com/s1"), rdf::URI("http://example.com/p1"),
-                           rdf::URI("http://example.com/o1")).getMessage(),
-            rdf::Statement(rdf::URI("http://example.com/s1"), rdf::URI("http://example.com/p2"),
-                           rdf::URI("http://example.com/o1")).getMessage(),
-            rdf::Statement(rdf::URI("http://example.com/s1"), rdf::URI("http://example.com/p3"),
-                           rdf::URI("http://example.com/o1")).getMessage(),
-            rdf::Statement(rdf::URI("http://example.com/s2"), rdf::URI("http://example.com/p1"),
-                           rdf::URI("http://example.com/o2")).getMessage(),
-            rdf::Statement(rdf::URI("http://example.com/s2"), rdf::URI("http://example.com/p2"),
-                           rdf::URI("http://example.com/o2")).getMessage(),
-    };
-
-    util::CollectionIterator<rdf::proto::Statement> it(stmts);
-    db->AddStatements(it);
-
-    EXPECT_EQ(5, db->Size());
-
-    rdf::Statement pattern1;
-    pattern1.setSubject(rdf::URI("http://example.com/s1"));
-    pattern1.setObject(rdf::URI("http://example.com/o1"));
-    auto it1 = db->GetStatements(pattern1.getMessage());
-    for (int i=0; i<3; i++) {
-        ASSERT_TRUE(it1->hasNext());
-        EXPECT_THAT(stmts, Contains(it1->next()));
-    }
-    EXPECT_FALSE(it1->hasNext());
-
-    rdf::Statement pattern2;
-    pattern2.setSubject(rdf::URI("http://example.com/s2"));
-    pattern2.setObject(rdf::URI("http://example.com/o2"));
-    auto it2 = db->GetStatements(pattern2.getMessage());
-    for (int i=0; i<2; i++) {
-        ASSERT_TRUE(it2->hasNext());
-        EXPECT_THAT(stmts, Contains(it2->next()));
-    }
-    EXPECT_FALSE(it2->hasNext());
+bool lessThan(const char* key1, const char* key2) {
+    return memcmp(key1, key2, 4 * kKeyLength) < 0;
 }
 
+TEST(KeyTest, StatementsDiffer) {
+    rdf::Statement stmt1(rdf::URI("http://example.com/s1"), rdf::URI("http://example.com/p1"),
+                         rdf::URI("http://example.com/o1"));
+    rdf::Statement stmt2(rdf::URI("http://example.com/s2"), rdf::URI("http://example.com/p2"),
+                         rdf::URI("http://example.com/o2"));
 
-TEST_F(PersistenceTest, TestRemoveStatements) {
-    std::vector<rdf::proto::Statement> stmts = {
-            rdf::Statement(rdf::URI("http://example.com/s1"), rdf::URI("http://example.com/p1"),
-                           rdf::URI("http://example.com/o1")).getMessage(),
-            rdf::Statement(rdf::URI("http://example.com/s2"), rdf::URI("http://example.com/p2"),
-                           rdf::URI("http://example.com/o2")).getMessage()
-    };
-
-    util::CollectionIterator<rdf::proto::Statement> it(stmts);
-    db->AddStatements(it);
-    ASSERT_EQ(2, db->Size());
+    Key key1(stmt1.getMessage());
+    Key key2(stmt2.getMessage());
 
-    {
-        auto it1 = db->GetStatements(stmts[0]);
-        EXPECT_TRUE(it1->hasNext());
+    for (auto t : {SPOC, CSPO, OPSC, PCOS}) {
+        char* k1 = key1.Create(t);
+        char* k2 = key2.Create(t);
+        EXPECT_PRED2(keysNotEqual, k1, k2);
+        delete[] k1;
+        delete[] k2;
     }
-
-    db->RemoveStatements(stmts[0]);
-    EXPECT_EQ(1, db->Size());
-
-    {
-        auto it2 = db->GetStatements(stmts[0]);
-        EXPECT_FALSE(it2->hasNext());
-    }
-
 }
 
-TEST_F(PersistenceTest, TestUpdates) {
-    std::vector<rdf::proto::Statement> stmts = {
-            rdf::Statement(rdf::URI("http://example.com/s1"), rdf::URI("http://example.com/p1"),
-                           rdf::URI("http://example.com/o1")).getMessage(),
-            rdf::Statement(rdf::URI("http://example.com/s2"), rdf::URI("http://example.com/p2"),
-                           rdf::URI("http://example.com/o2")).getMessage()
-    };
-
-    util::CollectionIterator<rdf::proto::Statement> it(stmts);
-    db->AddStatements(it);
-    ASSERT_EQ(2, db->Size());
-
-    service::proto::UpdateRequest removeReq;
-    *removeReq.mutable_stmt_removed() = stmts[0];
-    service::proto::UpdateRequest addReq;
-    *addReq.mutable_stmt_added() =
-            rdf::Statement(rdf::URI("http://example.com/s1"), rdf::URI("http://example.com/p1"),
-                           rdf::URI("http://example.com/o3")).getMessage();
-
+TEST(KeyTest, BoundsDiffer) {
+    rdf::Statement stmt(rdf::URI("http://example.com/s1"), rdf::URI("http://example.com/p1"),
+                         rdf::URI("http://example.com/o1"));
 
-    util::CollectionIterator<service::proto::UpdateRequest> updates({ removeReq, addReq });
-    db->Update(updates);
-    ASSERT_EQ(2, db->Size());
+    Key key(stmt.getMessage());
 
-    {
-        auto it = db->GetStatements(stmts[0]);
-        EXPECT_FALSE(it->hasNext());
+    for (auto t : {SPOC, CSPO, OPSC, PCOS}) {
+        char* k1 = key.Create(t, LOWER);
+        char* k2 = key.Create(t, UPPER);
+        EXPECT_PRED2(keysNotEqual, k1, k2);
+        EXPECT_PRED2(lessThan, k1, k2);
+        delete[] k1;
+        delete[] k2;
     }
-
-    {
-        auto it = db->GetStatements(addReq.stmt_added());
-        EXPECT_TRUE(it->hasNext());
-    }
-
 }
 
-
-}
-}
-}
+}  // namespace test
+}  // namespace persistence
+}  // namespace marmotta
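The KeyTest cases above rely only on the fact that an index key is a fixed-width buffer of 4 * kKeyLength bytes that can be ordered with memcmp. A self-contained sketch of that comparison idiom follows; the value of kKeyLength and the four-component layout are assumptions for illustration, as the real definitions live in persistence/base_persistence.h.

#include <cassert>
#include <cstddef>
#include <cstring>

// Assumed for illustration; the actual constant is defined in base_persistence.h.
constexpr std::size_t kKeyLength = 16;

// A statement key is treated here as four fixed-width components laid out back to back
// (one per position in the chosen ordering, e.g. S-P-O-C), so a plain memcmp over
// 4 * kKeyLength bytes yields a total lexicographic order.
bool lessThan(const char* key1, const char* key2) {
    return std::memcmp(key1, key2, 4 * kKeyLength) < 0;
}

int main() {
    char lower[4 * kKeyLength];
    char upper[4 * kKeyLength];
    std::memset(lower, 0x00, sizeof(lower));  // all-zero lower bound
    std::memset(upper, 0xFF, sizeof(upper));  // all-0xFF upper bound
    assert(lessThan(lower, upper));           // LOWER keys must sort before UPPER keys
    return 0;
}

This is the ordering that KeyTest::BoundsDiffer checks for the LOWER/UPPER bounds of each index type, and it is what makes range scans over a single key ordering possible.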