Posted to commits@marmotta.apache.org by ss...@apache.org on 2016/08/25 11:04:25 UTC

[4/8] marmotta git commit: Ostrich: use a thread pool instead of starting and stopping individual threads

Ostrich: use a thread pool instead of starting and stopping individual threads


Project: http://git-wip-us.apache.org/repos/asf/marmotta/repo
Commit: http://git-wip-us.apache.org/repos/asf/marmotta/commit/d811ef38
Tree: http://git-wip-us.apache.org/repos/asf/marmotta/tree/d811ef38
Diff: http://git-wip-us.apache.org/repos/asf/marmotta/diff/d811ef38

Branch: refs/heads/develop
Commit: d811ef382445cdccf2c23c417ae912cda16ccdf3
Parents: 76ed061
Author: Sebastian Schaffert <ss...@apache.org>
Authored: Tue Aug 23 12:30:39 2016 +0200
Committer: Sebastian Schaffert <ss...@apache.org>
Committed: Tue Aug 23 12:30:39 2016 +0200

----------------------------------------------------------------------
 .../backend/persistence/leveldb_persistence.cc  |  56 ++---
 .../backend/persistence/leveldb_persistence.h   |   2 +
 libraries/ostrich/backend/util/CMakeLists.txt   |   3 +-
 libraries/ostrich/backend/util/threadpool.h     | 251 +++++++++++++++++++
 4 files changed, 283 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
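
For context, a minimal self-contained sketch of the pattern this commit introduces: work items are pushed to a long-lived ctpl::thread_pool and waited on via the returned std::future, instead of constructing and joining a fresh std::thread per task. The pool size of 4, the task count of 8, and the doWork function are illustrative placeholders, not part of the commit; only ctpl::thread_pool, push() and the std::future-based wait come from the diff below.

    #include <cstdio>
    #include <future>
    #include <vector>
    #include "util/threadpool.h"   // the ctpl::thread_pool header added by this commit

    // Placeholder workload; stands in for the LevelDB open/write calls in the real code.
    static void doWork(int thread_id, int task) {
        std::printf("task %d ran on pool thread %d\n", task, thread_id);
    }

    int main() {
        ctpl::thread_pool workers(4);   // threads are created once and reused

        // Previously each task got its own std::thread that was joined and destroyed;
        // now tasks are pushed to the pool and push() returns a std::future to wait on.
        std::vector<std::future<void>> results;
        for (int task = 0; task < 8; ++task) {
            results.push_back(workers.push([task](int thread_id) {
                doWork(thread_id, task);
            }));
        }
        for (auto& r : results) {
            r.wait();                   // replaces std::thread::join()
        }
        // ~thread_pool() waits for any remaining queued work before destroying the pool.
        return 0;
    }
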


http://git-wip-us.apache.org/repos/asf/marmotta/blob/d811ef38/libraries/ostrich/backend/persistence/leveldb_persistence.cc
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/persistence/leveldb_persistence.cc b/libraries/ostrich/backend/persistence/leveldb_persistence.cc
index fbb1cb2..efc1dc6 100644
--- a/libraries/ostrich/backend/persistence/leveldb_persistence.cc
+++ b/libraries/ostrich/backend/persistence/leveldb_persistence.cc
@@ -293,7 +293,7 @@ bool Matches(const Statement& pattern, const Statement& stmt) {
 dbimpl::DB* buildDB(const std::string& path, const std::string& suffix, const dbimpl::Options& options) {
     dbimpl::DB* db;
     dbimpl::Status status = dbimpl::DB::Open(options, path + "/" + suffix + ".db", &db);
-    assert(status.ok());
+    CHECK_STATUS(status);
     return db;
 }
 
@@ -327,7 +327,7 @@ dbimpl::Options buildNsOptions() {
 }
 
 LevelDBPersistence::LevelDBPersistence(const std::string &path, int64_t cacheSize)
-        : comparator(new KeyComparator())
+        : workers(8), comparator(new KeyComparator())
         , cache(dbimpl::NewLRUCache(cacheSize))
         , options(buildOptions(comparator.get(), cache.get()))
         , db_ns_prefix(buildDB(path, "ns_prefix", buildNsOptions()))
@@ -335,23 +335,23 @@ LevelDBPersistence::LevelDBPersistence(const std::string &path, int64_t cacheSiz
         , db_meta(buildDB(path, "metadata", buildNsOptions())) {
 
     // Open databases in separate threads as LevelDB does a lot of computation on open.
-    std::vector<std::thread> openers;
-    openers.push_back(std::thread([&]() {
+    std::vector<std::future<void>> openers;
+    openers.push_back(workers.push([&](int id) {
         db_spoc.reset(buildDB(path, "spoc", *options));
     }));
-    openers.push_back(std::thread([&]() {
+    openers.push_back(workers.push([&](int id) {
         db_cspo.reset(buildDB(path, "cspo", *options));
     }));
-    openers.push_back(std::thread([&]() {
+    openers.push_back(workers.push([&](int id) {
         db_opsc.reset(buildDB(path, "opsc", *options));
     }));
-    openers.push_back(std::thread([&]() {
+    openers.push_back(workers.push([&](int id) {
         db_pcos.reset(buildDB(path, "pcos", *options));
     }));
 
 
     for (auto& t : openers) {
-        t.join();
+        t.wait();
     }
 
     CHECK_NOTNULL(db_spoc.get());
@@ -434,26 +434,26 @@ int64_t LevelDBPersistence::AddStatements(StatementIterator& it) {
 
     dbimpl::WriteBatch batch_spoc, batch_cspo, batch_opsc, batch_pcos;
     auto writeBatches = [&]{
-        std::vector<std::thread> writers;
-        writers.push_back(std::thread([&]() {
+        std::vector<std::future<void>> writers;
+        writers.push_back(workers.push([&](int id) {
             CHECK_STATUS(db_pcos->Write(dbimpl::WriteOptions(), &batch_pcos));
             batch_pcos.Clear();
         }));
-        writers.push_back(std::thread([&]() {
+        writers.push_back(workers.push([&](int id) {
             CHECK_STATUS(db_opsc->Write(dbimpl::WriteOptions(), &batch_opsc));
             batch_opsc.Clear();
         }));
-        writers.push_back(std::thread([&]() {
+        writers.push_back(workers.push([&](int id) {
             CHECK_STATUS(db_cspo->Write(dbimpl::WriteOptions(), &batch_cspo));
             batch_cspo.Clear();
         }));
-        writers.push_back(std::thread([&]() {
+        writers.push_back(workers.push([&](int id) {
             CHECK_STATUS(db_spoc->Write(dbimpl::WriteOptions(), &batch_spoc));
             batch_spoc.Clear();
         }));
 
         for (auto& t : writers) {
-            t.join();
+            t.wait();
         }
     };
 
@@ -546,22 +546,22 @@ int64_t LevelDBPersistence::RemoveStatements(const rdf::proto::Statement& patter
 
     count = RemoveStatements(pattern, batch_spoc, batch_cspo, batch_opsc, batch_pcos);
 
-    std::vector<std::thread> writers;
-    writers.push_back(std::thread([&]() {
+    std::vector<std::future<void>> writers;
+    writers.push_back(workers.push([&](int id) {
         CHECK_STATUS(db_pcos->Write(dbimpl::WriteOptions(), &batch_pcos));
     }));
-    writers.push_back(std::thread([&]() {
+    writers.push_back(workers.push([&](int id) {
         CHECK_STATUS(db_opsc->Write(dbimpl::WriteOptions(), &batch_opsc));
     }));
-    writers.push_back(std::thread([&]() {
+    writers.push_back(workers.push([&](int id) {
         CHECK_STATUS(db_cspo->Write(dbimpl::WriteOptions(), &batch_cspo));
     }));
-    writers.push_back(std::thread([&]() {
+    writers.push_back(workers.push([&](int id) {
         CHECK_STATUS(db_spoc->Write(dbimpl::WriteOptions(), &batch_spoc));
     }));
 
     for (auto& t : writers) {
-        t.join();
+        t.wait();
     }
 
     DLOG(INFO) << "Removed " << count << " statements (time=" <<
@@ -579,34 +579,34 @@ UpdateStatistics LevelDBPersistence::Update(LevelDBPersistence::UpdateIterator &
 
     WriteBatch b_spoc, b_cspo, b_opsc, b_pcos, b_prefix, b_url;
     auto writeBatches = [&]{
-        std::vector<std::thread> writers;
-        writers.push_back(std::thread([&]() {
+        std::vector<std::future<void>> writers;
+        writers.push_back(workers.push([&](int id) {
             CHECK_STATUS(db_pcos->Write(dbimpl::WriteOptions(), &b_pcos));
             b_pcos.Clear();
         }));
-        writers.push_back(std::thread([&]() {
+        writers.push_back(workers.push([&](int id) {
             CHECK_STATUS(db_opsc->Write(dbimpl::WriteOptions(), &b_opsc));
             b_opsc.Clear();
         }));
-        writers.push_back(std::thread([&]() {
+        writers.push_back(workers.push([&](int id) {
             CHECK_STATUS(db_cspo->Write(dbimpl::WriteOptions(), &b_cspo));
             b_cspo.Clear();
         }));
-        writers.push_back(std::thread([&]() {
+        writers.push_back(workers.push([&](int id) {
             CHECK_STATUS(db_spoc->Write(dbimpl::WriteOptions(), &b_spoc));
             b_spoc.Clear();
         }));
-        writers.push_back(std::thread([&]() {
+        writers.push_back(workers.push([&](int id) {
             CHECK_STATUS(db_ns_prefix->Write(dbimpl::WriteOptions(), &b_prefix));
             b_prefix.Clear();
         }));
-        writers.push_back(std::thread([&]() {
+        writers.push_back(workers.push([&](int id) {
             CHECK_STATUS(db_ns_url->Write(dbimpl::WriteOptions(), &b_url));
             b_url.Clear();
         }));
 
         for (auto& t : writers) {
-            t.join();
+            t.wait();
         }
     };
 

http://git-wip-us.apache.org/repos/asf/marmotta/blob/d811ef38/libraries/ostrich/backend/persistence/leveldb_persistence.h
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/persistence/leveldb_persistence.h b/libraries/ostrich/backend/persistence/leveldb_persistence.h
index 9fd1924..eee80e4 100644
--- a/libraries/ostrich/backend/persistence/leveldb_persistence.h
+++ b/libraries/ostrich/backend/persistence/leveldb_persistence.h
@@ -39,6 +39,7 @@ namespace dbimpl = leveldb;
 #include "model/rdf_model.h"
 #include "service/sail.pb.h"
 #include "util/iterator.h"
+#include "util/threadpool.h"
 
 namespace marmotta {
 namespace persistence {
@@ -142,6 +143,7 @@ class LevelDBPersistence {
      */
     int64_t Size();
  private:
+    ctpl::thread_pool workers;
 
     std::unique_ptr<KeyComparator> comparator;
     std::shared_ptr<dbimpl::Cache> cache;

http://git-wip-us.apache.org/repos/asf/marmotta/blob/d811ef38/libraries/ostrich/backend/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/util/CMakeLists.txt b/libraries/ostrich/backend/util/CMakeLists.txt
index a4ad8b3..d7e13e0 100644
--- a/libraries/ostrich/backend/util/CMakeLists.txt
+++ b/libraries/ostrich/backend/util/CMakeLists.txt
@@ -1,6 +1,7 @@
 include_directories(.. ${CMAKE_CURRENT_BINARY_DIR}/..)
 
-add_library(marmotta_util murmur3.cc murmur3.h split.cc split.h iterator.h unique.h time_logger.cc time_logger.h)
+add_library(marmotta_util murmur3.cc murmur3.h split.cc split.h
+        iterator.h unique.h time_logger.cc time_logger.h threadpool.h)
 
 add_library(marmotta_raptor_util raptor_util.h raptor_util.cc)
 target_link_libraries(marmotta_raptor_util marmotta_model ${CMAKE_THREAD_LIBS_INIT} ${RAPTOR_LIBRARY} ${GFLAGS_LIBRARY})
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/marmotta/blob/d811ef38/libraries/ostrich/backend/util/threadpool.h
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/util/threadpool.h b/libraries/ostrich/backend/util/threadpool.h
new file mode 100644
index 0000000..9047aad
--- /dev/null
+++ b/libraries/ostrich/backend/util/threadpool.h
@@ -0,0 +1,251 @@
+/*********************************************************
+*
+*  Copyright (C) 2014 by Vitaliy Vitsentiy
+*
+*  Licensed under the Apache License, Version 2.0 (the "License");
+*  you may not use this file except in compliance with the License.
+*  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+*  Unless required by applicable law or agreed to in writing, software
+*  distributed under the License is distributed on an "AS IS" BASIS,
+*  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+*  See the License for the specific language governing permissions and
+*  limitations under the License.
+*
+*********************************************************/
+
+
+#ifndef __ctpl_stl_thread_pool_H__
+#define __ctpl_stl_thread_pool_H__
+
+#include <functional>
+#include <thread>
+#include <atomic>
+#include <vector>
+#include <memory>
+#include <exception>
+#include <future>
+#include <mutex>
+#include <queue>
+
+
+
+// thread pool to run user's functors with signature
+//      ret func(int id, other_params)
+// where id is the index of the thread that runs the functor
+// ret is some return type
+
+
+namespace ctpl {
+
+namespace detail {
+template <typename T>
+class Queue {
+ public:
+    bool push(T const & value) {
+        std::unique_lock<std::mutex> lock(this->mutex);
+        this->q.push(value);
+        return true;
+    }
+    // deletes the retrieved element; do not use for non-integral types
+    bool pop(T & v) {
+        std::unique_lock<std::mutex> lock(this->mutex);
+        if (this->q.empty())
+            return false;
+        v = this->q.front();
+        this->q.pop();
+        return true;
+    }
+    bool empty() {
+        std::unique_lock<std::mutex> lock(this->mutex);
+        return this->q.empty();
+    }
+ private:
+    std::queue<T> q;
+    std::mutex mutex;
+};
+}
+
+class thread_pool {
+
+ public:
+
+    thread_pool() { this->init(); }
+    thread_pool(int nThreads) { this->init(); this->resize(nThreads); }
+
+    // the destructor waits for all the functions in the queue to be finished
+    ~thread_pool() {
+        this->stop(true);
+    }
+
+    // get the number of running threads in the pool
+    int size() { return static_cast<int>(this->threads.size()); }
+
+    // number of idle threads
+    int n_idle() { return this->nWaiting; }
+    std::thread & get_thread(int i) { return *this->threads[i]; }
+
+    // change the number of threads in the pool
+    // should be called from a single thread; be careful not to interleave calls to it or to this->stop()
+    // nThreads must be >= 0
+    void resize(int nThreads) {
+        if (!this->isStop && !this->isDone) {
+            int oldNThreads = static_cast<int>(this->threads.size());
+            if (oldNThreads <= nThreads) {  // if the number of threads is increased
+                this->threads.resize(nThreads);
+                this->flags.resize(nThreads);
+
+                for (int i = oldNThreads; i < nThreads; ++i) {
+                    this->flags[i] = std::make_shared<std::atomic<bool>>(false);
+                    this->set_thread(i);
+                }
+            }
+            else {  // the number of threads is decreased
+                for (int i = oldNThreads - 1; i >= nThreads; --i) {
+                    *this->flags[i] = true;  // this thread will finish
+                    this->threads[i]->detach();
+                }
+                {
+                    // stop the detached threads that were waiting
+                    std::unique_lock<std::mutex> lock(this->mutex);
+                    this->cv.notify_all();
+                }
+                this->threads.resize(nThreads);  // safe to delete because the threads are detached
+                this->flags.resize(nThreads);  // safe to delete because the threads have copies of shared_ptr of the flags, not originals
+            }
+        }
+    }
+
+    // empty the queue
+    void clear_queue() {
+        std::function<void(int id)> * _f;
+        while (this->q.pop(_f))
+            delete _f; // empty the queue
+    }
+
+    // pops a functional wrapper to the original function
+    std::function<void(int)> pop() {
+        std::function<void(int id)> * _f = nullptr;
+        this->q.pop(_f);
+        std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred
+        std::function<void(int)> f;
+        if (_f)
+            f = *_f;
+        return f;
+    }
+
+    // wait for all computing threads to finish and stop all threads
+    // may be called asynchronously to not pause the calling thread while waiting
+    // if isWait == true, all the functions in the queue are run, otherwise the queue is cleared without running the functions
+    void stop(bool isWait = false) {
+        if (!isWait) {
+            if (this->isStop)
+                return;
+            this->isStop = true;
+            for (int i = 0, n = this->size(); i < n; ++i) {
+                *this->flags[i] = true;  // command the threads to stop
+            }
+            this->clear_queue();  // empty the queue
+        }
+        else {
+            if (this->isDone || this->isStop)
+                return;
+            this->isDone = true;  // give the waiting threads a command to finish
+        }
+        {
+            std::unique_lock<std::mutex> lock(this->mutex);
+            this->cv.notify_all();  // stop all waiting threads
+        }
+        for (int i = 0; i < static_cast<int>(this->threads.size()); ++i) {  // wait for the computing threads to finish
+            if (this->threads[i]->joinable())
+                this->threads[i]->join();
+        }
+        // if there were no threads in the pool but some functors in the queue, the functors are not deleted by the threads
+        // therefore delete them here
+        this->clear_queue();
+        this->threads.clear();
+        this->flags.clear();
+    }
+
+    template<typename F, typename... Rest>
+    auto push(F && f, Rest&&... rest) ->std::future<decltype(f(0, rest...))> {
+        auto pck = std::make_shared<std::packaged_task<decltype(f(0, rest...))(int)>>(
+                std::bind(std::forward<F>(f), std::placeholders::_1, std::forward<Rest>(rest)...)
+        );
+        auto _f = new std::function<void(int id)>([pck](int id) {
+            (*pck)(id);
+        });
+        this->q.push(_f);
+        std::unique_lock<std::mutex> lock(this->mutex);
+        this->cv.notify_one();
+        return pck->get_future();
+    }
+
+    // run the user's function that expects an int argument - the id of the running thread; the returned value is templated
+    // push() returns a std::future, from which the user can get the result and rethrow any caught exceptions
+    template<typename F>
+    auto push(F && f) ->std::future<decltype(f(0))> {
+        auto pck = std::make_shared<std::packaged_task<decltype(f(0))(int)>>(std::forward<F>(f));
+        auto _f = new std::function<void(int id)>([pck](int id) {
+            (*pck)(id);
+        });
+        this->q.push(_f);
+        std::unique_lock<std::mutex> lock(this->mutex);
+        this->cv.notify_one();
+        return pck->get_future();
+    }
+
+
+ private:
+
+    // deleted
+    thread_pool(const thread_pool &);// = delete;
+    thread_pool(thread_pool &&);// = delete;
+    thread_pool & operator=(const thread_pool &);// = delete;
+    thread_pool & operator=(thread_pool &&);// = delete;
+
+    void set_thread(int i) {
+        std::shared_ptr<std::atomic<bool>> flag(this->flags[i]); // a copy of the shared ptr to the flag
+        auto f = [this, i, flag/* a copy of the shared ptr to the flag */]() {
+            std::atomic<bool> & _flag = *flag;
+            std::function<void(int id)> * _f;
+            bool isPop = this->q.pop(_f);
+            while (true) {
+                while (isPop) {  // if there is anything in the queue
+                    std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred
+                    (*_f)(i);
+                    if (_flag)
+                        return;  // the thread was asked to stop, so return even if the queue is not empty yet
+                    else
+                        isPop = this->q.pop(_f);
+                }
+                // the queue is empty here, wait for the next command
+                std::unique_lock<std::mutex> lock(this->mutex);
+                ++this->nWaiting;
+                this->cv.wait(lock, [this, &_f, &isPop, &_flag](){ isPop = this->q.pop(_f); return isPop || this->isDone || _flag; });
+                --this->nWaiting;
+                if (!isPop)
+                    return;  // if the queue is empty and this->isDone == true or *flag then return
+            }
+        };
+        this->threads[i].reset(new std::thread(f)); // compiler may not support std::make_unique()
+    }
+
+    void init() { this->nWaiting = 0; this->isStop = false; this->isDone = false; }
+
+    std::vector<std::unique_ptr<std::thread>> threads;
+    std::vector<std::shared_ptr<std::atomic<bool>>> flags;
+    detail::Queue<std::function<void(int id)> *> q;
+    std::atomic<bool> isDone;
+    std::atomic<bool> isStop;
+    std::atomic<int> nWaiting;  // how many threads are waiting
+
+    std::mutex mutex;
+    std::condition_variable cv;
+};
+
+}
+
+#endif // __ctpl_stl_thread_pool_H__
\ No newline at end of file
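
For reference, a small sketch of the two push() overloads defined in threadpool.h above: the variadic overload binds extra arguments after the thread id, and the single-argument overload propagates exceptions to the caller through the returned std::future, as described in the comments near the top of the header. The pool size of 2, the values 20 and 22, and the "boom" message are illustrative only.

    #include <iostream>
    #include <stdexcept>
    #include "util/threadpool.h"

    int main() {
        ctpl::thread_pool pool(2);

        // Variadic overload: extra arguments are bound after the thread id.
        auto sum = pool.push([](int /*id*/, int a, int b) { return a + b; }, 20, 22);

        // Single-argument overload: exceptions thrown inside the task are captured
        // by the packaged_task and rethrown when future::get() is called.
        auto fail = pool.push([](int /*id*/) -> int { throw std::runtime_error("boom"); });

        std::cout << sum.get() << std::endl;   // prints 42
        try {
            fail.get();
        } catch (const std::exception& e) {
            std::cout << "task threw: " << e.what() << std::endl;
        }

        pool.stop(true);   // run everything still queued, then join the pool threads
        return 0;
    }
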