You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@marmotta.apache.org by ss...@apache.org on 2016/08/25 11:04:25 UTC
[4/8] marmotta git commit: Ostrich: use a thread pool instead of
starting and stopping individual threads
Ostrich: use a thread pool instead of starting and stopping individual threads
Project: http://git-wip-us.apache.org/repos/asf/marmotta/repo
Commit: http://git-wip-us.apache.org/repos/asf/marmotta/commit/d811ef38
Tree: http://git-wip-us.apache.org/repos/asf/marmotta/tree/d811ef38
Diff: http://git-wip-us.apache.org/repos/asf/marmotta/diff/d811ef38
Branch: refs/heads/develop
Commit: d811ef382445cdccf2c23c417ae912cda16ccdf3
Parents: 76ed061
Author: Sebastian Schaffert <ss...@apache.org>
Authored: Tue Aug 23 12:30:39 2016 +0200
Committer: Sebastian Schaffert <ss...@apache.org>
Committed: Tue Aug 23 12:30:39 2016 +0200
----------------------------------------------------------------------
.../backend/persistence/leveldb_persistence.cc | 56 ++---
.../backend/persistence/leveldb_persistence.h | 2 +
libraries/ostrich/backend/util/CMakeLists.txt | 3 +-
libraries/ostrich/backend/util/threadpool.h | 251 +++++++++++++++++++
4 files changed, 283 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/marmotta/blob/d811ef38/libraries/ostrich/backend/persistence/leveldb_persistence.cc
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/persistence/leveldb_persistence.cc b/libraries/ostrich/backend/persistence/leveldb_persistence.cc
index fbb1cb2..efc1dc6 100644
--- a/libraries/ostrich/backend/persistence/leveldb_persistence.cc
+++ b/libraries/ostrich/backend/persistence/leveldb_persistence.cc
@@ -293,7 +293,7 @@ bool Matches(const Statement& pattern, const Statement& stmt) {
dbimpl::DB* buildDB(const std::string& path, const std::string& suffix, const dbimpl::Options& options) {
dbimpl::DB* db;
dbimpl::Status status = dbimpl::DB::Open(options, path + "/" + suffix + ".db", &db);
- assert(status.ok());
+ CHECK_STATUS(status);
return db;
}
@@ -327,7 +327,7 @@ dbimpl::Options buildNsOptions() {
}
LevelDBPersistence::LevelDBPersistence(const std::string &path, int64_t cacheSize)
- : comparator(new KeyComparator())
+ : workers(8), comparator(new KeyComparator())
, cache(dbimpl::NewLRUCache(cacheSize))
, options(buildOptions(comparator.get(), cache.get()))
, db_ns_prefix(buildDB(path, "ns_prefix", buildNsOptions()))
@@ -335,23 +335,23 @@ LevelDBPersistence::LevelDBPersistence(const std::string &path, int64_t cacheSiz
, db_meta(buildDB(path, "metadata", buildNsOptions())) {
// Open databases in separate threads as LevelDB does a lot of computation on open.
- std::vector<std::thread> openers;
- openers.push_back(std::thread([&]() {
+ std::vector<std::future<void>> openers;
+ openers.push_back(workers.push([&](int id) {
db_spoc.reset(buildDB(path, "spoc", *options));
}));
- openers.push_back(std::thread([&]() {
+ openers.push_back(workers.push([&](int id) {
db_cspo.reset(buildDB(path, "cspo", *options));
}));
- openers.push_back(std::thread([&]() {
+ openers.push_back(workers.push([&](int id) {
db_opsc.reset(buildDB(path, "opsc", *options));
}));
- openers.push_back(std::thread([&]() {
+ openers.push_back(workers.push([&](int id) {
db_pcos.reset(buildDB(path, "pcos", *options));
}));
for (auto& t : openers) {
- t.join();
+ t.wait();
}
CHECK_NOTNULL(db_spoc.get());
@@ -434,26 +434,26 @@ int64_t LevelDBPersistence::AddStatements(StatementIterator& it) {
dbimpl::WriteBatch batch_spoc, batch_cspo, batch_opsc, batch_pcos;
auto writeBatches = [&]{
- std::vector<std::thread> writers;
- writers.push_back(std::thread([&]() {
+ std::vector<std::future<void>> writers;
+ writers.push_back(workers.push([&](int id) {
CHECK_STATUS(db_pcos->Write(dbimpl::WriteOptions(), &batch_pcos));
batch_pcos.Clear();
}));
- writers.push_back(std::thread([&]() {
+ writers.push_back(workers.push([&](int id) {
CHECK_STATUS(db_opsc->Write(dbimpl::WriteOptions(), &batch_opsc));
batch_opsc.Clear();
}));
- writers.push_back(std::thread([&]() {
+ writers.push_back(workers.push([&](int id) {
CHECK_STATUS(db_cspo->Write(dbimpl::WriteOptions(), &batch_cspo));
batch_cspo.Clear();
}));
- writers.push_back(std::thread([&]() {
+ writers.push_back(workers.push([&](int id) {
CHECK_STATUS(db_spoc->Write(dbimpl::WriteOptions(), &batch_spoc));
batch_spoc.Clear();
}));
for (auto& t : writers) {
- t.join();
+ t.wait();
}
};
@@ -546,22 +546,22 @@ int64_t LevelDBPersistence::RemoveStatements(const rdf::proto::Statement& patter
count = RemoveStatements(pattern, batch_spoc, batch_cspo, batch_opsc, batch_pcos);
- std::vector<std::thread> writers;
- writers.push_back(std::thread([&]() {
+ std::vector<std::future<void>> writers;
+ writers.push_back(workers.push([&](int id) {
CHECK_STATUS(db_pcos->Write(dbimpl::WriteOptions(), &batch_pcos));
}));
- writers.push_back(std::thread([&]() {
+ writers.push_back(workers.push([&](int id) {
CHECK_STATUS(db_opsc->Write(dbimpl::WriteOptions(), &batch_opsc));
}));
- writers.push_back(std::thread([&]() {
+ writers.push_back(workers.push([&](int id) {
CHECK_STATUS(db_cspo->Write(dbimpl::WriteOptions(), &batch_cspo));
}));
- writers.push_back(std::thread([&]() {
+ writers.push_back(workers.push([&](int id) {
CHECK_STATUS(db_spoc->Write(dbimpl::WriteOptions(), &batch_spoc));
}));
for (auto& t : writers) {
- t.join();
+ t.wait();
}
DLOG(INFO) << "Removed " << count << " statements (time=" <<
@@ -579,34 +579,34 @@ UpdateStatistics LevelDBPersistence::Update(LevelDBPersistence::UpdateIterator &
WriteBatch b_spoc, b_cspo, b_opsc, b_pcos, b_prefix, b_url;
auto writeBatches = [&]{
- std::vector<std::thread> writers;
- writers.push_back(std::thread([&]() {
+ std::vector<std::future<void>> writers;
+ writers.push_back(workers.push([&](int id) {
CHECK_STATUS(db_pcos->Write(dbimpl::WriteOptions(), &b_pcos));
b_pcos.Clear();
}));
- writers.push_back(std::thread([&]() {
+ writers.push_back(workers.push([&](int id) {
CHECK_STATUS(db_opsc->Write(dbimpl::WriteOptions(), &b_opsc));
b_opsc.Clear();
}));
- writers.push_back(std::thread([&]() {
+ writers.push_back(workers.push([&](int id) {
CHECK_STATUS(db_cspo->Write(dbimpl::WriteOptions(), &b_cspo));
b_cspo.Clear();
}));
- writers.push_back(std::thread([&]() {
+ writers.push_back(workers.push([&](int id) {
CHECK_STATUS(db_spoc->Write(dbimpl::WriteOptions(), &b_spoc));
b_spoc.Clear();
}));
- writers.push_back(std::thread([&]() {
+ writers.push_back(workers.push([&](int id) {
CHECK_STATUS(db_ns_prefix->Write(dbimpl::WriteOptions(), &b_prefix));
b_prefix.Clear();
}));
- writers.push_back(std::thread([&]() {
+ writers.push_back(workers.push([&](int id) {
CHECK_STATUS(db_ns_url->Write(dbimpl::WriteOptions(), &b_url));
b_url.Clear();
}));
for (auto& t : writers) {
- t.join();
+ t.wait();
}
};
http://git-wip-us.apache.org/repos/asf/marmotta/blob/d811ef38/libraries/ostrich/backend/persistence/leveldb_persistence.h
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/persistence/leveldb_persistence.h b/libraries/ostrich/backend/persistence/leveldb_persistence.h
index 9fd1924..eee80e4 100644
--- a/libraries/ostrich/backend/persistence/leveldb_persistence.h
+++ b/libraries/ostrich/backend/persistence/leveldb_persistence.h
@@ -39,6 +39,7 @@ namespace dbimpl = leveldb;
#include "model/rdf_model.h"
#include "service/sail.pb.h"
#include "util/iterator.h"
+#include "util/threadpool.h"
namespace marmotta {
namespace persistence {
@@ -142,6 +143,7 @@ class LevelDBPersistence {
*/
int64_t Size();
private:
+ ctpl::thread_pool workers;
std::unique_ptr<KeyComparator> comparator;
std::shared_ptr<dbimpl::Cache> cache;
http://git-wip-us.apache.org/repos/asf/marmotta/blob/d811ef38/libraries/ostrich/backend/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/util/CMakeLists.txt b/libraries/ostrich/backend/util/CMakeLists.txt
index a4ad8b3..d7e13e0 100644
--- a/libraries/ostrich/backend/util/CMakeLists.txt
+++ b/libraries/ostrich/backend/util/CMakeLists.txt
@@ -1,6 +1,7 @@
include_directories(.. ${CMAKE_CURRENT_BINARY_DIR}/..)
-add_library(marmotta_util murmur3.cc murmur3.h split.cc split.h iterator.h unique.h time_logger.cc time_logger.h)
+add_library(marmotta_util murmur3.cc murmur3.h split.cc split.h
+ iterator.h unique.h time_logger.cc time_logger.h threadpool.h)
add_library(marmotta_raptor_util raptor_util.h raptor_util.cc)
target_link_libraries(marmotta_raptor_util marmotta_model ${CMAKE_THREAD_LIBS_INIT} ${RAPTOR_LIBRARY} ${GFLAGS_LIBRARY})
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/marmotta/blob/d811ef38/libraries/ostrich/backend/util/threadpool.h
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/util/threadpool.h b/libraries/ostrich/backend/util/threadpool.h
new file mode 100644
index 0000000..9047aad
--- /dev/null
+++ b/libraries/ostrich/backend/util/threadpool.h
@@ -0,0 +1,251 @@
+/*********************************************************
+*
+* Copyright (C) 2014 by Vitaliy Vitsentiy
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*
+*********************************************************/
+
+
+#ifndef __ctpl_stl_thread_pool_H__
+#define __ctpl_stl_thread_pool_H__
+
+#include <atomic>
+#include <condition_variable>
+#include <exception>
+#include <functional>
+#include <future>
+#include <memory>
+#include <mutex>
+#include <queue>
+#include <thread>
+#include <utility>
+#include <vector>
+
+// thread pool to run user's functors with signature
+// ret func(int id, other_params)
+// where id is the index of the thread that runs the functor
+// ret is some return type
+
+
+namespace ctpl {
+
+namespace detail {
+// Mutex-protected FIFO shared by the pool's workers and by threads submitting work.
+template <typename T>
+class Queue {
+ public:
+  // Appends a copy of value. Always returns true (bool kept for interface compatibility).
+  bool push(T const & value) {
+    std::lock_guard<std::mutex> lock(this->mutex);
+    this->q.push(value);
+    return true;
+  }
+  // Moves the front element into v and removes it; returns false (v untouched) if empty.
+  bool pop(T & v) {
+    std::lock_guard<std::mutex> lock(this->mutex);
+    if (this->q.empty())
+      return false;
+    v = std::move(this->q.front());  // front is discarded right after, so moving is safe
+    this->q.pop();
+    return true;
+  }
+  // Snapshot only: another thread may push or pop as soon as this returns.
+  bool empty() { std::lock_guard<std::mutex> lock(this->mutex); return this->q.empty(); }
+ private:
+  std::queue<T> q;
+  std::mutex mutex;
+};
+}  // namespace detail
+
+class thread_pool {
+
+ public:
+  // Creates a pool with no worker threads; call resize() to start some.
+  thread_pool() { this->init(); }
+  explicit thread_pool(int nThreads) { this->init(); this->resize(nThreads); }
+
+  // Runs every functor still queued (if there are workers), then joins all workers.
+  ~thread_pool() {
+    this->stop(true);
+  }
+
+  // Number of worker threads currently owned by the pool.
+  int size() const { return static_cast<int>(this->threads.size()); }
+
+  // Number of workers currently blocked waiting for work.
+  int n_idle() const { return this->nWaiting; }
+  std::thread & get_thread(int i) { return *this->threads[i]; }
+
+  // Changes the number of worker threads; nThreads must be >= 0. Not synchronized: call from
+  // one thread and never interleave with stop(). Shrinking detaches surplus workers, which keep
+  // using `this` until they exit, so destroying the pool while they still run is undefined.
+  void resize(int nThreads) {
+    if (!this->isStop && !this->isDone) {
+      const int oldNThreads = static_cast<int>(this->threads.size());
+      if (oldNThreads <= nThreads) {  // growing (or unchanged): start the new workers
+        this->threads.resize(nThreads);
+        this->flags.resize(nThreads);
+
+        for (int i = oldNThreads; i < nThreads; ++i) {
+          this->flags[i] = std::make_shared<std::atomic<bool>>(false);
+          this->set_thread(i);
+        }
+      }
+      else {  // shrinking: tell the surplus workers to finish, then let them go
+        for (int i = oldNThreads - 1; i >= nThreads; --i) {
+          *this->flags[i] = true;  // this worker exits after finishing at most one more functor
+          this->threads[i]->detach();
+        }
+        {
+          // wake detached workers that are blocked waiting so they notice their flag
+          std::lock_guard<std::mutex> lock(this->mutex);
+          this->cv.notify_all();
+        }
+        this->threads.resize(nThreads);  // safe: the removed threads are detached
+        this->flags.resize(nThreads);  // safe: each worker holds its own shared_ptr to its flag
+      }
+    }
+  }
+
+  // Destroys queued functors without running them; their futures get a broken_promise error.
+  void clear_queue() {
+    std::function<void(int id)> * _f = nullptr;
+    while (this->q.pop(_f))
+      delete _f;
+  }
+
+  // Removes one queued functor and returns a copy of it (empty if the queue was empty).
+  std::function<void(int)> pop() {
+    std::function<void(int id)> * _f = nullptr;
+    this->q.pop(_f);
+    std::unique_ptr<std::function<void(int id)>> func(_f);  // deletes the popped wrapper on return
+    std::function<void(int)> f;
+    if (_f)
+      f = *_f;
+    return f;
+  }
+
+  // Stops the pool and joins all workers. If isWait is true, workers drain the queue before
+  // exiting; otherwise each exits after its current functor and the queue is discarded.
+  // Functors still queued at the end (e.g. no workers) are discarded. Repeat calls do nothing.
+  void stop(bool isWait = false) {
+    if (!isWait) {
+      if (this->isStop)
+        return;
+      this->isStop = true;
+      for (int i = 0, n = this->size(); i < n; ++i) {
+        *this->flags[i] = true;  // command the threads to stop
+      }
+      this->clear_queue();  // empty the queue
+    }
+    else {
+      if (this->isDone || this->isStop)
+        return;
+      this->isDone = true;  // give the waiting threads a command to finish
+    }
+    {
+      std::lock_guard<std::mutex> lock(this->mutex);
+      this->cv.notify_all();  // wake all waiting threads so they re-check isDone / their flag
+    }
+    for (int i = 0; i < static_cast<int>(this->threads.size()); ++i) {  // wait for the computing threads to finish
+      if (this->threads[i]->joinable())
+        this->threads[i]->join();
+    }
+    // if there were no threads in the pool but some functors in the queue, the functors are not deleted by the threads
+    // therefore delete them here
+    this->clear_queue();
+    this->threads.clear();
+    this->flags.clear();
+  }
+
+  // Like push(f), but binds rest... (stored by value) after the id: the worker calls f(id, rest...).
+  template<typename F, typename... Rest>
+  auto push(F && f, Rest&&... rest) ->std::future<decltype(f(0, rest...))> {
+    auto pck = std::make_shared<std::packaged_task<decltype(f(0, rest...))(int)>>(
+      std::bind(std::forward<F>(f), std::placeholders::_1, std::forward<Rest>(rest)...)
+    );
+    auto _f = new std::function<void(int id)>([pck](int id) {
+      (*pck)(id);
+    });
+    this->q.push(_f);
+    std::lock_guard<std::mutex> lock(this->mutex);
+    this->cv.notify_one();
+    return pck->get_future();
+  }
+
+  // Queues f to run on a worker as f(id), where id is the worker's index. The returned
+  // future yields f's result, or rethrows the exception f threw.
+  template<typename F>
+  auto push(F && f) ->std::future<decltype(f(0))> {
+    auto pck = std::make_shared<std::packaged_task<decltype(f(0))(int)>>(std::forward<F>(f));
+    auto _f = new std::function<void(int id)>([pck](int id) {
+      (*pck)(id);
+    });
+    this->q.push(_f);
+    std::lock_guard<std::mutex> lock(this->mutex);
+    this->cv.notify_one();
+    return pck->get_future();
+  }
+
+
+ private:
+
+  // Non-copyable and non-movable: running workers hold a pointer to this pool.
+  thread_pool(const thread_pool &) = delete;
+  thread_pool(thread_pool &&) = delete;
+  thread_pool & operator=(const thread_pool &) = delete;
+  thread_pool & operator=(thread_pool &&) = delete;
+  // Starts worker i. The worker keeps its own shared_ptr to flags[i], so resize() may drop the slot.
+  void set_thread(int i) {
+    std::shared_ptr<std::atomic<bool>> flag(this->flags[i]);  // a copy of the shared ptr to the flag
+    auto f = [this, i, flag/* a copy of the shared ptr to the flag */]() {
+      std::atomic<bool> & _flag = *flag;
+      std::function<void(int id)> * _f = nullptr;
+      bool isPop = this->q.pop(_f);
+      while (true) {
+        while (isPop) {  // if there is anything in the queue
+          std::unique_ptr<std::function<void(int id)>> func(_f);  // at return, delete the function even if an exception occurred
+          (*_f)(i);
+          if (_flag)
+            return;  // the thread is wanted to stop, return even if the queue is not empty yet
+          else
+            isPop = this->q.pop(_f);
+        }
+        // the queue is empty here, wait for the next command
+        std::unique_lock<std::mutex> lock(this->mutex);
+        ++this->nWaiting;
+        this->cv.wait(lock, [this, &_f, &isPop, &_flag](){ isPop = this->q.pop(_f); return isPop || this->isDone || _flag; });
+        --this->nWaiting;
+        if (!isPop)
+          return;  // if the queue is empty and this->isDone == true or *flag then return
+      }
+    };
+    this->threads[i].reset(new std::thread(f));  // compiler may not support std::make_unique()
+  }
+
+  void init() { this->nWaiting = 0; this->isStop = false; this->isDone = false; }
+
+  std::vector<std::unique_ptr<std::thread>> threads;
+  std::vector<std::shared_ptr<std::atomic<bool>>> flags;
+  detail::Queue<std::function<void(int id)> *> q;
+  std::atomic<bool> isDone;
+  std::atomic<bool> isStop;
+  std::atomic<int> nWaiting;  // how many threads are waiting
+
+  std::mutex mutex;
+  std::condition_variable cv;
+};
+
+}
+
+#endif // __ctpl_stl_thread_pool_H__
\ No newline at end of file